package com.amazon.ws.emr.hadoop.fs.s3;

import com.amazon.ws.emr.hadoop.fs.EmrFsStore;
import com.amazon.ws.emr.hadoop.fs.consistency.ItemKeys;
import com.amazon.ws.emr.hadoop.fs.consistency.exception.ConsistencyException;
import com.amazon.ws.emr.hadoop.fs.cse.CSEUtils;
import com.amazon.ws.emr.hadoop.fs.dynamodb.Entity;
import com.amazon.ws.emr.hadoop.fs.dynamodb.EntityStore;
import com.amazon.ws.emr.hadoop.fs.property.RetryPolicyType;
import com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3Lite;
import com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.AmazonClientException;
import com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.AmazonServiceException;
import com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazon.ws.emr.hadoop.fs.shaded.com.google.common.annotations.VisibleForTesting;
import com.amazon.ws.emr.hadoop.fs.shaded.com.google.common.base.Preconditions;
import com.amazon.ws.emr.hadoop.fs.shaded.org.joda.time.DateTimeConstants;
import com.amazon.ws.emr.hadoop.fs.util.ConfigurationUtils;
import com.amazon.ws.emr.hadoop.fs.util.EmrFsUtils;
import com.amazon.ws.emr.hadoop.fs.util.RetryUtils;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.NonNull;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CanUnbuffer;
import org.apache.hadoop.fs.FSInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/amazon/ws/emr/hadoop/fs/s3/S3FSInputStream.class */
/**
 * Seekable input stream over a single S3 object that transparently reopens the underlying
 * {@link InputStreamWithInfo} when the read position changes or when a read fails.
 *
 * <p>Two positions are tracked: {@code nextReadPos} is the logical position reported to callers
 * and moved by {@link #seek(long)}; {@code lastReadPos} is the position the currently opened
 * underlying stream is at. When lazy seek is enabled, {@link #seek(long)} only records the new
 * position and the stream is reopened on the next read.
 *
 * <p>Thread-safety: the read, seek and position methods are {@code synchronized} on this
 * instance; {@link #close()} and {@link #unbuffer()} are not.
 */
public class S3FSInputStream extends FSInputStream implements CanUnbuffer {
    private static final Logger LOG = LoggerFactory.getLogger(S3FSInputStream.class);

    /** Maximum number of attempts a single {@link #read(byte[], int, int)} call makes. */
    private static final int MAX_READ_ATTEMPTS = 5;

    /** Service status code that {@link #open(String, long)} treats as "object not found". */
    private static final int STATUS_NOT_FOUND = 404;

    private final AmazonS3Lite s3;
    private final EntityStore entityStore;
    private final String bucketName;
    private final boolean throwOnInconsistency;
    private final Configuration conf;
    private final boolean lazySeek;
    private final long readRetryIntervalMS;
    private final int fastFirstRetryDelayMS;
    private final InputStreamWithInfoFactory inputStreamWithInfoFactory;
    private final RetryPolicyType defaultRetryPolicy;
    /** Passed to the stream factory on every open; cleared after the first failed read attempt. */
    private final AtomicBoolean shouldTryInitialTimeout;

    /** Currently opened underlying stream; replaced on every successful reopen. */
    private InputStreamWithInfo in;
    /** Position of the underlying stream {@link #in}. */
    private long lastReadPos = 0;
    /** Logical position the next read starts from. */
    private long nextReadPos = 0;

