/*
 * Decompiled with CFR 0.152.
 */
package com.amazonaws.services.kinesis.multilang;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.multilang.MessageReader;
import com.amazonaws.services.kinesis.multilang.MessageWriter;
import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage;
import com.amazonaws.services.kinesis.multilang.messages.Message;
import com.amazonaws.services.kinesis.multilang.messages.StatusMessage;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Drives the message exchange with a child process for a single shard.
 *
 * <p>Each public operation writes one request message through the {@link MessageWriter}
 * and then reads messages from the {@link MessageReader} until a {@link StatusMessage}
 * arrives. Any {@link CheckpointMessage} received while waiting is serviced through the
 * supplied checkpointer and answered via the writer before reading continues.
 */
class MultiLangProtocol {
    private static final Log LOG = LogFactory.getLog(MultiLangProtocol.class);
    private final MessageReader messageReader;
    private final MessageWriter messageWriter;
    private final String shardId;

    /**
     * @param messageReader source of messages coming from the child process
     * @param messageWriter sink for messages sent to the child process
     * @param shardId identifier of the shard, used in requests and in log output
     */
    MultiLangProtocol(MessageReader messageReader, MessageWriter messageWriter, String shardId) {
        this.messageReader = messageReader;
        this.messageWriter = messageWriter;
        this.shardId = shardId;
    }

    /**
     * Writes the initialize message for this shard and waits for the matching status.
     * No checkpointer is available during this call, so a checkpoint request received
     * here is answered with an error.
     *
     * @return {@code true} if a status for "initialize" was received and the write
     *         future yielded {@code true}; {@code false} otherwise
     */
    boolean initialize() {
        Future<Boolean> writeFuture = this.messageWriter.writeInitializeMessage(this.shardId);
        return this.waitForStatusMessage("initialize", null, writeFuture);
    }

