package com.hazelcast.jet.impl.pipeline.transform;

import com.hazelcast.jet.aggregate.AggregateOperation;
import com.hazelcast.jet.core.Edge;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.Vertex;
import com.hazelcast.jet.core.processor.Processors;
import com.hazelcast.jet.impl.pipeline.PipelineImpl;
import com.hazelcast.jet.impl.pipeline.Planner;
import java.util.List;
import javax.annotation.Nonnull;

/* loaded from: input_file:com/hazelcast/jet/impl/pipeline/transform/AggregateTransform.class */
public class AggregateTransform<A, R> extends AbstractTransform {
    public static final String FIRST_STAGE_VERTEX_NAME_SUFFIX = "-prepare";
    private static final long serialVersionUID = 1;

    @Nonnull
    private final AggregateOperation<A, ? extends R> aggrOp;

    public AggregateTransform(@Nonnull List<Transform> list, @Nonnull AggregateOperation<A, ? extends R> aggregateOperation) {
        super(createName(list), list);
        this.aggrOp = aggregateOperation;
    }

    private static String createName(@Nonnull List<Transform> list) {
        return list.size() == 1 ? "aggregate" : list.size() + "-way co-aggregate";
    }

    @Override // com.hazelcast.jet.impl.pipeline.transform.Transform
    public void addToDag(Planner planner, PipelineImpl.Context context) {
        if (this.aggrOp.combineFn() == null) {
            addToDagSingleStage(planner);
        } else {
            addToDagTwoStage(planner, context);
        }
    }

    private void addToDagSingleStage(Planner planner) {
        determinedLocalParallelism(1);
        planner.addEdges(this, planner.addVertex(this, name(), determinedLocalParallelism(), Processors.aggregateP(this.aggrOp)).v, edge -> {
            edge.distributed().allToOne(Integer.valueOf(name().hashCode()));
        });
    }

    private void addToDagTwoStage(Planner planner, PipelineImpl.Context context) {
        String name = name();
        determineLocalParallelism(-1, context, planner.isPreserveOrder());
        Vertex localParallelism = planner.dag.newVertex(name + "-prepare", Processors.accumulateP(this.aggrOp)).localParallelism(determinedLocalParallelism());
        if (planner.isPreserveOrder()) {
            planner.addEdges(this, localParallelism, (v0) -> {
                v0.isolated();
            });
        } else {
            planner.addEdges(this, localParallelism);
        }
        determinedLocalParallelism(1);
        planner.dag.edge(Edge.between(localParallelism, planner.addVertex(this, name, determinedLocalParallelism(), ProcessorMetaSupplier.forceTotalParallelismOne(ProcessorSupplier.of(Processors.combineP(this.aggrOp)), name)).v).distributed().allToOne(name));
    }
}
