package org.apache.flink.streaming.api.operators.sortpartition;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.AlgorithmOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.runtime.operators.sort.ExternalSorter;
import org.apache.flink.runtime.operators.sort.PushSorter;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OperatorAttributes;
import org.apache.flink.streaming.api.operators.OperatorAttributesBuilder;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.MutableObjectIterator;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/api/operators/sortpartition/KeyedSortPartitionOperator.class */
public class KeyedSortPartitionOperator<INPUT, KEY> extends AbstractStreamOperator<INPUT> implements OneInputStreamOperator<INPUT, INPUT>, BoundedOneInput {
    protected final TypeInformation<INPUT> inputType;
    protected final KeySelector<INPUT, ?> sortFieldSelector;
    private final Order sortOrder;
    private final String stringSortField;
    private final int positionSortField;
    private PushSorter<Tuple2<byte[], INPUT>> recordSorter = null;
    private PushSorter<Tuple2<byte[], Tuple2<?, INPUT>>> recordSorterForSelector = null;
    private TypeSerializer<KEY> recordKeySerializer;
    private DataOutputSerializer dataOutputSerializer;

    public KeyedSortPartitionOperator(TypeInformation<INPUT> typeInformation, int i, Order order) {
        this.inputType = typeInformation;
        ensureFieldSortable(i);
        this.positionSortField = i;
        this.stringSortField = null;
        this.sortFieldSelector = null;
        this.sortOrder = order;
    }

    public KeyedSortPartitionOperator(TypeInformation<INPUT> typeInformation, String str, Order order) {
        this.inputType = typeInformation;
        ensureFieldSortable(str);
        this.positionSortField = -1;
        this.stringSortField = str;
        this.sortFieldSelector = null;
        this.sortOrder = order;
    }

    /* JADX WARN: Multi-variable type inference failed */
    public <K> KeyedSortPartitionOperator(TypeInformation<INPUT> typeInformation, KeySelector<INPUT, K> keySelector, Order order) {
        this.inputType = typeInformation;
        ensureFieldSortable(keySelector);
        this.positionSortField = -1;
        this.stringSortField = null;
        this.sortFieldSelector = keySelector;
        this.sortOrder = order;
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.SetupableStreamOperator
    public void setup(StreamTask<?, ?> streamTask, StreamConfig streamConfig, Output<StreamRecord<INPUT>> output) {
        super.setup(streamTask, streamConfig, output);
        ClassLoader userCodeClassLoader = streamTask.getUserCodeClassLoader();
        ExecutionConfig executionConfig = streamTask.getEnvironment().getExecutionConfig();
        this.recordKeySerializer = streamConfig.getStateKeySerializer(userCodeClassLoader);
        int length = this.recordKeySerializer.getLength();
        createDataOutputSerializer(length);
        if (this.sortFieldSelector == null) {
            this.recordSorter = (PushSorter<Tuple2<byte[], INPUT>>) getSorter(new KeyAndValueSerializer(this.inputType.createSerializer(getExecutionConfig()), length), length > 0 ? new FixedLengthByteKeyAndValueComparator(length, ((CompositeType) this.inputType).createComparator(getSortFieldIndex(), getSortOrderIndicator(), 0, executionConfig)) : new VariableLengthByteKeyAndValueComparator(((CompositeType) this.inputType).createComparator(getSortFieldIndex(), getSortOrderIndicator(), 0, executionConfig)), streamTask);
        } else {
            TypeInformation TUPLE = Types.TUPLE((TypeInformation<?>[]) new TypeInformation[]{TypeExtractor.getKeySelectorTypes(this.sortFieldSelector, this.inputType), this.inputType});
            this.recordSorterForSelector = (PushSorter<Tuple2<byte[], Tuple2<?, INPUT>>>) getSorter(new KeyAndValueSerializer(TUPLE.createSerializer(getExecutionConfig()), length), length > 0 ? new FixedLengthByteKeyAndValueComparator(length, ((CompositeType) TUPLE).createComparator(getSortFieldIndex(), getSortOrderIndicator(), 0, executionConfig)) : new VariableLengthByteKeyAndValueComparator(((CompositeType) TUPLE).createComparator(getSortFieldIndex(), getSortOrderIndicator(), 0, executionConfig)), streamTask);
        }
    }

    @Override // org.apache.flink.streaming.api.operators.Input
    public void processElement(StreamRecord<INPUT> streamRecord) throws Exception {
        this.recordKeySerializer.serialize(getCurrentKey(), this.dataOutputSerializer);
        byte[] copyOfBuffer = this.dataOutputSerializer.getCopyOfBuffer();
        this.dataOutputSerializer.clear();
        if (this.sortFieldSelector != null) {
            this.recordSorterForSelector.writeRecord(Tuple2.of(copyOfBuffer, Tuple2.of(this.sortFieldSelector.getKey(streamRecord.getValue()), streamRecord.getValue())));
        } else {
            this.recordSorter.writeRecord(Tuple2.of(copyOfBuffer, streamRecord.getValue()));
        }
    }

    @Override // org.apache.flink.streaming.api.operators.BoundedOneInput
    public void endInput() throws Exception {
        TimestampedCollector timestampedCollector = new TimestampedCollector(this.output);
        if (this.sortFieldSelector != null) {
            this.recordSorterForSelector.finishReading();
            MutableObjectIterator<Tuple2<byte[], Tuple2<?, INPUT>>> iterator = this.recordSorterForSelector.getIterator();
            Tuple2<byte[], Tuple2<?, INPUT>> next = iterator.next();
            while (true) {
                Tuple2<byte[], Tuple2<?, INPUT>> tuple2 = next;
                if (tuple2 == null) {
                    this.recordSorterForSelector.close();
                    return;
                } else {
                    timestampedCollector.collect(tuple2.f1.f1);
                    next = iterator.next();
                }
            }
        } else {
            this.recordSorter.finishReading();
            MutableObjectIterator<Tuple2<byte[], INPUT>> iterator2 = this.recordSorter.getIterator();
            Tuple2<byte[], INPUT> next2 = iterator2.next();
            while (true) {
                Tuple2<byte[], INPUT> tuple22 = next2;
                if (tuple22 == null) {
                    this.recordSorter.close();
                    return;
                } else {
                    timestampedCollector.collect(tuple22.f1);
                    next2 = iterator2.next();
                }
            }
        }
    }

    @Override // org.apache.flink.streaming.api.operators.StreamOperator
    public OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().setOutputOnlyAfterEndOfStream(true).setInternalSorterSupported(true).build();
    }

