/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.deployment;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.deployment.CachedShuffleDescriptors;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertexInputInfo;
import org.apache.flink.runtime.executiongraph.IndexRange;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.executiongraph.InternalExecutionGraphAccessor;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.scheduler.ClusterDatasetCorruptedException;
import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.UnknownShuffleDescriptor;
import org.apache.flink.types.Either;
import org.apache.flink.util.CompressedSerializedValue;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.function.FunctionWithException;

public class TaskDeploymentDescriptorFactory {
    /** Attempt ID of the execution the deployment descriptor is built for. */
    private final ExecutionAttemptID executionId;

    /** Job information, either serialized inline or referenced through a blob key. */
    private final TaskDeploymentDescriptor.MaybeOffloaded<JobInformation> serializedJobInformation;

    /** Task information, either serialized inline or referenced through a blob key. */
    private final TaskDeploymentDescriptor.MaybeOffloaded<TaskInformation> taskInfo;

    /** ID of the job; also passed to the blob writer when shuffle descriptors are offloaded. */
    private final JobID jobID;

    /** Decides whether an unknown shuffle descriptor may stand in for an unavailable partition. */
    private final PartitionLocationConstraint partitionDeploymentConstraint;

    /** Consumed partition groups; one input gate is created per group. */
    private final List<ConsumedPartitionGroup> consumedPartitionGroups;

    /** Looks up an {@link IntermediateResultPartition} by its ID. */
    private final Function<IntermediateResultPartitionID, IntermediateResultPartition> resultPartitionRetriever;

    /** Blob writer handed to {@code BlobWriter.tryOffload} for serialized shuffle descriptors. */
    private final BlobWriter blobWriter;

    /** Shuffle descriptors of consumed cluster data sets; one extra input gate is created per entry. */
    private final Map<IntermediateDataSetID, ShuffleDescriptorAndIndex[]> consumedClusterPartitionShuffleDescriptors;

    /** Supplies the {@link ExecutionVertexInputInfo} (and thereby the subpartition range) per data set. */
    private final Function<IntermediateDataSetID, ExecutionVertexInputInfo> executionVertexInputInfoRetriever;

    /** When set, hybrid partitions whose producer has not finished are reported as unknown. */
    private final boolean nonFinishedHybridPartitionShouldBeUnknown;

    /**
     * Stores the collaborators needed to build a {@link TaskDeploymentDescriptor}. Instances are
     * obtained through {@link #fromExecution(Execution)}.
     *
     * <p>Only {@code executionVertexInputInfoRetriever} is null-checked; all other arguments are
     * stored as given.
     *
     * @throws NullPointerException if {@code executionVertexInputInfoRetriever} is {@code null}
     */
    private TaskDeploymentDescriptorFactory(
            ExecutionAttemptID executionId,
            TaskDeploymentDescriptor.MaybeOffloaded<JobInformation> serializedJobInformation,
            TaskDeploymentDescriptor.MaybeOffloaded<TaskInformation> taskInfo,
            JobID jobID,
            PartitionLocationConstraint partitionDeploymentConstraint,
            List<ConsumedPartitionGroup> consumedPartitionGroups,
            Function<IntermediateResultPartitionID, IntermediateResultPartition> resultPartitionRetriever,
            BlobWriter blobWriter,
            boolean nonFinishedHybridPartitionShouldBeUnknown,
            Map<IntermediateDataSetID, ShuffleDescriptorAndIndex[]> consumedClusterPartitionShuffleDescriptors,
            Function<IntermediateDataSetID, ExecutionVertexInputInfo> executionVertexInputInfoRetriever) {
        this.executionId = executionId;
        this.serializedJobInformation = serializedJobInformation;
        this.taskInfo = taskInfo;
        this.jobID = jobID;
        this.partitionDeploymentConstraint = partitionDeploymentConstraint;
        this.consumedPartitionGroups = consumedPartitionGroups;
        this.resultPartitionRetriever = resultPartitionRetriever;
        this.blobWriter = blobWriter;
        this.nonFinishedHybridPartitionShouldBeUnknown = nonFinishedHybridPartitionShouldBeUnknown;
        this.consumedClusterPartitionShuffleDescriptors = consumedClusterPartitionShuffleDescriptors;
        // checkNotNull is generic and returns its argument, so no (raw) cast is required.
        this.executionVertexInputInfoRetriever = Preconditions.checkNotNull(executionVertexInputInfoRetriever);
    }

