/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.fs.s3a.commit;

import com.amazonaws.services.s3.model.MultipartUpload;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.Invoker;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3AUtils;
import org.apache.hadoop.fs.s3a.commit.CommitOperations;
import org.apache.hadoop.fs.s3a.commit.CommitUtils;
import org.apache.hadoop.fs.s3a.commit.CommitUtilsWithMR;
import org.apache.hadoop.fs.s3a.commit.PathCommitException;
import org.apache.hadoop.fs.s3a.commit.Tasks;
import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractS3ACommitter
extends PathOutputCommitter {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractS3ACommitter.class);
    public static final String THREAD_PREFIX = "s3a-committer-pool-";
    private ExecutorService threadPool;
    private final CommitOperations commitOperations;
    private Path outputPath;
    private final String role;
    private Path workPath;
    private Configuration conf;
    private FileSystem destFS;
    private final JobContext jobContext;
    private final boolean createJobMarker;

    protected AbstractS3ACommitter(Path outputPath, TaskAttemptContext context) throws IOException {
        super(outputPath, context);
        Preconditions.checkArgument((outputPath != null ? 1 : 0) != 0, (Object)"null output path");
        Preconditions.checkArgument((context != null ? 1 : 0) != 0, (Object)"null job context");
        this.jobContext = context;
        this.role = "Task committer " + context.getTaskAttemptID();
        this.setConf(context.getConfiguration());
        this.initOutput(outputPath);
        LOG.debug("{} instantiated for job \"{}\" ID {} with destination {}", new Object[]{this.role, CommitUtilsWithMR.jobName((JobContext)context), CommitUtilsWithMR.jobIdString((JobContext)context), outputPath});
        S3AFileSystem fs = this.getDestS3AFS();
        this.createJobMarker = context.getConfiguration().getBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", true);
        this.commitOperations = new CommitOperations(fs);
    }

    @VisibleForTesting
    protected void initOutput(Path out) throws IOException {
        FileSystem fs = this.getDestinationFS(out, this.getConf());
        this.setDestFS(fs);
        this.setOutputPath(fs.makeQualified(out));
    }

    public final JobContext getJobContext() {
        return this.jobContext;
    }

    public final Path getOutputPath() {
        return this.outputPath;
    }

    protected final void setOutputPath(Path outputPath) {
        Preconditions.checkNotNull((Object)outputPath, (Object)"Null output path");
        this.outputPath = outputPath;
    }

    public Path getWorkPath() {
        return this.workPath;
    }

    protected void setWorkPath(Path workPath) {
        LOG.debug("Setting work path to {}", (Object)workPath);
        this.workPath = workPath;
    }

    public Configuration getConf() {
        return this.conf;
    }

    protected void setConf(Configuration conf) {
        this.conf = conf;
    }

    public FileSystem getDestFS() throws IOException {
        if (this.destFS == null) {
            FileSystem fs = this.getDestinationFS(this.outputPath, this.getConf());
            this.setDestFS(fs);
        }
        return this.destFS;
    }

    public S3AFileSystem getDestS3AFS() throws IOException {
        return (S3AFileSystem)this.getDestFS();
    }

    protected void setDestFS(FileSystem destFS) {
        this.destFS = destFS;
    }

    public Path getJobAttemptPath(JobContext context) {
        return this.getJobAttemptPath(CommitUtilsWithMR.getAppAttemptId(context));
    }

    protected abstract Path getJobAttemptPath(int var1);

    public Path getTaskAttemptPath(TaskAttemptContext context) {
        return this.getBaseTaskAttemptPath(context);
    }

    protected abstract Path getBaseTaskAttemptPath(TaskAttemptContext var1);

    public abstract Path getTempTaskAttemptPath(TaskAttemptContext var1);

    public abstract String getName();

    public String toString() {
        StringBuilder sb = new StringBuilder("AbstractS3ACommitter{");
        sb.append("role=").append(this.role);
        sb.append(", name=").append(this.getName());
        sb.append(", outputPath=").append(this.getOutputPath());
        sb.append(", workPath=").append(this.workPath);
        sb.append('}');
        return sb.toString();
    }

    protected FileSystem getDestinationFS(Path out, Configuration config) throws IOException {
        return CommitUtils.getS3AFileSystem(out, config, this.requiresDelayedCommitOutputInFileSystem());
    }

    protected boolean requiresDelayedCommitOutputInFileSystem() {
        return false;
    }

    public void recoverTask(TaskAttemptContext taskContext) throws IOException {
        LOG.warn("Cannot recover task {}", (Object)taskContext.getTaskAttemptID());
        throw new PathCommitException(this.outputPath, String.format("Unable to recover task %s", taskContext.getTaskAttemptID()));
    }

    protected void maybeCreateSuccessMarkerFromCommits(JobContext context, ActiveCommit pending) throws IOException {
        ArrayList<String> filenames = new ArrayList<String>(pending.size());
        filenames.addAll(pending.committedObjects);
        this.maybeCreateSuccessMarker(context, filenames);
    }

    protected void maybeCreateSuccessMarker(JobContext context, List<String> filenames) throws IOException {
        if (this.createJobMarker) {
            SuccessData successData = new SuccessData();
            successData.setCommitter(this.getName());
            successData.setDescription(this.getRole());
            successData.setHostname(NetUtils.getLocalHostname());
            Date now = new Date();
            successData.setTimestamp(now.getTime());
            successData.setDate(now.toString());
            successData.setFilenames(filenames);
            this.commitOperations.createSuccessMarker(this.getOutputPath(), successData, true);
        }
    }

    public void setupJob(JobContext context) throws IOException {
        try (DurationInfo d = new DurationInfo(LOG, "preparing destination", new Object[0]);){
            if (this.createJobMarker) {
                this.commitOperations.deleteSuccessMarker(this.getOutputPath());
            }
            this.getDestFS().mkdirs(this.getOutputPath());
        }
    }

    public void setupTask(TaskAttemptContext context) throws IOException {
        try (DurationInfo d = new DurationInfo(LOG, "Setup Task %s", new Object[]{context.getTaskAttemptID()});){
            Path taskAttemptPath = this.getTaskAttemptPath(context);
            FileSystem fs = this.getTaskAttemptFilesystem(context);
            fs.mkdirs(taskAttemptPath);
        }
    }

    protected FileSystem getTaskAttemptFilesystem(TaskAttemptContext context) throws IOException {
        return this.getTaskAttemptPath(context).getFileSystem(this.getConf());
    }

    protected void commitPendingUploads(JobContext context, ActiveCommit pending) throws IOException {
        if (pending.isEmpty()) {
            LOG.warn("{}: No pending uploads to commit", (Object)this.getRole());
        }
        try (DurationInfo ignored = new DurationInfo(LOG, "committing the output of %s task(s)", new Object[]{pending.size()});
             CommitOperations.CommitContext commitContext = this.initiateCommitOperation();){
            Tasks.foreach(pending.getSourceFiles()).stopOnFailure().suppressExceptions(false).executeWith(this.buildThreadPool(context)).abortWith(path -> this.loadAndAbort(commitContext, pending, (Path)path, true, false)).revertWith(path -> this.loadAndRevert(commitContext, pending, (Path)path)).run(path -> this.loadAndCommit(commitContext, pending, (Path)path));
        }
    }

    protected void precommitCheckPendingFiles(JobContext context, ActiveCommit pending) throws IOException {
        FileSystem sourceFS = pending.getSourceFS();
        try (DurationInfo ignored = new DurationInfo(LOG, "Preflight Load of pending files", new Object[0]);){
            Tasks.foreach(pending.getSourceFiles()).stopOnFailure().suppressExceptions(false).executeWith(this.buildThreadPool(context)).run(path -> PendingSet.load(sourceFS, path));
        }
    }

    private void loadAndCommit(CommitOperations.CommitContext commitContext, ActiveCommit activeCommit, Path path) throws IOException {
        try (DurationInfo ignored = new DurationInfo(LOG, false, "Committing %s", new Object[]{path});){
            PendingSet pendingSet = PendingSet.load(activeCommit.getSourceFS(), path);
            Tasks.foreach(pendingSet.getCommits()).stopOnFailure().suppressExceptions(false).executeWith(this.singleCommitThreadPool()).onFailure((commit, exception) -> commitContext.abortSingleCommit((SinglePendingCommit)commit)).abortWith(commitContext::abortSingleCommit).revertWith(commitContext::revertCommit).run(commit -> {
                commitContext.commitOrFail((SinglePendingCommit)commit);
                activeCommit.uploadCommitted(commit.getDestinationKey(), commit.getLength());
            });
        }
    }

    private void loadAndRevert(CommitOperations.CommitContext commitContext, ActiveCommit activeCommit, Path path) throws IOException {
        try (DurationInfo ignored = new DurationInfo(LOG, false, "Committing %s", new Object[]{path});){
            PendingSet pendingSet = PendingSet.load(activeCommit.getSourceFS(), path);
            Tasks.foreach(pendingSet.getCommits()).suppressExceptions(true).run(commitContext::revertCommit);
        }
    }

    private void loadAndAbort(CommitOperations.CommitContext commitContext, ActiveCommit activeCommit, Path path, boolean suppressExceptions, boolean deleteRemoteFiles) throws IOException {
        try (DurationInfo ignored = new DurationInfo(LOG, false, "Aborting %s", new Object[]{path});){
            PendingSet pendingSet = PendingSet.load(activeCommit.getSourceFS(), path);
            FileSystem fs = this.getDestFS();
            Tasks.foreach(pendingSet.getCommits()).executeWith(this.singleCommitThreadPool()).suppressExceptions(suppressExceptions).run(commit -> {
                block2: {
                    try {
                        commitContext.abortSingleCommit((SinglePendingCommit)commit);
                    }
                    catch (FileNotFoundException e) {
                        if (!deleteRemoteFiles) break block2;
                        fs.delete(commit.destinationPath(), false);
                    }
                }
            });
        }
    }

    protected CommitOperations.CommitContext initiateCommitOperation() throws IOException {
        return this.getCommitOperations().initiateCommitOperation(this.getOutputPath());
    }

    protected void commitJobInternal(JobContext context, ActiveCommit pending) throws IOException {
        this.commitPendingUploads(context, pending);
    }

    public void abortJob(JobContext context, JobStatus.State state) throws IOException {
        LOG.info("{}: aborting job {} in state {}", new Object[]{this.getRole(), CommitUtilsWithMR.jobIdString(context), state});
        this.abortJobInternal(context, false);
    }

    protected void abortJobInternal(JobContext context, boolean suppressExceptions) throws IOException {
        this.cleanup(context, suppressExceptions);
    }

    protected void abortPendingUploadsInCleanup(boolean suppressExceptions) throws IOException {
        Path dest = this.getOutputPath();
        try (DurationInfo ignored = new DurationInfo(LOG, "Aborting all pending commits under %s", new Object[]{dest});
             CommitOperations.CommitContext commitContext = this.initiateCommitOperation();){
            List<MultipartUpload> pending;
            CommitOperations ops = this.getCommitOperations();
            try {
                pending = ops.listPendingUploadsUnderPath(dest);
            }
            catch (IOException e) {
                this.maybeIgnore(suppressExceptions, "aborting pending uploads", e);
                if (commitContext != null) {
                    if (var6_8 != null) {
                        try {
                            commitContext.close();
                        }
                        catch (Throwable throwable) {
                            var6_8.addSuppressed(throwable);
                        }
                    } else {
                        commitContext.close();
                    }
                }
                if (ignored != null) {
                    if (var4_4 != null) {
                        try {
                            ignored.close();
                        }
                        catch (Throwable throwable) {
                            var4_4.addSuppressed(throwable);
                        }
                    } else {
                        ignored.close();
                    }
                }
                return;
            }
            Tasks.foreach(pending).executeWith(this.buildThreadPool(this.getJobContext())).suppressExceptions(suppressExceptions).run(u -> commitContext.abortMultipartCommit(u.getKey(), u.getUploadId()));
        }
    }

    @VisibleForTesting
    public void preCommitJob(JobContext context, ActiveCommit pending) throws IOException {
    }

    public void commitJob(JobContext context) throws IOException {
        String id = CommitUtilsWithMR.jobIdString(context);
        try (DurationInfo d = new DurationInfo(LOG, "%s: commitJob(%s)", new Object[]{this.getRole(), id});){
            ActiveCommit pending = this.listPendingUploadsToCommit(context);
            this.preCommitJob(context, pending);
            this.commitJobInternal(context, pending);
            this.jobCompleted(true);
            this.maybeCreateSuccessMarkerFromCommits(context, pending);
            this.cleanup(context, false);
        }
        catch (IOException e) {
            LOG.warn("Commit failure for job {}", (Object)id, (Object)e);
            this.jobCompleted(false);
            this.abortJobInternal(context, true);
            throw e;
        }
    }

    protected void jobCompleted(boolean success) {
        this.getCommitOperations().jobCompleted(success);
    }

    public abstract void cleanupStagingDirs();

    protected abstract ActiveCommit listPendingUploadsToCommit(JobContext var1) throws IOException;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void cleanup(JobContext context, boolean suppressExceptions) throws IOException {
        try (DurationInfo d = new DurationInfo(LOG, "Cleanup job %s", new Object[]{CommitUtilsWithMR.jobIdString(context)});){
            this.abortPendingUploadsInCleanup(suppressExceptions);
        }
        finally {
            this.destroyThreadPool();
            this.cleanupStagingDirs();
        }
    }

    public void cleanupJob(JobContext context) throws IOException {
        String r = this.getRole();
        String id = CommitUtilsWithMR.jobIdString(context);
        LOG.warn("{}: using deprecated cleanupJob call for {}", (Object)r, (Object)id);
        try (DurationInfo d = new DurationInfo(LOG, "%s: cleanup Job %s", new Object[]{r, id});){
            this.cleanup(context, true);
        }
    }

    protected void maybeIgnore(boolean suppress, String action, Invoker.VoidOperation operation) throws IOException {
        if (suppress) {
            Invoker.ignoreIOExceptions(LOG, action, "", operation);
        } else {
            operation.execute();
        }
    }

    protected void maybeIgnore(boolean suppress, String action, IOException ex) throws IOException {
        if (!suppress) {
            throw ex;
        }
        LOG.debug(action, (Throwable)ex);
    }

    protected CommitOperations getCommitOperations() {
        return this.commitOperations;
    }

    protected String getRole() {
        return this.role;
    }

    protected final synchronized ExecutorService buildThreadPool(JobContext context) {
        if (this.threadPool == null) {
            int numThreads = context.getConfiguration().getInt("fs.s3a.committer.threads", 8);
            LOG.debug("{}: creating thread pool of size {}", (Object)this.getRole(), (Object)numThreads);
            if (numThreads > 0) {
                this.threadPool = HadoopExecutors.newFixedThreadPool((int)numThreads, (ThreadFactory)new ThreadFactoryBuilder().setDaemon(true).setNameFormat(THREAD_PREFIX + context.getJobID() + "-%d").build());
            } else {
                return null;
            }
        }
        return this.threadPool;
    }

    protected synchronized void destroyThreadPool() {
        if (this.threadPool != null) {
            LOG.debug("Destroying thread pool");
            HadoopExecutors.shutdown((ExecutorService)this.threadPool, (Logger)LOG, (long)30L, (TimeUnit)TimeUnit.SECONDS);
            this.threadPool = null;
        }
    }

    protected final synchronized ExecutorService singleCommitThreadPool() {
        return null;
    }

    public synchronized boolean hasThreadPool() {
        return this.threadPool != null;
    }

    protected void deleteTaskAttemptPathQuietly(TaskAttemptContext context) {
        Path attemptPath = this.getBaseTaskAttemptPath(context);
        Invoker.ignoreIOExceptions(LOG, "Delete task attempt path", attemptPath.toString(), () -> S3AUtils.deleteQuietly(this.getTaskAttemptFilesystem(context), attemptPath, true));
    }

    protected void abortPendingUploads(JobContext context, List<SinglePendingCommit> pending, boolean suppressExceptions) throws IOException {
        if (pending == null || pending.isEmpty()) {
            LOG.info("{}: no pending commits to abort", (Object)this.getRole());
        } else {
            try (DurationInfo d = new DurationInfo(LOG, "Aborting %s uploads", new Object[]{pending.size()});
                 CommitOperations.CommitContext commitContext = this.initiateCommitOperation();){
                Tasks.foreach(pending).executeWith(this.buildThreadPool(context)).suppressExceptions(suppressExceptions).run(commitContext::abortSingleCommit);
            }
        }
    }

    protected void abortPendingUploads(JobContext context, ActiveCommit pending, boolean suppressExceptions, boolean deleteRemoteFiles) throws IOException {
        if (pending.isEmpty()) {
            LOG.info("{}: no pending commits to abort", (Object)this.getRole());
        } else {
            try (DurationInfo d = new DurationInfo(LOG, "Aborting %s uploads", new Object[]{pending.size()});
                 CommitOperations.CommitContext commitContext = this.initiateCommitOperation();){
                Tasks.foreach(pending.getSourceFiles()).executeWith(this.buildThreadPool(context)).suppressExceptions(suppressExceptions).run(path -> this.loadAndAbort(commitContext, pending, (Path)path, suppressExceptions, deleteRemoteFiles));
            }
        }
    }

    public static class ActiveCommit {
        private static final ActiveCommit EMPTY = new ActiveCommit(null, new ArrayList<Path>());
        private final List<Path> sourceFiles;
        private final FileSystem sourceFS;
        private final List<String> committedObjects = new ArrayList<String>();
        private int committedObjectCount;
        private long committedBytes;

        public ActiveCommit(FileSystem sourceFS, List<Path> sourceFiles) {
            this.sourceFiles = sourceFiles;
            this.sourceFS = sourceFS;
        }

        public static ActiveCommit fromStatusList(FileSystem pendingFS, List<? extends FileStatus> statuses) {
            return new ActiveCommit(pendingFS, statuses.stream().map(FileStatus::getPath).collect(Collectors.toList()));
        }

        public static ActiveCommit empty() {
            return EMPTY;
        }

        public List<Path> getSourceFiles() {
            return this.sourceFiles;
        }

        public FileSystem getSourceFS() {
            return this.sourceFS;
        }

        public synchronized void uploadCommitted(String key, long size) {
            if (this.committedObjects.size() < 100) {
                this.committedObjects.add(key.startsWith("/") ? key : "/" + key);
            }
            ++this.committedObjectCount;
            this.committedBytes += size;
        }

        public synchronized List<String> getCommittedObjects() {
            return this.committedObjects;
        }

        public synchronized int getCommittedFileCount() {
            return this.committedObjectCount;
        }

        public synchronized long getCommittedBytes() {
            return this.committedBytes;
        }

        public int size() {
            return this.sourceFiles.size();
        }

        public boolean isEmpty() {
            return this.sourceFiles.isEmpty();
        }

        public void add(Path path) {
            this.sourceFiles.add(path);
        }
    }
}