    private int[] getSortFieldIndex() {
        int[] iArr = new int[1];
        if (this.positionSortField != -1) {
            iArr[0] = new Keys.ExpressionKeys(this.positionSortField, this.inputType).computeLogicalKeyPositions()[0];
        } else if (this.stringSortField != null) {
            iArr[0] = new Keys.ExpressionKeys(this.stringSortField, this.inputType).computeLogicalKeyPositions()[0];
        }
        return iArr;
    }

    private boolean[] getSortOrderIndicator() {
        boolean[] zArr = new boolean[1];
        zArr[0] = this.sortOrder == Order.ASCENDING;
        return zArr;
    }

    private void ensureFieldSortable(int i) throws InvalidProgramException {
        if (!Keys.ExpressionKeys.isSortKey(i, (TypeInformation<?>) this.inputType)) {
            throw new InvalidProgramException("The field " + i + " of input type " + this.inputType + " is not sortable.");
        }
    }

    private void ensureFieldSortable(String str) throws InvalidProgramException {
        if (!Keys.ExpressionKeys.isSortKey(str, (TypeInformation<?>) this.inputType)) {
            throw new InvalidProgramException("The field " + str + " of input type " + this.inputType + " is not sortable.");
        }
    }

    private <K> void ensureFieldSortable(KeySelector<INPUT, K> keySelector) {
        TypeInformation keySelectorTypes = TypeExtractor.getKeySelectorTypes(keySelector, this.inputType);
        if (!new Keys.SelectorFunctionKeys(keySelector, this.inputType, keySelectorTypes).getKeyType().isSortKeyType()) {
            throw new InvalidProgramException("The key type " + keySelectorTypes + " is not sortable.");
        }
    }

    private void createDataOutputSerializer(int i) {
        if (i > 0) {
            this.dataOutputSerializer = new DataOutputSerializer(i);
        } else {
            this.dataOutputSerializer = new DataOutputSerializer(64);
        }
    }

    private <TYPE> PushSorter<TYPE> getSorter(TypeSerializer<TYPE> typeSerializer, TypeComparator<TYPE> typeComparator, StreamTask<?, ?> streamTask) {
        ClassLoader userCodeClassLoader = streamTask.getUserCodeClassLoader();
        Configuration jobConfiguration = streamTask.getEnvironment().getJobConfiguration();
        try {
            return ExternalSorter.newBuilder(streamTask.getEnvironment().getMemoryManager(), streamTask, typeSerializer, typeComparator, streamTask.getExecutionConfig()).memoryFraction(this.config.getManagedMemoryFractionOperatorUseCaseOfSlot(ManagedMemoryUseCase.OPERATOR, streamTask.getEnvironment().getJobConfiguration(), streamTask.getEnvironment().getTaskConfiguration(), userCodeClassLoader)).enableSpilling(streamTask.getEnvironment().getIOManager(), ((Float) jobConfiguration.get(AlgorithmOptions.SORT_SPILLING_THRESHOLD)).floatValue()).maxNumFileHandles(((Integer) jobConfiguration.get(AlgorithmOptions.SPILLING_MAX_FAN)).intValue()).objectReuse(streamTask.getExecutionConfig().isObjectReuseEnabled()).largeRecords(((Boolean) jobConfiguration.get(AlgorithmOptions.USE_LARGE_RECORDS_HANDLER)).booleanValue()).build();
        } catch (MemoryAllocationException e) {
            throw new RuntimeException(e);
        }
    }
}