    /**
     * Assembles the {@link TaskDeploymentDescriptor} for the execution this factory was created for.
     *
     * @param allocationID allocation ID forwarded to the descriptor
     * @param taskRestore restore information forwarded to the descriptor; may be {@code null}
     * @param producedPartitions produced partition descriptors; copied into a new list
     * @return the assembled deployment descriptor
     * @throws IOException if creating the input gate descriptors fails
     */
    public TaskDeploymentDescriptor createDeploymentDescriptor(
            AllocationID allocationID,
            @Nullable JobManagerTaskRestore taskRestore,
            Collection<ResultPartitionDeploymentDescriptor> producedPartitions)
            throws IOException {
        // The copy is taken before the input gates are built, so the descriptor owns its own list.
        final List<ResultPartitionDeploymentDescriptor> producedPartitionsCopy =
                new ArrayList<>(producedPartitions);
        final List<InputGateDeploymentDescriptor> inputGates = createInputGateDeploymentDescriptors();

        return new TaskDeploymentDescriptor(
                jobID,
                serializedJobInformation,
                taskInfo,
                executionId,
                allocationID,
                taskRestore,
                producedPartitionsCopy,
                inputGates);
    }

    /**
     * Creates the input gate descriptors: first one gate per consumed partition group, then one
     * gate per consumed cluster data set.
     *
     * @return the input gate descriptors in the order described above
     * @throws IOException if the shuffle descriptors of a group cannot be serialized or offloaded
     */
    private List<InputGateDeploymentDescriptor> createInputGateDeploymentDescriptors()
            throws IOException {
        final List<InputGateDeploymentDescriptor> inputGates =
                new ArrayList<>(consumedPartitionGroups.size());

        for (ConsumedPartitionGroup consumedPartitionGroup : consumedPartitionGroups) {
            // The first partition of the group is used to reach the intermediate result.
            final IntermediateResult consumedIntermediateResult =
                    resultPartitionRetriever
                            .apply(consumedPartitionGroup.getFirst())
                            .getIntermediateResult();
            final IntermediateDataSetID resultId = consumedIntermediateResult.getId();
            final ResultPartitionType partitionType = consumedIntermediateResult.getResultType();
            final IndexRange subpartitionRange =
                    executionVertexInputInfoRetriever.apply(resultId).getSubpartitionIndexRange();

            inputGates.add(
                    new InputGateDeploymentDescriptor(
                            resultId,
                            partitionType,
                            subpartitionRange,
                            consumedPartitionGroup.size(),
                            getConsumedPartitionShuffleDescriptors(
                                    consumedIntermediateResult, consumedPartitionGroup)));
        }

        // Typed entries make the key/value casts of a raw Map.Entry unnecessary.
        for (Map.Entry<IntermediateDataSetID, ShuffleDescriptorAndIndex[]> entry :
                consumedClusterPartitionShuffleDescriptors.entrySet()) {
            // NOTE(review): the literal 0 is presumably the consumed subpartition index; confirm
            // against the InputGateDeploymentDescriptor constructor.
            inputGates.add(
                    new InputGateDeploymentDescriptor(
                            entry.getKey(),
                            ResultPartitionType.BLOCKING_PERSISTENT,
                            0,
                            entry.getValue()));
        }
        return inputGates;
    }

    /**
     * Returns the serialized shuffle descriptors of the given consumed partition group.
     *
     * <p>If the intermediate result holds no cached descriptors for the group, they are computed
     * and cached first. The cache is then asked to serialize its descriptors through {@link
     * #serializeAndTryOffloadShuffleDescriptor(ShuffleDescriptorAndIndex[])}.
     *
     * @param intermediateResult intermediate result that owns the descriptor cache
     * @param consumedPartitionGroup group whose descriptors are requested
     * @return all serialized shuffle descriptors held by the cache entry
     * @throws IOException if serializing or offloading the descriptors fails
     */
    private List<TaskDeploymentDescriptor.MaybeOffloaded<ShuffleDescriptorAndIndex[]>> getConsumedPartitionShuffleDescriptors(
            IntermediateResult intermediateResult, ConsumedPartitionGroup consumedPartitionGroup)
            throws IOException {
        CachedShuffleDescriptors cachedShuffleDescriptors =
                intermediateResult.getCachedShuffleDescriptors(consumedPartitionGroup);
        if (cachedShuffleDescriptors == null) {
            // Cache miss: compute every descriptor of the group and register it with the result.
            cachedShuffleDescriptors =
                    intermediateResult.cacheShuffleDescriptors(
                            consumedPartitionGroup,
                            computeConsumedPartitionShuffleDescriptors(consumedPartitionGroup));
        }
        // The method reference is target-typed by the parameter, so no (raw) cast is needed.
        cachedShuffleDescriptors.serializeShuffleDescriptors(
                this::serializeAndTryOffloadShuffleDescriptor);
        return cachedShuffleDescriptors.getAllSerializedShuffleDescriptors();
    }

