package com.amazon.ws.emr.hadoop.fs.maintenance;

import com.amazon.ws.emr.hadoop.fs.property.ConfigurationConstants;
import com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3Lite;
import com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.ListMultipartUploadsRequest;
import com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.MultipartUpload;
import com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.MultipartUploadListing;
import com.amazon.ws.emr.hadoop.fs.shaded.com.google.common.annotations.VisibleForTesting;
import com.amazon.ws.emr.hadoop.fs.shaded.com.google.common.util.concurrent.MoreExecutors;
import com.amazon.ws.emr.hadoop.fs.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.amazon.ws.emr.hadoop.fs.util.EmrFsUtils;
import java.net.URI;
import java.time.Duration;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/amazon/ws/emr/hadoop/fs/maintenance/MultipartUploadCleaner.class */
public class MultipartUploadCleaner {
    private static final long SCHEDULER_SHUTDOWN_TIMEOUT_MILLIS = 1000;
    private static final int MULTIPART_LISTING_PAGE_SIZE = 1000;
    private final Duration incompleteMultipartAgeThreshold;
    final int maxJitterDelayMillis;
    private final AmazonS3Lite s3;
    private final Optional<ScheduledExecutorService> optionalScheduler;
    private final Random random;
    private static final Logger LOG = LoggerFactory.getLogger(MultipartUploadCleaner.class);
    private static MultipartUploadCleaner INSTANCE = null;
    private final Duration cleanerThreadInitialDelay = Duration.ofSeconds(0);
    final Duration cleanerThreadFixedDelay = Duration.ofSeconds(900);
    private volatile boolean isClosed = false;
    private final Set<String> s3Buckets = new HashSet();

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    /* loaded from: input_file:com/amazon/ws/emr/hadoop/fs/maintenance/MultipartUploadCleaner$MultipartCleanUpRunnable.class */
    public class MultipartCleanUpRunnable implements Runnable {
        private final String bucketName;
        private final String prefix;
        private final Duration nextRoundDelay;
        private final int multipartListingPageSize;
        private Date cleanBefore;
        private MultipartUploadListing uploadListing;

        MultipartCleanUpRunnable(MultipartUploadCleaner multipartUploadCleaner, String str) {
            this(str, null, multipartUploadCleaner.cleanerThreadFixedDelay, 1000);
        }

        private void scheduleNextCleanupRound() {
            MultipartUploadCleaner.LOG.debug("Scheduling next cleanup round for bucket {} in approximately {} seconds", this.bucketName, Long.valueOf(this.nextRoundDelay.getSeconds()));
            this.cleanBefore = null;
            this.uploadListing = null;
            MultipartUploadCleaner.this.scheduleCleanUpWithJitter(this.nextRoundDelay, this);
        }

        private void scheduleNextPageCleanup() {
            MultipartUploadCleaner.LOG.debug("Scheduling cleanup for next page of multipart uploads for bucket {} in approximately 0 seconds", this.bucketName);
            MultipartUploadCleaner.this.scheduleCleanUpWithJitter(Duration.ZERO, this);
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                if (this.cleanBefore == null) {
                    this.cleanBefore = new Date(new Date().getTime() - MultipartUploadCleaner.this.incompleteMultipartAgeThreshold.toMillis());
                }
                if (this.uploadListing == null) {
                    MultipartUploadCleaner.LOG.debug("Cleaning multipart uploads before {} for bucket {}", this.cleanBefore, this.bucketName);
                    this.uploadListing = MultipartUploadCleaner.this.s3.listMultipartUploads(newListMultipartUploadsRequest());
                } else {
                    MultipartUploadCleaner.LOG.debug("Cleaning next page of multipart uploads before {} for bucket {}", this.cleanBefore, this.bucketName);
                    this.uploadListing = MultipartUploadCleaner.this.s3.listMultipartUploads(newListMultipartUploadsRequest().withKeyMarker(this.uploadListing.getNextKeyMarker()).withUploadIdMarker(this.uploadListing.getNextUploadIdMarker()));
                }
                MultipartUploadCleaner.this.abortMultipartUploads(this.uploadListing.getMultipartUploads(), this.cleanBefore, this.bucketName);
                if (this.uploadListing.isTruncated()) {
                    scheduleNextPageCleanup();
                } else {
                    scheduleNextCleanupRound();
                }
            } catch (Exception e) {
                MultipartUploadCleaner.this.logWarnIfNotClosed("Exception caught while clearing multipart uploads", e);
                scheduleNextCleanupRound();
            }
        }

        private ListMultipartUploadsRequest newListMultipartUploadsRequest() {
            return new ListMultipartUploadsRequest(this.bucketName).withPrefix(this.prefix).withMaxUploads(this.multipartListingPageSize);
        }

