package com.amazonaws.services.kinesis.clientlibrary.lib.worker;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.class */
class ShardConsumer {
    private static final Log LOG = LogFactory.getLog(ShardConsumer.class);
    private final StreamConfig streamConfig;
    private final IRecordProcessor recordProcessor;
    private final RecordProcessorCheckpointer recordProcessorCheckpointer;
    private final ExecutorService executorService;
    private final ShardInfo shardInfo;
    private final KinesisDataFetcher dataFetcher;
    private final IMetricsFactory metricsFactory;
    private final ILeaseManager<KinesisClientLease> leaseManager;
    private ICheckpoint checkpoint;
    private final long parentShardPollIntervalMillis;
    private final boolean cleanupLeasesOfCompletedShards;
    private final long taskBackoffTimeMillis;
    private ITask currentTask;
    private long currentTaskSubmitTime;
    private Future<TaskResult> future;
    private ShardConsumerState currentState = ShardConsumerState.WAITING_ON_PARENT_SHARDS;
    private boolean beginShutdown;
    private ShutdownReason shutdownReason;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer$ShardConsumerState.class */
    public enum ShardConsumerState {
        WAITING_ON_PARENT_SHARDS,
        INITIALIZING,
        PROCESSING,
        SHUTTING_DOWN,
        SHUTDOWN_COMPLETE
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ShardConsumer(ShardInfo shardInfo, StreamConfig streamConfig, ICheckpoint iCheckpoint, IRecordProcessor iRecordProcessor, ILeaseManager<KinesisClientLease> iLeaseManager, long j, boolean z, ExecutorService executorService, IMetricsFactory iMetricsFactory, long j2) {
        this.streamConfig = streamConfig;
        this.recordProcessor = iRecordProcessor;
        this.executorService = executorService;
        this.shardInfo = shardInfo;
        this.checkpoint = iCheckpoint;
        this.recordProcessorCheckpointer = new RecordProcessorCheckpointer(shardInfo, iCheckpoint, new SequenceNumberValidator(streamConfig.getStreamProxy(), shardInfo.getShardId(), streamConfig.shouldValidateSequenceNumberBeforeCheckpointing()), new CheckpointValueComparator());
        this.dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo);
        this.leaseManager = iLeaseManager;
        this.metricsFactory = iMetricsFactory;
        this.parentShardPollIntervalMillis = j;
        this.cleanupLeasesOfCompletedShards = z;
        this.taskBackoffTimeMillis = j2;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized boolean consumeShard() {
        return checkAndSubmitNextTask();
    }