    /**
     * Computes the shuffle descriptor of every partition in the group and pairs it with the
     * partition's position in the group's iteration order.
     *
     * @param consumedPartitionGroup group of consumed partitions
     * @return an array with one entry per partition of the group
     */
    private ShuffleDescriptorAndIndex[] computeConsumedPartitionShuffleDescriptors(
            ConsumedPartitionGroup consumedPartitionGroup) {
        final ShuffleDescriptorAndIndex[] result =
                new ShuffleDescriptorAndIndex[consumedPartitionGroup.size()];

        int position = 0;
        for (IntermediateResultPartitionID partitionId : consumedPartitionGroup) {
            final IntermediateResultPartition partition = resultPartitionRetriever.apply(partitionId);
            final ShuffleDescriptor descriptor =
                    getConsumedPartitionShuffleDescriptor(
                            partition,
                            partitionDeploymentConstraint,
                            nonFinishedHybridPartitionShouldBeUnknown);
            result[position] = new ShuffleDescriptorAndIndex(descriptor, position);
            position++;
        }
        return result;
    }

    /**
     * Serializes the shuffle descriptors into a {@link CompressedSerializedValue} and hands the
     * result to {@code BlobWriter.tryOffload}.
     *
     * @param shuffleDescriptors descriptors to serialize
     * @return a {@code NonOffloaded} wrapper around the serialized value if {@code tryOffload}
     *     returned it, otherwise an {@code Offloaded} wrapper around the returned blob key
     * @throws IOException if serialization fails
     */
    private TaskDeploymentDescriptor.MaybeOffloaded<ShuffleDescriptorAndIndex[]> serializeAndTryOffloadShuffleDescriptor(
            ShuffleDescriptorAndIndex[] shuffleDescriptors) throws IOException {
        // Fully parameterized locals replace the former raw types and their unchecked casts.
        final CompressedSerializedValue<ShuffleDescriptorAndIndex[]> compressedSerializedValue =
                CompressedSerializedValue.fromObject(shuffleDescriptors);
        final Either<SerializedValue<ShuffleDescriptorAndIndex[]>, PermanentBlobKey> serializedValueOrBlobKey =
                BlobWriter.tryOffload(compressedSerializedValue, jobID, blobWriter);

        if (serializedValueOrBlobKey.isLeft()) {
            return new TaskDeploymentDescriptor.NonOffloaded<>(serializedValueOrBlobKey.left());
        }
        return new TaskDeploymentDescriptor.Offloaded<>(serializedValueOrBlobKey.right());
    }

    /**
     * Creates a factory from the given execution, its vertex and the vertex's execution graph
     * accessor.
     *
     * @param execution execution to build deployment descriptors for
     * @return a factory bound to {@code execution}
     * @throws ClusterDatasetCorruptedException if collecting the cluster partition shuffle
     *     descriptors fails for any reason; the original failure is passed along as the cause
     *     together with the consumed intermediate data set IDs
     * @throws IOException declared by the calls made here; presumably raised while obtaining the
     *     task information (TODO confirm)
     */
    public static TaskDeploymentDescriptorFactory fromExecution(Execution execution)
            throws IOException, ClusterDatasetCorruptedException {
        final ExecutionVertex vertex = execution.getVertex();
        final InternalExecutionGraphAccessor graphAccessor = vertex.getExecutionGraphAccessor();

        Map<IntermediateDataSetID, ShuffleDescriptorAndIndex[]> clusterPartitionDescriptors;
        try {
            clusterPartitionDescriptors = getClusterPartitionShuffleDescriptors(vertex);
        } catch (Throwable e) {
            // Every failure in this step is reported as a corrupted cluster data set.
            throw new ClusterDatasetCorruptedException(
                    e, vertex.getJobVertex().getJobVertex().getIntermediateDataSetIdsToConsume());
        }

        final ExecutionAttemptID attemptId = execution.getAttemptId();
        final TaskDeploymentDescriptor.MaybeOffloaded<JobInformation> jobInformation =
                getSerializedJobInformation(graphAccessor);
        final TaskDeploymentDescriptor.MaybeOffloaded<TaskInformation> taskInformation =
                getSerializedTaskInformation(vertex.getJobVertex().getTaskInformationOrBlobKey());

        return new TaskDeploymentDescriptorFactory(
                attemptId,
                jobInformation,
                taskInformation,
                graphAccessor.getJobID(),
                graphAccessor.getPartitionLocationConstraint(),
                vertex.getAllConsumedPartitionGroups(),
                graphAccessor::getResultPartitionOrThrow,
                graphAccessor.getBlobWriter(),
                graphAccessor.isNonFinishedHybridPartitionShouldBeUnknown(),
                clusterPartitionDescriptors,
                vertex::getExecutionVertexInputInfo);
    }

