package com.amazon.ws.emr.hadoop.fs.s3;

import com.amazon.ws.emr.hadoop.fs.EmrFsStore;
import com.amazon.ws.emr.hadoop.fs.consistency.ItemKeys;
import com.amazon.ws.emr.hadoop.fs.consistency.exception.ConsistencyException;
import com.amazon.ws.emr.hadoop.fs.dynamodb.Entity;
import com.amazon.ws.emr.hadoop.fs.dynamodb.EntityStore;
import com.amazon.ws.emr.hadoop.fs.retry.BackoffStrategies;
import com.amazon.ws.emr.hadoop.fs.retry.BackoffStrategy;
import com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.AmazonClientException;
import com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.AmazonServiceException;
import com.amazon.ws.emr.hadoop.fs.shaded.com.google.common.annotations.VisibleForTesting;
import com.amazon.ws.emr.hadoop.fs.shaded.com.google.common.base.Preconditions;
import com.amazon.ws.emr.hadoop.fs.util.ConfigurationUtils;
import com.amazon.ws.emr.hadoop.fs.util.EmrFsUtils;
import com.amazon.ws.emr.hadoop.fs.util.S3UriUtils;
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import lombok.NonNull;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CanUnbuffer;
import org.apache.hadoop.fs.FileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/amazon/ws/emr/hadoop/fs/s3/S3FSInputStream.class */
public class S3FSInputStream extends AbstractS3FSInputStream implements CanUnbuffer {
    private static final Logger logger = LoggerFactory.getLogger(S3FSInputStream.class);

    @VisibleForTesting
    public static final int NUM_READ_RETRIES = 5;
    private final String bucketName;
    private final String key;
    private final ContentLengthSupplier contentLengthSupplier;
    private final InputStreamWithInfoFactory inputStreamWithInfoFactory;

    @Nullable
    private final FileSystem.Statistics statistics;

    @Nullable
    private final EntityStore entityStore;
    private final boolean throwOnInconsistency;
    private final boolean lazySeek;
    private final BackoffStrategy backoffStrategy;
    private final AtomicBoolean shouldTryInitialTimeout;
    private InputStreamWithInfo in;
    private boolean wasLazilyOpened;
    private long lastReadPos;
    private long nextReadPos;

    @Nullable
    private Long maxLength;
    private volatile long contentLength;

    public S3FSInputStream(@NonNull String str, @NonNull String str2, @NonNull ContentLengthSupplier contentLengthSupplier, @NonNull InputStreamWithInfoFactory inputStreamWithInfoFactory, @NonNull Configuration configuration, @Nullable FileSystem.Statistics statistics, @Nullable EntityStore entityStore, boolean z, boolean z2, long j) throws IOException {
        this(str, str2, contentLengthSupplier, inputStreamWithInfoFactory, statistics, entityStore, 0L, j, null, z, z2, ConfigurationUtils.isLazySeekEnabled(configuration), ConfigurationUtils.isPositionedReadOptimizationEnabled(configuration), ConfigurationUtils.isReadFullyIntoBuffersOptimizationEnabled(configuration), BackoffStrategies.get(configuration), new AtomicBoolean(true));
        if (str == null) {
            throw new NullPointerException("bucketName is marked non-null but is null");
        }
        if (str2 == null) {
            throw new NullPointerException("key is marked non-null but is null");
        }
        if (contentLengthSupplier == null) {
            throw new NullPointerException("contentLengthSupplier is marked non-null but is null");
        }
        if (inputStreamWithInfoFactory == null) {
            throw new NullPointerException("inputStreamWithInfoFactory is marked non-null but is null");
        }
        if (configuration == null) {
            throw new NullPointerException("conf is marked non-null but is null");
        }
    }