    /**
     * Creates the stream and immediately opens the object at position 0.
     *
     * @param str bucket name
     * @param str2 object key
     * @param amazonS3Lite S3 client handed to {@code CSEUtils.getPlaintextLength}
     * @param entityStore metadata store consulted when S3 reports the object as missing
     * @param z whether a {@link ConsistencyException} raised while reopening is rethrown as-is
     *     ({@code true}) or converted to a {@link FileNotFoundException} ({@code false})
     * @param configuration source of the lazy-seek flag and the retry settings
     * @param inputStreamWithInfoFactory factory producing the underlying streams
     * @throws NullPointerException if any reference argument is {@code null}
     * @throws ConsistencyException if S3 reports the object missing while the metadata store
     *     still holds a non-deleted entry for it
     * @throws IOException if the object cannot be opened
     */
    public S3FSInputStream(@NonNull String str, @NonNull String str2, @NonNull AmazonS3Lite amazonS3Lite, @NonNull EntityStore entityStore, boolean z, @NonNull Configuration configuration, @NonNull InputStreamWithInfoFactory inputStreamWithInfoFactory) throws ConsistencyException, IOException {
        // Validate everything up front, in declaration order, before touching any state.
        Preconditions.checkNotNull(str, "bucketName");
        Preconditions.checkNotNull(str2, "key");
        Preconditions.checkNotNull(amazonS3Lite, "s3");
        Preconditions.checkNotNull(entityStore, "entityStore");
        Preconditions.checkNotNull(configuration, "conf");
        Preconditions.checkNotNull(inputStreamWithInfoFactory, "inputStreamWithInfoFactory");

        this.bucketName = str;
        this.s3 = amazonS3Lite;
        this.entityStore = entityStore;
        this.throwOnInconsistency = z;
        this.conf = configuration;
        this.lazySeek = ConfigurationUtils.isLazySeekEnabled(configuration);
        this.readRetryIntervalMS = ConfigurationUtils.getConsistencyRetryPeriodSeconds(configuration) * DateTimeConstants.MILLIS_PER_SECOND;
        this.fastFirstRetryDelayMS = ConfigurationUtils.getFastFirstRetryPeriodMs(configuration);
        this.shouldTryInitialTimeout = new AtomicBoolean(true);
        this.inputStreamWithInfoFactory = inputStreamWithInfoFactory;
        // Resolve all configuration before opening, so a configuration failure cannot leave an
        // already-opened stream behind.
        this.defaultRetryPolicy = ConfigurationUtils.getRetryPolicyType(configuration);
        open(str2, 0L);
    }

    /**
     * Opens a fresh underlying stream for {@code key} at {@code position} and resets both tracked
     * positions to it.
     *
     * @throws IllegalArgumentException if {@code position} is negative
     * @throws ConsistencyException if the service answers "not found" but the metadata store has
     *     an entry for the object whose state is not {@code DELETED}
     * @throws FileNotFoundException if the service answers "not found" and the metadata store
     *     has no entry, or a {@code DELETED} one
     * @throws IOException for any other service error
     */
    private void open(String key, long position) throws ConsistencyException, IOException {
        Preconditions.checkArgument(position >= 0, "Cannot seek to a negative position");
        try {
            long plaintextLength = CSEUtils.getPlaintextLength(this.s3, this.bucketName, key, (ObjectMetadata) null, this.conf);
            LOG.debug("Stream for key '{}' seeking to position '{}'", key, position);
            this.in = this.inputStreamWithInfoFactory.create(this.bucketName, key, position, plaintextLength, this.shouldTryInitialTimeout.get());
            this.lastReadPos = position;
            this.nextReadPos = position;
        } catch (AmazonServiceException e) {
            if (e.getStatusCode() != STATUS_NOT_FOUND) {
                throw new IOException(e);
            }
            // "Not found" is only an inconsistency when the metadata store still believes the
            // object exists.
            Entity metadataEntity = this.entityStore.retrieve(ItemKeys.toItemKey(this.bucketName, key));
            boolean stillTracked = metadataEntity != null
                    && EmrFsStore.MetadataFile.parseFrom(metadataEntity.getPayload()).getState() != EmrFsStore.MetadataFile.State.DELETED;
            if (stillTracked) {
                throw new ConsistencyException(String.format("Unable to get object '%s/%s' from s3", this.bucketName, key), e, Collections.singletonList(EmrFsUtils.getPathForS3Object(this.bucketName, key)));
            }
            throw new FileNotFoundException(String.format("File '%s/%s' has been deleted in both metadata and s3", this.bucketName, key));
        }
    }

    /**
     * Single-byte reads are not supported by this stream.
     *
     * @throws UnsupportedOperationException always
     */
    public synchronized int read() throws IOException {
        throw new UnsupportedOperationException("Single byte read() not implemented");
    }

    /** Moves both tracked positions forward by {@code byteCount}. */
    private void advance(int byteCount) {
        this.lastReadPos += byteCount;
        this.nextReadPos += byteCount;
    }