    /**
     * Collects, for every intermediate data set the job vertex consumes, the single cluster
     * partition shuffle descriptor located at this vertex's parallel subtask index.
     *
     * @param executionVertex vertex whose consumed cluster data sets are resolved
     * @return a map from data set ID to a one-element array holding the selected descriptor with
     *     index 0
     * @throws IllegalStateException if the number of shuffle descriptors of a data set differs from
     *     the vertex's total number of parallel subtasks
     */
    private static Map<IntermediateDataSetID, ShuffleDescriptorAndIndex[]> getClusterPartitionShuffleDescriptors(
            ExecutionVertex executionVertex) {
        final InternalExecutionGraphAccessor graphAccessor = executionVertex.getExecutionGraphAccessor();
        final List<IntermediateDataSetID> dataSetIds =
                executionVertex.getJobVertex().getJobVertex().getIntermediateDataSetIdsToConsume();

        final Map<IntermediateDataSetID, ShuffleDescriptorAndIndex[]> descriptorsByDataSet = new HashMap<>();
        for (IntermediateDataSetID dataSetId : dataSetIds) {
            final List<ShuffleDescriptor> descriptors =
                    graphAccessor.getClusterPartitionShuffleDescriptors(dataSetId);

            final boolean parallelismMatches =
                    executionVertex.getTotalNumberOfParallelSubtasks() == descriptors.size();
            Preconditions.checkState(
                    parallelismMatches,
                    "The parallelism (%s) of the cache consuming job vertex is different from the number of shuffle descriptors (%s) of the intermediate data set",
                    executionVertex.getTotalNumberOfParallelSubtasks(),
                    descriptors.size());

            // Each subtask only consumes the descriptor at its own subtask index.
            final ShuffleDescriptor ownDescriptor =
                    descriptors.get(executionVertex.getParallelSubtaskIndex());
            descriptorsByDataSet.put(
                    dataSetId,
                    new ShuffleDescriptorAndIndex[] {new ShuffleDescriptorAndIndex(ownDescriptor, 0)});
        }
        return descriptorsByDataSet;
    }

    /**
     * Wraps the job information provided by the execution graph accessor into a {@code
     * MaybeOffloaded} value.
     *
     * @param internalExecutionGraphAccessor accessor providing the serialized job information or
     *     its blob key
     * @return {@code NonOffloaded} if the serialized value is present (left side), otherwise {@code
     *     Offloaded} with the blob key (right side)
     */
    private static TaskDeploymentDescriptor.MaybeOffloaded<JobInformation> getSerializedJobInformation(
            InternalExecutionGraphAccessor internalExecutionGraphAccessor) {
        final Either<SerializedValue<JobInformation>, PermanentBlobKey> jobInformationOrBlobKey =
                internalExecutionGraphAccessor.getJobInformationOrBlobKey();
        // left()/right() are already precisely typed, so no casts are required.
        if (jobInformationOrBlobKey.isLeft()) {
            return new TaskDeploymentDescriptor.NonOffloaded<JobInformation>(jobInformationOrBlobKey.left());
        }
        return new TaskDeploymentDescriptor.Offloaded<JobInformation>(jobInformationOrBlobKey.right());
    }

    /**
     * Wraps the given task information into a {@code MaybeOffloaded} value.
     *
     * @param taskInfo either the serialized task information (left) or its blob key (right)
     * @return {@code NonOffloaded} for a left value, {@code Offloaded} for a right value
     */
    private static TaskDeploymentDescriptor.MaybeOffloaded<TaskInformation> getSerializedTaskInformation(
            Either<SerializedValue<TaskInformation>, PermanentBlobKey> taskInfo) {
        // Parameterized instantiation replaces the former raw NonOffloaded/Offloaded creation.
        if (taskInfo.isLeft()) {
            return new TaskDeploymentDescriptor.NonOffloaded<TaskInformation>(taskInfo.left());
        }
        return new TaskDeploymentDescriptor.Offloaded<TaskInformation>(taskInfo.right());
    }