    private S3FSInputStream(String str, String str2, ContentLengthSupplier contentLengthSupplier, InputStreamWithInfoFactory inputStreamWithInfoFactory, FileSystem.Statistics statistics, EntityStore entityStore, long j, long j2, @Nullable Long l, boolean z, boolean z2, boolean z3, boolean z4, boolean z5, BackoffStrategy backoffStrategy, AtomicBoolean atomicBoolean) throws IOException {
        super(str, str2, inputStreamWithInfoFactory.supportsMaxLength() && z4, inputStreamWithInfoFactory.supportsMaxLength() && z5 && z3);
        this.lastReadPos = 0L;
        this.nextReadPos = 0L;
        Preconditions.checkArgument((entityStore == null && z) ? false : true, "Entity store is required when setting throwOnInconsistency to true");
        Preconditions.checkArgument(l == null || inputStreamWithInfoFactory.supportsMaxLength(), "Max length (%d) is given, but the given input stream factory does not support it", l);
        this.bucketName = str;
        this.key = str2;
        this.contentLengthSupplier = contentLengthSupplier;
        this.inputStreamWithInfoFactory = inputStreamWithInfoFactory;
        this.statistics = statistics;
        this.entityStore = entityStore;
        this.contentLength = j2;
        this.maxLength = l;
        this.throwOnInconsistency = z;
        this.lazySeek = z3;
        this.backoffStrategy = backoffStrategy;
        this.shouldTryInitialTimeout = atomicBoolean;
        open(j, z2);
    }

    private void open(long j, boolean z) throws IOException {
        Preconditions.checkArgument(j >= 0, "Cannot seek to a negative position");
        try {
            long j2 = this.in == null || this.wasLazilyOpened ? this.contentLength : this.contentLengthSupplier.get(this.bucketName, this.key);
            logger.debug("Stream for key '{}' seeking to position '{}'", this.key, Long.valueOf(j));
            if (z) {
                this.in = this.inputStreamWithInfoFactory.createClosedStream(this.bucketName, this.key, j2);
            } else {
                this.in = this.inputStreamWithInfoFactory.create(this.bucketName, this.key, j, j2, this.maxLength, this.shouldTryInitialTimeout.get());
            }
            this.wasLazilyOpened = z;
            this.contentLength = this.in.getContentLength();
            this.lastReadPos = j;
            this.nextReadPos = j;
        } catch (AmazonServiceException e) {
            if (e.getStatusCode() != 404) {
                throw new IOException(e);
            }
            if (this.entityStore == null) {
                throw new FileNotFoundException(String.format("File '%s/%s' does not exist in S3", this.bucketName, this.key));
            }
            Entity retrieve = this.entityStore.retrieve(ItemKeys.toItemKey(this.bucketName, this.key));
            if (retrieve != null && EmrFsStore.MetadataFile.parseFrom(retrieve.getPayload()).getState() != EmrFsStore.MetadataFile.State.DELETED) {
                throw new ConsistencyException(String.format("Unable to get object '%s/%s' from s3", this.bucketName, this.key), e, Collections.singletonList(S3UriUtils.getPathForS3Object(this.bucketName, this.key)));
            }
            throw new FileNotFoundException(String.format("File '%s/%s' has been deleted in both metadata and s3", this.bucketName, this.key));
        }
    }

    public synchronized int read() throws IOException {
        throw new UnsupportedOperationException("Single byte read() not implemented");
    }

    public synchronized int read(@NonNull byte[] bArr, int i, int i2) throws IOException {
        if (bArr == null) {
            throw new NullPointerException("bytes is marked non-null but is null");
        }
        if (i < 0 || i2 < 0 || i2 > bArr.length - i) {
            throw new IndexOutOfBoundsException();
        }
        if (i2 == 0) {
            return 0;
        }
        if (atEndOfStreamIfKnown()) {
            return -1;
        }
        int i3 = -1;
        Exception exc = null;
        for (int i4 = 0; i4 < 5; i4++) {
            if (i4 <= 0) {
                try {
                    try {
                        if (this.lazySeek) {
                            seekStream();
                        } else {
                            ensureStreamNotClosed();
                        }
                    } catch (FileNotFoundException e) {
                        logger.info("Encountered an exception while reading '{}', file not present", this.in.getKey(), e);
                        throw new FileNotFoundException("File not present on S3");
                    }
                } catch (AmazonClientException | IOException e2) {
                    this.shouldTryInitialTimeout.set(false);
                    exc = e2;
                    if (i4 >= 4) {
                        logger.info("Encountered exception while reading '{}', max retries exceeded.", this.in.getKey(), e2);
                    } else {
                        logger.info("Encountered exception while reading '{}', will retry by attempting to reopen stream.", this.in.getKey(), e2);
                        long retryInterval = getRetryInterval(e2, i4);
                        logger.debug("Back off {} ms for retrying open stream while reading due to s3 GET-After-PUT consistency issue OR another IO related exception such as SocketReset. For best practice please see https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html#ConsistencyModel", Long.valueOf(retryInterval));
                        EmrFsUtils.sleep(retryInterval);
                    }
                }
            } else {
                reopenStream();
            }
            i3 = this.in.read(bArr, i, i2);
            if (i3 > 0) {
                advance(i3);
                break;
            }
            if (this.in.shouldBreakReadRetry(this.nextReadPos)) {
                break;
            }
            logger.warn(generateUnexpectedEndOfStreamMsg());
            if (i4 >= 4) {
                logger.error("Unable to recover reading from stream");
                throw new IOException(generateUnexpectedEndOfStreamMsg(), exc);
            }
        }
        return i3;
    }