    /**
     * Reads up to {@code i2} bytes into {@code bArr} starting at offset {@code i}, making up to
     * {@link #MAX_READ_ATTEMPTS} attempts.
     *
     * <p>The first attempt performs a pending lazy seek (if lazy seek is enabled); every later
     * attempt reopens the underlying stream at the current position. An attempt that fails with an
     * exception is followed by a backoff sleep; an attempt that merely returns no data although
     * the underlying stream says retrying is worthwhile is retried immediately.
     *
     * @param bArr destination buffer
     * @param i offset in {@code bArr} of the first byte written
     * @param i2 maximum number of bytes to read
     * @return the number of bytes read; {@code 0} if {@code i2} is zero; {@code -1} if the
     *     position is at the known end of the stream; otherwise the non-positive value returned
     *     by the underlying stream when it indicates that retrying should stop
     * @throws NullPointerException if {@code bArr} is {@code null}
     * @throws IndexOutOfBoundsException if {@code i}/{@code i2} do not describe a range inside
     *     {@code bArr}
     * @throws FileNotFoundException if positioning or reading reports the file as missing; this
     *     is never retried
     * @throws IOException if the last attempt fails, or if every attempt ends prematurely
     *     without data
     */
    public synchronized int read(byte[] bArr, int i, int i2) throws IOException {
        Preconditions.checkNotNull(bArr, "byte array 'b' is required");
        if (i < 0 || i2 < 0 || i2 > bArr.length - i) {
            throw new IndexOutOfBoundsException();
        }
        if (i2 == 0) {
            return 0;
        }
        if (atEndOfStreamIfKnown()) {
            return -1;
        }

        Exception lastFailure = null;
        for (int attempt = 0; attempt < MAX_READ_ATTEMPTS; attempt++) {
            boolean finalAttempt = attempt >= MAX_READ_ATTEMPTS - 1;
            try {
                if (attempt > 0) {
                    reopenStream();
                } else if (this.lazySeek) {
                    seekStream();
                }
                int bytesRead = this.in.read(bArr, i, i2);
                if (bytesRead > 0) {
                    advance(bytesRead);
                    return bytesRead;
                }
                if (this.in.shouldBreakReadRetry(this.nextReadPos)) {
                    return bytesRead;
                }
                // No data and the stream did not ask us to stop: try again with a fresh stream.
                LOG.warn(generateUnexpectedEndOfStreamMsg());
            } catch (FileNotFoundException e) {
                // A missing file cannot be fixed by retrying; report it straight away.
                LOG.info("Encountered an exception while reading '{}', file not present", this.in.getKey(), e);
                FileNotFoundException notPresent = new FileNotFoundException("File not present on S3");
                notPresent.initCause(e);
                throw notPresent;
            } catch (AmazonClientException | IOException e) {
                this.shouldTryInitialTimeout.set(false);
                lastFailure = e;
                if (finalAttempt) {
                    LOG.info("Encountered exception while reading '{}', max retries exceeded.", this.in.getKey(), e);
                    // Rethrow unchanged so callers see the same exception type as the raw failure.
                    throw e;
                }
                LOG.info("Encountered exception while reading '{}', will retry by attempting to reopen stream.", this.in.getKey(), e);
                long retryInterval = getRetryInterval(attempt, e);
                LOG.debug("Back off {} ms for retrying open stream while reading due to s3 GET-After-PUT consistency issue OR another IO related exception such as SocketReset. For best practice please see https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html#ConsistencyModel", retryInterval);
                EmrFsUtils.sleep(retryInterval);
            }
        }

        // Only reached when the final attempt also ended prematurely without data.
        LOG.error("Unable to recover reading from stream");
        throw new IOException(generateUnexpectedEndOfStreamMsg(), lastFailure);
    }

    /**
     * Computes the backoff before the next read attempt by delegating to
     * {@code RetryUtils.calcRetryInterval} with this stream's retry settings.
     *
     * @param i zero-based index of the attempt that just failed
     * @param exc the failure that triggered the retry
     * @return the interval passed to the sleep call, in milliseconds
     */
    @VisibleForTesting
    protected long getRetryInterval(int i, Exception exc) {
        return RetryUtils.calcRetryInterval(this.defaultRetryPolicy, this.readRetryIntervalMS, i, this.fastFirstRetryDelayMS, exc);
    }