    /**
     * Resolves the shuffle descriptor for a consumed partition from the state of its producer.
     *
     * <p>Gathers the producer's execution state, its deployment descriptor for the partition (if
     * any) and the partition's result ID, then delegates to the seven-argument overload.
     *
     * @param consumedPartition partition to be consumed
     * @param partitionDeploymentConstraint whether an unknown descriptor is acceptable
     * @param nonFinishedHybridPartitionShouldBeUnknown forwarded unchanged to the overload
     * @return the shuffle descriptor chosen by the overload
     */
    public static ShuffleDescriptor getConsumedPartitionShuffleDescriptor(
            IntermediateResultPartition consumedPartition,
            PartitionLocationConstraint partitionDeploymentConstraint,
            boolean nonFinishedHybridPartitionShouldBeUnknown) {
        final Execution partitionProducer = consumedPartition.getProducer().getPartitionProducer();
        final ExecutionState stateOfProducer = partitionProducer.getState();
        final Optional<ResultPartitionDeploymentDescriptor> maybeDescriptor =
                partitionProducer.getResultPartitionDeploymentDescriptor(
                        consumedPartition.getPartitionId());
        final ResultPartitionID resultPartitionId =
                new ResultPartitionID(
                        consumedPartition.getPartitionId(), partitionProducer.getAttemptId());

        return getConsumedPartitionShuffleDescriptor(
                resultPartitionId,
                consumedPartition.getResultType(),
                consumedPartition.hasDataAllProduced(),
                stateOfProducer,
                partitionDeploymentConstraint,
                // The overload expects null when the producer has no descriptor for the partition.
                maybeDescriptor.orElse(null),
                nonFinishedHybridPartitionShouldBeUnknown);
    }

    /**
     * Chooses the shuffle descriptor to hand to a consumer of the given partition.
     *
     * <p>The producer's descriptor is used when the partition can be consumed pipelined or has all
     * data produced, a descriptor exists, and the producer state counts as available. Within that
     * case, a hybrid partition whose producer is not {@code FINISHED} is reported as unknown when
     * {@code nonFinishedHybridPartitionShouldBeUnknown} is set. In every other case an unknown
     * descriptor is returned if the constraint permits it.
     *
     * @param consumedPartitionId ID of the consumed partition
     * @param resultPartitionType type of the consumed partition
     * @param hasAllDataProduced whether all data of the partition has been produced
     * @param producerState execution state of the producer
     * @param partitionDeploymentConstraint whether an unknown descriptor is acceptable
     * @param consumedPartitionDescriptor the producer's descriptor for the partition, or {@code
     *     null}
     * @param nonFinishedHybridPartitionShouldBeUnknown see above
     * @return the producer's shuffle descriptor or an {@link UnknownShuffleDescriptor}
     * @throws IllegalStateException if the partition is not consumable and the constraint is not
     *     {@code CAN_BE_UNKNOWN}, or if an unfinished hybrid partition must be unknown while the
     *     constraint does not allow it
     */
    @VisibleForTesting
    static ShuffleDescriptor getConsumedPartitionShuffleDescriptor(
            ResultPartitionID consumedPartitionId,
            ResultPartitionType resultPartitionType,
            boolean hasAllDataProduced,
            ExecutionState producerState,
            PartitionLocationConstraint partitionDeploymentConstraint,
            @Nullable ResultPartitionDeploymentDescriptor consumedPartitionDescriptor,
            boolean nonFinishedHybridPartitionShouldBeUnknown) {
        final boolean unknownAllowed =
                partitionDeploymentConstraint == PartitionLocationConstraint.CAN_BE_UNKNOWN;
        final boolean dataConsumable =
                resultPartitionType.canBePipelinedConsumed() || hasAllDataProduced;
        final boolean descriptorUsable =
                dataConsumable
                        && consumedPartitionDescriptor != null
                        && isProducerAvailable(producerState);

        if (!descriptorUsable) {
            if (unknownAllowed) {
                return new UnknownShuffleDescriptor(consumedPartitionId);
            }
            throw handleConsumedPartitionShuffleDescriptorErrors(
                    consumedPartitionId, resultPartitionType, hasAllDataProduced, producerState);
        }

        final boolean hideUnfinishedHybrid =
                resultPartitionType.isHybridResultPartition()
                        && nonFinishedHybridPartitionShouldBeUnknown
                        && producerState != ExecutionState.FINISHED;
        if (hideUnfinishedHybrid) {
            Preconditions.checkState(
                    unknownAllowed,
                    "partition location constraint should allow unknown shuffle descriptor when nonFinishedHybridPartitionShouldBeUnknown is true.");
            return new UnknownShuffleDescriptor(consumedPartitionId);
        }
        return consumedPartitionDescriptor.getShuffleDescriptor();
    }