        public MultipartCleanUpRunnable(String str, String str2, Duration duration, int i) {
            this.bucketName = str;
            this.prefix = str2;
            this.nextRoundDelay = duration;
            this.multipartListingPageSize = i;
        }
    }

    public static synchronized MultipartUploadCleaner getInstance(Configuration configuration, AmazonS3Lite amazonS3Lite) {
        if (INSTANCE == null) {
            INSTANCE = new MultipartUploadCleaner(configuration, amazonS3Lite);
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                INSTANCE.close();
            }));
        }
        return INSTANCE;
    }

    @VisibleForTesting
    MultipartUploadCleaner(Configuration configuration, AmazonS3Lite amazonS3Lite) {
        this.incompleteMultipartAgeThreshold = Duration.ofSeconds(configuration.getLong(ConfigurationConstants.CLEAN_INCOMPLETE_MULTIPART_AGE_THRESHOLD_SECONDS, 604800L));
        this.maxJitterDelayMillis = configuration.getInt(ConfigurationConstants.CLEAN_INCOMPLETE_MULTIPART_MAX_JITTER_DELAY_MILLIS, 10000);
        this.s3 = amazonS3Lite;
        this.optionalScheduler = configuration.getBoolean(ConfigurationConstants.CLEAN_INCOMPLETE_MULTIPART_ENABLED, false) ? Optional.of(Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("multipart-cleanup-service").setDaemon(true).build())) : Optional.empty();
        this.random = new Random();
    }

    private boolean isMultipartCleanupEnabled() {
        return this.optionalScheduler.isPresent();
    }

    private Duration getJitterDelay() {
        return Duration.ofMillis(this.random.nextInt(this.maxJitterDelayMillis));
    }

    @VisibleForTesting
    synchronized void doScheduleMultipartCleanup(String str) {
        if (!isMultipartCleanupEnabled() || this.s3Buckets.contains(str)) {
            return;
        }
        this.s3Buckets.add(str);
        scheduleCleanUpWithJitter(this.cleanerThreadInitialDelay, new MultipartCleanUpRunnable(this, str));
        LOG.info("Multipart upload cleanup is now enabled for bucket: {}", str);
    }

    public Optional<Future<?>> scheduleMultipartCleanup(String str) {
        return this.optionalScheduler.map(scheduledExecutorService -> {
            return scheduleMultipartCleanupAsync(scheduledExecutorService, () -> {
                doScheduleMultipartCleanup(str);
            }, exc -> {
                logWarnIfNotClosed("Exception scheduling multipart upload cleanup for bucket " + str, exc);
            });
        });
    }

    public Optional<Future<?>> scheduleMultipartCleanup(Path path) {
        return this.optionalScheduler.map(scheduledExecutorService -> {
            return scheduleMultipartCleanupAsync(scheduledExecutorService, () -> {
                doScheduleMultipartCleanup(EmrFsUtils.pathToBucket(path));
            }, exc -> {
                logWarnIfNotClosed("Exception scheduling multipart upload cleanup for path: " + path, exc);
            });
        });
    }

    public Optional<Future<?>> scheduleMultipartCleanup(URI uri) {
        return this.optionalScheduler.map(scheduledExecutorService -> {
            return scheduleMultipartCleanupAsync(scheduledExecutorService, () -> {
                doScheduleMultipartCleanup(EmrFsUtils.pathToBucket(new Path(uri)));
            }, exc -> {
                logWarnIfNotClosed("Exception scheduling multipart upload cleanup for uri: " + uri, exc);
            });
        });
    }

    private static Future<?> scheduleMultipartCleanupAsync(ScheduledExecutorService scheduledExecutorService, Runnable runnable, Consumer<Exception> consumer) {
        return scheduledExecutorService.submit(() -> {
            try {
                runnable.run();
            } catch (Exception e) {
                consumer.accept(e);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void logWarnIfNotClosed(String str, Throwable th) {
        if (this.isClosed) {
            LOG.debug("Error while shutting down the multipart upload cleanup system: {}", str, th);
        } else {
            LOG.warn(str, th);
        }
    }

    @VisibleForTesting
    Set<String> getS3Buckets() {
        return this.s3Buckets;
    }

    public synchronized void close() {
        if (this.isClosed) {
            return;
        }
        this.isClosed = true;
        this.optionalScheduler.ifPresent(scheduledExecutorService -> {
            try {
                LOG.info("Shutting down multipart cleanup service");
                MoreExecutors.shutdownAndAwaitTermination(scheduledExecutorService, 1000L, TimeUnit.MILLISECONDS);
            } catch (Exception e) {
                LOG.error("Couldn't shutdown multipart cleanup service", e);
            }
        });
    }

    @VisibleForTesting
    int abortMultipartUploads(List<MultipartUpload> list, Date date, String str) {
        int i = 0;
        for (MultipartUpload multipartUpload : list) {
            if (multipartUpload.getInitiated().before(date)) {
                LOG.info("Aborting MultipartUpload {} for {} as it is older than configured age threshold of {} seconds", new Object[]{multipartUpload.getUploadId(), multipartUpload.getKey(), Long.valueOf(this.incompleteMultipartAgeThreshold.getSeconds())});
                try {
                    this.s3.abortMultipartUpload(new AbortMultipartUploadRequest(str, multipartUpload.getKey(), multipartUpload.getUploadId()));
                    i++;
                } catch (Exception e) {
                    logWarnIfNotClosed("Exception caught while aborting multipart uploads", e);
                }
            }
        }
        return i;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void scheduleCleanUpWithJitter(Duration duration, Runnable runnable) {
        try {
            this.optionalScheduler.ifPresent(scheduledExecutorService -> {
                if (this.isClosed) {
                    return;
                }
                scheduledExecutorService.schedule(runnable, duration.plus(getJitterDelay()).toMillis(), TimeUnit.MILLISECONDS);
            });
        } catch (Exception e) {
            logWarnIfNotClosed("Exception caught while trying to schedule cleanup task", e);
        }
    }
}