    /** Builds the diagnostic text used when a read ends without data. */
    private String generateUnexpectedEndOfStreamMsg() {
        StringBuilder message = new StringBuilder("Unexpected end of stream pos=").append(this.lastReadPos);
        if (this.in.isSelect()) {
            message.append(", byteScanned=").append(this.in.getSelectByteScanned());
        }
        message.append(", contentLength=").append(this.in.getContentLength());
        return message.toString();
    }

    /** Closes the currently opened underlying stream. */
    public void close() throws IOException {
        this.in.close();
    }

    /** Closes the current underlying stream and opens a new one at {@code nextReadPos}. */
    private synchronized void reopenStream() throws IOException {
        this.in.close();
        retrieveInputStreamWithInfo(this.nextReadPos);
        this.lastReadPos = this.nextReadPos;
    }

    /**
     * Opens a new underlying stream at {@code position} for the key of the current one, unless
     * that position is already at the known end of the stream.
     *
     * @throws ConsistencyException if opening is inconsistent and {@code throwOnInconsistency}
     *     is set
     * @throws FileNotFoundException if opening is inconsistent and {@code throwOnInconsistency}
     *     is not set, or if the object is missing
     */
    private void retrieveInputStreamWithInfo(long position) throws IOException {
        Preconditions.checkNotNull(this.in, "Requires last InputStreamWithInfo");
        if (atEndOfStreamIfKnown(position)) {
            return;
        }
        try {
            open(this.in.getKey(), position);
        } catch (ConsistencyException e) {
            if (this.throwOnInconsistency) {
                throw e;
            }
            LOG.warn(e.getMessage(), e);
            // Keep the original exception reachable for whoever handles the FileNotFoundException.
            FileNotFoundException notFound = new FileNotFoundException(e.getMessage());
            notFound.initCause(e);
            throw notFound;
        }
    }

    /** Asks the underlying stream whether {@code nextReadPos} is at its known end. */
    private boolean atEndOfStreamIfKnown() {
        return atEndOfStreamIfKnown(this.nextReadPos);
    }

    /** Asks the underlying stream whether {@code position} is at its known end. */
    private boolean atEndOfStreamIfKnown(long position) {
        return this.in.atEndOfStreamIfKnown(position);
    }

    /** Always throws an {@link EOFException} describing the rejected {@code position}. */
    private void throwPositionOutOfBoundsException(long position) throws EOFException {
        throw new EOFException(String.format("Invalid position: %d, exceeds the bounds of the stream: [0, %d]", position, this.in.getContentLength()));
    }

    /**
     * Sets the position of the next read. With lazy seek enabled only the position is recorded;
     * otherwise the underlying stream is repositioned immediately.
     *
     * @param j target position; must lie in {@code [0, contentLength]}
     * @throws EOFException if {@code j} is outside that range
     * @throws IOException if the eager reposition fails
     */
    public synchronized void seek(long j) throws IOException {
        if (j < 0 || j > this.in.getContentLength()) {
            throwPositionOutOfBoundsException(j);
        }
        this.nextReadPos = j;
        if (!this.lazySeek) {
            seekStream();
        }
    }

    /**
     * Reopens the underlying stream when its position differs from the requested one, or when
     * it reports that it was closed successfully.
     */
    synchronized void seekStream() throws IOException {
        boolean positionChanged = this.lastReadPos != this.nextReadPos;
        if (positionChanged || this.in.wasClosedSuccessfully()) {
            reopenStream();
        }
    }

    /** Returns the position the next read will start from. */
    public synchronized long getPos() throws IOException {
        return this.nextReadPos;
    }

    /**
     * Alternative sources are not supported.
     *
     * @return always {@code false}
     */
    public boolean seekToNewSource(long j) throws IOException {
        return false;
    }

    /**
     * Releases the underlying stream by closing it; a failure to close is logged and otherwise
     * ignored.
     */
    public void unbuffer() {
        try {
            this.in.close();
        } catch (IOException e) {
            LOG.warn("Exception while trying to unbuffer input stream: ", e);
        }
    }
}