    private void advance(int i) {
        if (this.maxLength != null) {
            Preconditions.checkArgument(((long) i) <= this.maxLength.longValue(), "Cannot advance beyond maxLength");
            this.maxLength = Long.valueOf(this.maxLength.longValue() - i);
        }
        this.lastReadPos += i;
        this.nextReadPos += i;
        if (this.statistics != null) {
            this.statistics.incrementBytesRead(i);
        }
    }

    @VisibleForTesting
    long getRetryInterval(Exception exc, int i) {
        return this.backoffStrategy.getBackoffMillis(exc, i);
    }

    @Override // com.amazon.ws.emr.hadoop.fs.s3.AbstractS3FSInputStream
    protected InputStream forkStream(long j, long j2) throws IOException {
        long j3 = this.contentLength;
        return (j2 == 0 || j >= j3) ? new ByteArrayInputStream(new byte[0]) : new S3FSInputStream(this.bucketName, this.key, this.contentLengthSupplier, this.inputStreamWithInfoFactory, this.statistics, this.entityStore, j, j3, Long.valueOf(j2), this.throwOnInconsistency, true, this.lazySeek, false, false, this.backoffStrategy, this.shouldTryInitialTimeout);
    }

    private String generateUnexpectedEndOfStreamMsg() {
        StringBuilder sb = new StringBuilder("Unexpected end of stream pos=" + this.lastReadPos);
        if (this.in.isSelect()) {
            sb.append(", byteScanned=" + this.in.getSelectByteScanned());
        }
        sb.append(", contentLength=" + this.in.getContentLength());
        return sb.toString();
    }

    private void ensureStreamNotClosed() throws IOException {
        if (this.in.wasClosedSuccessfully()) {
            reopenStream();
        }
    }

    public void close() throws IOException {
        this.in.close();
    }

    private synchronized void reopenStream() throws IOException {
        this.in.close();
        retrieveInputStreamWithInfo(this.nextReadPos);
        this.lastReadPos = this.nextReadPos;
    }

    private void retrieveInputStreamWithInfo(long j) throws IOException {
        Preconditions.checkNotNull(this.in, "Requires last InputStreamWithInfo");
        if (j > this.in.getContentLength()) {
            throwPositionOutOfBoundsException(j);
        }
        if (atEndOfStreamIfKnown(j)) {
            return;
        }
        try {
            open(j, false);
        } catch (ConsistencyException e) {
            if (this.throwOnInconsistency) {
                throw e;
            }
            logger.warn(e.getMessage(), e);
            throw new FileNotFoundException(e.getMessage());
        }
    }

    private boolean atEndOfStreamIfKnown() {
        return atEndOfStreamIfKnown(this.nextReadPos);
    }

    private boolean atEndOfStreamIfKnown(long j) {
        return this.in.atEndOfStreamIfKnown(j);
    }

    private void throwPositionOutOfBoundsException(long j) throws EOFException {
        throw new EOFException(String.format("Invalid position: %d, exceeds the bounds of the stream: [0, %d]", Long.valueOf(j), Long.valueOf(this.in.getContentLength())));
    }

    public synchronized void seek(long j) throws IOException {
        if (this.maxLength != null) {
            throw new UnsupportedOperationException("Seeking is not supported when maxLength is specified");
        }
        if (j < 0 || j > this.in.getContentLength()) {
            throwPositionOutOfBoundsException(j);
        }
        this.nextReadPos = j;
        if (this.lazySeek) {
            return;
        }
        seekStream();
    }

    private synchronized void seekStream() throws IOException {
        if (this.lastReadPos != this.nextReadPos || this.in.wasClosedSuccessfully()) {
            reopenStream();
        }
    }

    public synchronized long getPos() throws IOException {
        return this.nextReadPos;
    }

    public boolean seekToNewSource(long j) throws IOException {
        return false;
    }

    public void unbuffer() {
        try {
            this.in.close();
        } catch (IOException e) {
            logger.warn("Exception while trying to unbuffer input stream: ", e);
        }
    }
}