    private synchronized boolean checkAndSubmitNextTask() {
        boolean z = false;
        boolean z2 = false;
        if (this.future == null || this.future.isCancelled() || this.future.isDone()) {
            if (this.future != null && this.future.isDone()) {
                try {
                    TaskResult taskResult = this.future.get();
                    if (taskResult.getException() == null) {
                        z = true;
                        if (taskResult.isShardEndReached()) {
                            markForShutdown(ShutdownReason.TERMINATE);
                        }
                    } else if (LOG.isDebugEnabled()) {
                        if (taskResult.getException() instanceof BlockedOnParentShardException) {
                            LOG.debug("Shard " + this.shardInfo.getShardId() + " is blocked on completion of parent shard.");
                        } else {
                            LOG.debug("Caught exception running " + this.currentTask.getTaskType() + " task: ", taskResult.getException());
                        }
                    }
                } catch (InterruptedException e) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug(this.currentTask.getTaskType() + " task was interrupted: ", e);
                    }
                } catch (ExecutionException e2) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug(this.currentTask.getTaskType() + " task encountered execution exception: ", e2);
                    }
                }
            }
            updateState(z);
            ITask nextTask = getNextTask();
            if (nextTask != null) {
                this.currentTask = nextTask;
                this.future = this.executorService.submit(this.currentTask);
                this.currentTaskSubmitTime = System.currentTimeMillis();
                z2 = true;
                LOG.debug("Submitted new " + this.currentTask.getTaskType() + " task for shard " + this.shardInfo.getShardId());
            } else if (LOG.isDebugEnabled()) {
                LOG.debug(String.format("No new task to submit for shard %s, currentState %s", this.shardInfo.getShardId(), this.currentState.toString()));
            }
        } else if (LOG.isDebugEnabled()) {
            LOG.debug("Previous " + this.currentTask.getTaskType() + " task still pending for shard " + this.shardInfo.getShardId() + " since " + (System.currentTimeMillis() - this.currentTaskSubmitTime) + " ms ago.  Not submitting new task.");
        }
        return z2;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized boolean beginShutdown() {
        if (this.currentState != ShardConsumerState.SHUTDOWN_COMPLETE) {
            markForShutdown(ShutdownReason.ZOMBIE);
            checkAndSubmitNextTask();
        }
        return isShutdown();
    }

    synchronized void markForShutdown(ShutdownReason shutdownReason) {
        this.beginShutdown = true;
        if (this.shutdownReason == null || this.shutdownReason == ShutdownReason.TERMINATE) {
            this.shutdownReason = shutdownReason;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isShutdown() {
        return this.currentState == ShardConsumerState.SHUTDOWN_COMPLETE;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ShutdownReason getShutdownReason() {
        return this.shutdownReason;
    }

    private ITask getNextTask() {
        ITask iTask = null;
        switch (this.currentState) {
            case WAITING_ON_PARENT_SHARDS:
                iTask = new BlockOnParentShardTask(this.shardInfo, this.leaseManager, this.parentShardPollIntervalMillis);
                break;
            case INITIALIZING:
                iTask = new InitializeTask(this.shardInfo, this.recordProcessor, this.checkpoint, this.recordProcessorCheckpointer, this.dataFetcher, this.taskBackoffTimeMillis);
                break;
            case PROCESSING:
                iTask = new ProcessTask(this.shardInfo, this.streamConfig, this.recordProcessor, this.recordProcessorCheckpointer, this.dataFetcher, this.taskBackoffTimeMillis);
                break;
            case SHUTTING_DOWN:
                iTask = new ShutdownTask(this.shardInfo, this.recordProcessor, this.recordProcessorCheckpointer, this.shutdownReason, this.streamConfig.getStreamProxy(), this.streamConfig.getInitialPositionInStream(), this.cleanupLeasesOfCompletedShards, this.leaseManager, this.taskBackoffTimeMillis);
                break;
        }
        if (iTask == null) {
            return null;
        }
        return new MetricsCollectingTaskDecorator(iTask, this.metricsFactory);
    }

    void updateState(boolean z) {
        switch (this.currentState) {
            case WAITING_ON_PARENT_SHARDS:
                if (z && TaskType.BLOCK_ON_PARENT_SHARDS.equals(this.currentTask.getTaskType())) {
                    if (this.beginShutdown) {
                        this.currentState = ShardConsumerState.SHUTTING_DOWN;
                        return;
                    } else {
                        this.currentState = ShardConsumerState.INITIALIZING;
                        return;
                    }
                }
                if (this.currentTask == null && this.beginShutdown) {
                    this.currentState = ShardConsumerState.SHUTDOWN_COMPLETE;
                    return;
                }
                return;
            case INITIALIZING:
                if (z && TaskType.INITIALIZE.equals(this.currentTask.getTaskType())) {
                    if (this.beginShutdown) {
                        this.currentState = ShardConsumerState.SHUTTING_DOWN;
                        return;
                    } else {
                        this.currentState = ShardConsumerState.PROCESSING;
                        return;
                    }
                }
                if (this.currentTask == null && this.beginShutdown) {
                    this.currentState = ShardConsumerState.SHUTDOWN_COMPLETE;
                    return;
                }
                return;
            case PROCESSING:
                if (z && TaskType.PROCESS.equals(this.currentTask.getTaskType())) {
                    if (this.beginShutdown) {
                        this.currentState = ShardConsumerState.SHUTTING_DOWN;
                        return;
                    } else {
                        this.currentState = ShardConsumerState.PROCESSING;
                        return;
                    }
                }
                return;
            case SHUTTING_DOWN:
                if (this.currentTask == null || (z && TaskType.SHUTDOWN.equals(this.currentTask.getTaskType()))) {
                    this.currentState = ShardConsumerState.SHUTDOWN_COMPLETE;
                    return;
                }
                return;
            case SHUTDOWN_COMPLETE:
                return;
            default:
                LOG.error("Unexpected state: " + this.currentState);
                return;
        }
    }

    ShardConsumerState getCurrentState() {
        return this.currentState;
    }
}