    /**
     * Writes a process-records message and waits for the matching status.
     *
     * @param records records to hand to the child process
     * @param checkpointer used to service checkpoint requests received while waiting
     * @return {@code true} if a status for "processRecords" was received and the write
     *         future yielded {@code true}; {@code false} otherwise
     */
    boolean processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
        Future<Boolean> writeFuture = this.messageWriter.writeProcessRecordsMessage(records);
        return this.waitForStatusMessage("processRecords", checkpointer, writeFuture);
    }

    /**
     * Writes a shutdown message and waits for the matching status.
     *
     * @param checkpointer used to service checkpoint requests received while waiting
     * @param reason shutdown reason forwarded to the child process
     * @return {@code true} if a status for "shutdown" was received and the write
     *         future yielded {@code true}; {@code false} otherwise
     */
    boolean shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
        Future<Boolean> writeFuture = this.messageWriter.writeShutdownMessage(reason);
        return this.waitForStatusMessage("shutdown", checkpointer, writeFuture);
    }

    /**
     * Waits for the status message for {@code action}, then collects the result of the
     * request write.
     *
     * @param action name the status message must report in its "response for" field
     * @param checkpointer checkpointer for servicing checkpoint requests; may be {@code null}
     * @param writeFuture pending result of writing the request message
     * @return {@code true} only if the status matched and the write future yielded
     *         {@code true}; {@code false} on mismatch, write failure, or interruption
     */
    private boolean waitForStatusMessage(String action, IRecordProcessorCheckpointer checkpointer, Future<Boolean> writeFuture) {
        // The status is awaited first; the write result is only inspected afterwards.
        boolean statusWasCorrect = this.waitForStatusMessage(action, checkpointer);
        try {
            boolean writerIsStillOpen = writeFuture.get();
            return statusWasCorrect && writerIsStillOpen;
        }
        catch (InterruptedException e) {
            LOG.error(String.format("Interrupted while writing %s message for shard %s", action, this.shardId));
            // Restore the interrupt flag so callers and the owning executor can observe it.
            Thread.currentThread().interrupt();
            return false;
        }
        catch (ExecutionException e) {
            LOG.error(String.format("Failed to write %s message for shard %s", action, this.shardId), e);
            return false;
        }
    }

    /**
     * Reads messages until a {@link StatusMessage} arrives, servicing any
     * {@link CheckpointMessage} seen on the way. Messages of any other kind are skipped.
     *
     * @param action name the status message must report in its "response for" field
     * @param checkpointer checkpointer for servicing checkpoint requests; may be {@code null}
     * @return {@code true} if the received status is a response for {@code action};
     *         {@code false} if it is not, if a checkpoint response write yielded
     *         {@code false}, or if reading failed or was interrupted
     */
    private boolean waitForStatusMessage(String action, IRecordProcessorCheckpointer checkpointer) {
        StatusMessage statusMessage = null;
        while (statusMessage == null) {
            Future<Message> future = this.messageReader.getNextMessageFromSTDOUT();
            try {
                Message message = future.get();
                if (message instanceof CheckpointMessage) {
                    boolean checkpointWriteSucceeded = this.checkpoint((CheckpointMessage)message, checkpointer).get();
                    if (!checkpointWriteSucceeded) {
                        return false;
                    }
                } else if (message instanceof StatusMessage) {
                    statusMessage = (StatusMessage)message;
                }
                // Anything else (including null) is ignored and the next message is read.
            }
            catch (InterruptedException e) {
                LOG.error(String.format("Interrupted while waiting for %s message for shard %s", action, this.shardId));
                // Restore the interrupt flag so callers and the owning executor can observe it.
                Thread.currentThread().interrupt();
                return false;
            }
            catch (ExecutionException e) {
                LOG.error(String.format("Failed to get status message for %s action for shard %s", action, this.shardId), e);
                return false;
            }
        }
        return this.validateStatusMessage(statusMessage, action);
    }

    /**
     * Logs the received status and checks that it answers the expected action.
     *
     * @param statusMessage status received from the child process; may be {@code null}
     * @param action expected value of the status message's "response for" field
     * @return {@code true} if the status is non-null and its "response for" equals {@code action}
     */
    private boolean validateStatusMessage(StatusMessage statusMessage, String action) {
        LOG.info("Received response " + statusMessage + " from subprocess while waiting for " + action + " while processing shard " + this.shardId);
        return statusMessage != null && statusMessage.getResponseFor() != null && statusMessage.getResponseFor().equals(action);
    }

    /**
     * Performs the checkpoint requested by the child process and writes the outcome back.
     *
     * <p>A {@code null} sequence number in the request triggers the no-argument
     * checkpoint; otherwise the checkpoint is taken at that sequence number. Any
     * {@link Throwable} raised while checkpointing is passed to the writer so it can be
     * reported in the response rather than propagated to the caller.
     *
     * @param checkpointMessage checkpoint request received from the child process
     * @param checkpointer checkpointer to use; if {@code null} an error response is written
     * @return future for the write of the checkpoint response message
     */
    private Future<Boolean> checkpoint(CheckpointMessage checkpointMessage, IRecordProcessorCheckpointer checkpointer) {
        String sequenceNumber = checkpointMessage.getCheckpoint();
        try {
            if (checkpointer != null) {
                if (sequenceNumber == null) {
                    LOG.info(String.format("Attempting to checkpoint for shard %s", this.shardId));
                    checkpointer.checkpoint();
                } else {
                    LOG.info(String.format("Attempting to checkpoint at sequence number %s for shard %s", sequenceNumber, this.shardId));
                    checkpointer.checkpoint(sequenceNumber);
                }
                // A null error marks the response as a successful checkpoint.
                return this.messageWriter.writeCheckpointMessageWithError(sequenceNumber, null);
            }
            String message = String.format("Was asked to checkpoint at %s but no checkpointer was provided for shard %s", sequenceNumber, this.shardId);
            LOG.error(message);
            return this.messageWriter.writeCheckpointMessageWithError(sequenceNumber, new InvalidStateException(message));
        }
        catch (Throwable t) {
            return this.messageWriter.writeCheckpointMessageWithError(sequenceNumber, t);
        }
    }
}