    /**
     * Builds (but does not throw) the exception describing why a consumed partition has no usable
     * shuffle descriptor.
     *
     * @param consumedPartitionId ID of the partition, used in the "not ready" message
     * @param resultPartitionType partition type, used in the "not ready" message
     * @param hasAllDataProduced data-completeness flag, used in the "not ready" message
     * @param producerState state of the producer; selects which message is produced
     * @return an {@link IllegalStateException} carrying the message
     */
    private static RuntimeException handleConsumedPartitionShuffleDescriptorErrors(
            ResultPartitionID consumedPartitionId,
            ResultPartitionType resultPartitionType,
            boolean hasAllDataProduced,
            ExecutionState producerState) {
        final String message;
        if (isProducerFailedOrCanceled(producerState)) {
            message =
                    "Trying to consume an input partition whose producer has been canceled or failed. The producer is in state "
                            + producerState
                            + ".";
        } else {
            message =
                    String.format(
                            "Trying to consume an input partition whose producer is not ready (result type: %s, hasAllDataProduced: %s, producer state: %s, partition id: %s).",
                            resultPartitionType,
                            hasAllDataProduced,
                            producerState,
                            consumedPartitionId);
        }
        return new IllegalStateException(message);
    }

    /**
     * Tells whether the producer state is one of {@code SCHEDULED}, {@code DEPLOYING}, {@code
     * INITIALIZING}, {@code RUNNING} or {@code FINISHED}.
     *
     * @param producerState state to test
     * @return {@code true} for the states listed above, {@code false} for anything else
     */
    private static boolean isProducerAvailable(ExecutionState producerState) {
        return producerState == ExecutionState.SCHEDULED
                || producerState == ExecutionState.DEPLOYING
                || producerState == ExecutionState.INITIALIZING
                || producerState == ExecutionState.RUNNING
                || producerState == ExecutionState.FINISHED;
    }

    /**
     * Tells whether the producer state is one of {@code FAILED}, {@code CANCELING} or {@code
     * CANCELED}.
     *
     * @param producerState state to test
     * @return {@code true} for the states listed above, {@code false} for anything else
     */
    private static boolean isProducerFailedOrCanceled(ExecutionState producerState) {
        return producerState == ExecutionState.FAILED
                || producerState == ExecutionState.CANCELING
                || producerState == ExecutionState.CANCELED;
    }

    /** Serializable, immutable pair of a {@link ShuffleDescriptor} and an integer index. */
    public static class ShuffleDescriptorAndIndex implements Serializable {
        private static final long serialVersionUID = 1L;

        /** The wrapped shuffle descriptor. */
        private final ShuffleDescriptor shuffleDescriptor;

        /** Index associated with the descriptor. */
        private final int index;

        /**
         * Creates the pair.
         *
         * @param shuffleDescriptor descriptor to wrap
         * @param index index to associate with the descriptor
         */
        public ShuffleDescriptorAndIndex(ShuffleDescriptor shuffleDescriptor, int index) {
            this.shuffleDescriptor = shuffleDescriptor;
            this.index = index;
        }

        /** @return the wrapped shuffle descriptor */
        public ShuffleDescriptor getShuffleDescriptor() {
            return shuffleDescriptor;
        }

        /** @return the index given at construction time */
        public int getIndex() {
            return index;
        }
    }

    public static enum PartitionLocationConstraint {
        MUST_BE_KNOWN,
        CAN_BE_UNKNOWN;


        public static PartitionLocationConstraint fromJobType(JobType jobType) {
            switch (jobType) {
                case BATCH: {
                    return CAN_BE_UNKNOWN;
                }
                case STREAMING: {
                    return MUST_BE_KNOWN;
                }
            }
            throw new IllegalArgumentException(String.format("Unknown JobType %s. Cannot derive partition location constraint for it.", new Object[]{jobType}));
        }
    }
}

