package org.apache.beam.runners.spark.structuredstreaming.translation;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.spark.SparkCommonPipelineOptions;
import org.apache.beam.runners.spark.structuredstreaming.translation.EvaluationContext;
import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.SideInputValues;
import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderProvider;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.storage.StorageLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.reflect.ClassTag;

@Internal
/* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.class */
public abstract class PipelineTranslator {
    private static final Logger LOG = LoggerFactory.getLogger(PipelineTranslator.class);
    private static final int PLAN_COMPLEXITY_THRESHOLD = 6;

    /* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator$BroadcastOptions.class */
    /**
     * Serializable {@link Supplier} of {@link PipelineOptions} that is backed by a Spark broadcast of
     * the options in their {@link SerializablePipelineOptions} form.
     */
    private static class BroadcastOptions implements Supplier<PipelineOptions>, Serializable {
        /** Broadcast handle holding the serializable wrapper around the pipeline options. */
        private final Broadcast<SerializablePipelineOptions> broadcast;

        /**
         * Wraps the given options so they can be serialized and broadcasts them using the session.
         *
         * @param sparkSession session whose context performs the broadcast
         * @param pipelineOptions options to make available through {@link #get()}
         */
        private BroadcastOptions(SparkSession sparkSession, PipelineOptions pipelineOptions) {
            SerializablePipelineOptions serializableOptions = new SerializablePipelineOptions(pipelineOptions);
            this.broadcast = PipelineTranslator.broadcast(sparkSession, serializableOptions);
        }

        /** Returns the pipeline options unwrapped from the broadcast value. */
        @Override
        public PipelineOptions get() {
            SerializablePipelineOptions serializableOptions = this.broadcast.value();
            return serializableOptions.get();
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator$DependencyVisitor.class */
    /**
     * First-pass visitor that records, for every output {@link PCollection} of a translatable
     * transform, a {@link TranslationResult} linked to the results of the transform's inputs, and
     * registers the transform as a dependent of each of those inputs.
     */
    private class DependencyVisitor extends PTransformVisitor {
        /** Translation results keyed by the PCollection they belong to. */
        private final Map<PCollection<?>, TranslationResult<?>> results;

        private DependencyVisitor() {
            super();
            this.results = new HashMap<>();
        }

        /**
         * Links the inputs and outputs of {@code pTransform}.
         *
         * @param node hierarchy node providing the transform's inputs and outputs
         * @param pTransform transform registered as a dependent of each input
         * @param transformTranslator translator whose {@code complexityFactor} is stored on each output
         * @throws IllegalStateException presumably raised by {@code checkStateNotNull} when an input
         *     has no recorded result yet (exception type not visible here, confirm)
         */
        @Override
        <InT extends PInput, OutT extends POutput> void visit(TransformHierarchy.Node node, PTransform<InT, OutT> pTransform, TransformTranslator<InT, OutT, PTransform<InT, OutT>> transformTranslator) {
            List<TranslationResult<?>> dependencies = new ArrayList<>(node.getInputs().size());
            for (PCollection<?> input : node.getInputs().values()) {
                TranslationResult<?> inputResult = Preconditions.checkStateNotNull(this.results.get(input));
                dependencies.add(inputResult);
                // Track the transform as a consumer of this input.
                inputResult.dependentTransforms.add(pTransform);
            }
            // All outputs of the transform share the same list of dependencies.
            for (PCollection<?> output : node.getOutputs().values()) {
                this.results.put(output, new TranslationResult<>(output, transformTranslator.complexityFactor, dependencies));
            }
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator$PTransformVisitor.class */
    private abstract class PTransformVisitor extends Pipeline.PipelineVisitor.Defaults {
        private PTransformVisitor() {
        }

        abstract <InT extends PInput, OutT extends POutput> void visit(TransformHierarchy.Node node, PTransform<InT, OutT> pTransform, TransformTranslator<InT, OutT, PTransform<InT, OutT>> transformTranslator);

        public final Pipeline.PipelineVisitor.CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
            PTransform<PInput, POutput> transform = node.getTransform();
            TransformTranslator<PInput, POutput, PTransform<PInput, POutput>> supportedTranslator = getSupportedTranslator(transform);
            if (transform == null || supportedTranslator == null) {
                return Pipeline.PipelineVisitor.CompositeBehavior.ENTER_TRANSFORM;
            }
            visit(node, transform, supportedTranslator);
            return Pipeline.PipelineVisitor.CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
        }

        public final void visitPrimitiveTransform(TransformHierarchy.Node node) {
            PTransform<PInput, POutput> transform = node.getTransform();
            if (transform == null || transform.getClass().equals(View.CreatePCollectionView.class)) {
                return;
            }
            TransformTranslator<PInput, POutput, PTransform<PInput, POutput>> supportedTranslator = getSupportedTranslator(transform);
            if (supportedTranslator == null) {
                throw new UnsupportedOperationException("Transform " + PTransformTranslation.urnForTransform(transform) + " is not supported.");
            }
            visit(node, transform, supportedTranslator);
        }

        @Nullable
        private TransformTranslator<PInput, POutput, PTransform<PInput, POutput>> getSupportedTranslator(@Nullable PTransform<PInput, POutput> pTransform) {
            TransformTranslator<PInput, POutput, PTransform<PInput, POutput>> transformTranslator;
            if (pTransform == null || (transformTranslator = PipelineTranslator.this.getTransformTranslator(pTransform)) == null || !transformTranslator.canTranslate(pTransform)) {
                return null;
            }
            return transformTranslator;
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator$StreamingModeDetector.class */
    private static class StreamingModeDetector extends Pipeline.PipelineVisitor.Defaults {
        private boolean streaming;

        StreamingModeDetector(boolean z) {
            this.streaming = z;
        }

        public Pipeline.PipelineVisitor.CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
            return this.streaming ? Pipeline.PipelineVisitor.CompositeBehavior.DO_NOT_ENTER_TRANSFORM : Pipeline.PipelineVisitor.CompositeBehavior.ENTER_TRANSFORM;
        }

        public void visitValue(PValue pValue, TransformHierarchy.Node node) {
            if ((pValue instanceof PCollection) && ((PCollection) pValue).isBounded() == PCollection.IsBounded.UNBOUNDED) {
                PipelineTranslator.LOG.info("Found unbounded PCollection {}, switching to streaming mode.", pValue.getName());
                this.streaming = true;
            }
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator$TranslatingVisitor.class */
    /**
     * Second-pass visitor that invokes the translator of each supported transform and acts as the
     * {@link TranslationState} handed to those translators: it stores the resulting datasets,
     * caches encoders and side-input broadcasts, and collects the leaf results.
     */
    private class TranslatingVisitor extends PTransformVisitor implements TranslationState {
        /** Results keyed by PCollection, as supplied to the constructor. */
        private final Map<PCollection<?>, TranslationResult<?>> translationResults;
        /** Encoders already created, keyed by the coder they were created for. */
        private final Map<Coder<?>, Encoder<?>> encoders;
        private final SparkSession sparkSession;
        private final PipelineOptions options;
        /** Supplier backed by a broadcast of the options. */
        private final Supplier<PipelineOptions> optionsSupplier;
        /** Storage level parsed from the pipeline options. */
        private final StorageLevel storageLevel;
        /** Whether {@link #storageLevel} equals {@code MEMORY_ONLY}. */
        private final boolean isMemoryOnly;
        /** Results that had no dependent transforms at the time their dataset was stored. */
        private final Set<TranslationResult<?>> leaves;

        /**
         * @param sparkSession session used to create datasets and broadcasts
         * @param sparkCommonPipelineOptions options providing, among others, the storage level
         * @param map translation results keyed by PCollection
         */
        public TranslatingVisitor(SparkSession sparkSession, SparkCommonPipelineOptions sparkCommonPipelineOptions, Map<PCollection<?>, TranslationResult<?>> map) {
            super();
            this.sparkSession = sparkSession;
            this.translationResults = map;
            this.options = sparkCommonPipelineOptions;
            this.optionsSupplier = new BroadcastOptions(sparkSession, sparkCommonPipelineOptions);
            this.storageLevel = StorageLevel.fromString(sparkCommonPipelineOptions.getStorageLevel());
            this.isMemoryOnly = this.storageLevel.equals(StorageLevel.MEMORY_ONLY());
            this.encoders = new HashMap<>();
            this.leaves = new HashSet<>();
        }

        /**
         * Translates the transform using the given translator, with this visitor as state.
         *
         * @throws RuntimeException wrapping any {@link IOException} raised during translation
         */
        @Override
        <InT extends PInput, OutT extends POutput> void visit(TransformHierarchy.Node node, PTransform<InT, OutT> pTransform, TransformTranslator<InT, OutT, PTransform<InT, OutT>> transformTranslator) {
            // The node exposes the applied transform with wildcard type arguments; go through the raw type.
            @SuppressWarnings({"unchecked", "rawtypes"})
            AppliedPTransform<InT, OutT, PTransform<InT, OutT>> appliedPTransform = (AppliedPTransform) node.toAppliedPTransform(getPipeline());
            try {
                PipelineTranslator.LOG.info("Translating {}: {}", node.isCompositeNode() ? "composite" : "primitive", node.getFullName());
                transformTranslator.translate(pTransform, appliedPTransform, this);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Returns the encoder cached for the coder, creating it with the factory on first use.
         */
        @Override
        @SuppressWarnings("unchecked")
        public <T> Encoder<T> encoderOf(Coder<T> coder, EncoderProvider.Factory<T> factory) {
            // NOTE(review): deliberately get/put rather than computeIfAbsent, in case a factory
            // re-enters encoderOf for nested coders - confirm before changing.
            Encoder<?> encoder = this.encoders.get(coder);
            if (encoder == null) {
                encoder = factory.apply(coder);
                this.encoders.put(coder, encoder);
            }
            return (Encoder<T>) encoder;
        }

        /** Returns the result registered for the PCollection; a missing result fails the not-null check. */
        @SuppressWarnings("unchecked")
        private <T> TranslationResult<T> getResult(PCollection<T> pCollection) {
            return (TranslationResult<T>) Preconditions.checkStateNotNull(this.translationResults.get(pCollection));
        }

        /** Returns the dataset stored for the PCollection; a missing dataset fails the not-null check. */
        @Override
        public <T> Dataset<WindowedValue<T>> getDataset(PCollection<T> pCollection) {
            return Preconditions.checkStateNotNull(getResult(pCollection).dataset);
        }

        /**
         * Stores the dataset for the PCollection, optionally caching it and cutting its lineage.
         *
         * @param pCollection collection the dataset belongs to
         * @param dataset translated dataset
         * @param z whether the dataset may be cached when it has more than one dependent transform
         */
        @Override
        public <T> void putDataset(PCollection<T> pCollection, Dataset<WindowedValue<T>> dataset, boolean z) {
            TranslationResult<T> result = getResult(pCollection);
            result.dataset = dataset;
            if (!z && this.isMemoryOnly) {
                // NOTE(review): presumably the caller already cached the data in this case - confirm.
                result.resetPlanComplexity();
            } else if (z && result.usages() > 1) {
                if (this.isMemoryOnly) {
                    PipelineTranslator.LOG.info("Dataset {} will be cached in-memory as RDD for reuse.", result.name);
                    // Re-create the dataset from the persisted RDD and restart the complexity estimate.
                    result.dataset = this.sparkSession.createDataset(dataset.rdd().persist(), dataset.encoder());
                    result.resetPlanComplexity();
                } else {
                    PipelineTranslator.LOG.info("Dataset {} will be cached for reuse.", result.name);
                    dataset.persist(this.storageLevel);
                }
            }
            if (result.estimatePlanComplexity() > PipelineTranslator.PLAN_COMPLEXITY_THRESHOLD) {
                PipelineTranslator.LOG.info("Breaking linage of dataset {} to limit complexity of query plan.", result.name);
                // Re-create the dataset from its RDD and restart the complexity estimate.
                result.dataset = this.sparkSession.createDataset(dataset.rdd(), dataset.encoder());
                result.resetPlanComplexity();
            }
            if (result.isLeaf()) {
                this.leaves.add(result);
            }
        }

        /** Returns whether the result of the PCollection has no dependent transforms. */
        @Override
        public boolean isLeaf(PCollection<?> pCollection) {
            return getResult(pCollection).isLeaf();
        }

        /**
         * Returns the side-input broadcast of the PCollection, loading and broadcasting the values
         * from its dataset on first request and reusing that broadcast afterwards.
         */
        @Override
        public <T> Broadcast<SideInputValues<T>> getSideInputBroadcast(PCollection<T> pCollection, SideInputValues.Loader<T> loader) {
            TranslationResult<T> result = getResult(pCollection);
            if (result.sideInputBroadcast == null) {
                SideInputValues<T> sideInputValues = loader.apply(Preconditions.checkStateNotNull(result.dataset));
                result.sideInputBroadcast = PipelineTranslator.broadcast(this.sparkSession, sideInputValues);
            }
            return result.sideInputBroadcast;
        }

        @Override
        public Supplier<PipelineOptions> getOptionsSupplier() {
            return this.optionsSupplier;
        }

        @Override
        public PipelineOptions getOptions() {
            return this.options;
        }

        @Override
        public SparkSession getSparkSession() {
            return this.sparkSession;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator$TranslationResult.class */
    /**
     * Per-PCollection translation state: the name of the collection, its dataset and side-input
     * broadcast once available, the transforms depending on it, and an estimate of the complexity
     * of its query plan derived from its dependencies.
     */
    public static final class TranslationResult<T> implements EvaluationContext.NamedDataset<T> {
        private final String name;
        private final float complexityFactor;
        /** Cached complexity estimate; values that are not positive mean "not computed yet". */
        private float planComplexity = 0.0f;
        private Dataset<WindowedValue<T>> dataset = null;
        private Broadcast<SideInputValues<T>> sideInputBroadcast = null;
        /** Transforms that consume this result. */
        private final Set<PTransform<?, ?>> dependentTransforms = new HashSet<>();
        /** Results this one was derived from. */
        private final List<TranslationResult<?>> dependencies;

        private TranslationResult(PCollection<?> pCollection, float f, List<TranslationResult<?>> list) {
            this.name = pCollection.getName();
            this.complexityFactor = f;
            this.dependencies = list;
        }

        @Override
        public String name() {
            return this.name;
        }

        @Override
        @Nullable
        public Dataset<WindowedValue<T>> dataset() {
            return this.dataset;
        }

        /** Returns whether no transform depends on this result. */
        public boolean isLeaf() {
            return this.dependentTransforms.isEmpty();
        }

        /** Returns the number of transforms depending on this result. */
        public int usages() {
            return this.dependentTransforms.size();
        }

        /** Resets the cached complexity estimate to 1. */
        public void resetPlanComplexity() {
            this.planComplexity = 1.0f;
        }

        /**
         * Returns the cached estimate if it is positive; otherwise computes, caches and returns
         * {@code 1 + complexityFactor} multiplied by the estimates of all dependencies.
         */
        public float estimatePlanComplexity() {
            if (this.planComplexity > 0.0f) {
                return this.planComplexity;
            }
            float complexity = 1.0f + this.complexityFactor;
            for (TranslationResult<?> dependency : this.dependencies) {
                complexity *= dependency.estimatePlanComplexity();
            }
            this.planComplexity = complexity;
            return complexity;
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator$TranslationState.class */
    /**
     * State made available to transform translators during translation. In addition to the
     * {@link EncoderProvider} contract it gives access to the datasets of already translated
     * PCollections, side-input broadcasts, the pipeline options and the Spark session.
     */
    public interface TranslationState extends EncoderProvider {
        /** Returns the dataset previously stored for the given PCollection. */
        <T> Dataset<WindowedValue<T>> getDataset(PCollection<T> pCollection);

        /** Returns whether the given PCollection is a leaf; the visitor in this file treats a PCollection without dependent transforms as one. */
        boolean isLeaf(PCollection<?> pCollection);

        /**
         * Stores the dataset translated for the given PCollection.
         *
         * @param pCollection collection the dataset belongs to
         * @param dataset translated dataset
         * @param z caching flag; the visitor in this file only caches when this is {@code true} and
         *     the PCollection has more than one dependent transform
         */
        <T> void putDataset(PCollection<T> pCollection, Dataset<WindowedValue<T>> dataset, boolean z);

        /** Stores the dataset with the caching flag set to {@code true}. */
        default <T> void putDataset(PCollection<T> pCollection, Dataset<WindowedValue<T>> dataset) {
            putDataset(pCollection, dataset, true);
        }

        /**
         * Returns a broadcast of the side-input values of the given PCollection, using the loader to
         * obtain the values from its dataset.
         */
        <T> Broadcast<SideInputValues<T>> getSideInputBroadcast(PCollection<T> pCollection, SideInputValues.Loader<T> loader);

        /** Returns a supplier of the pipeline options; the visitor in this file backs it with a broadcast. */
        Supplier<PipelineOptions> getOptionsSupplier();

        /** Returns the pipeline options. */
        PipelineOptions getOptions();

        /** Returns the Spark session used for translation. */
        SparkSession getSparkSession();
    }

    /**
     * Replaces transforms of the pipeline with the default Spark overrides selected for the
     * streaming flag of the given options.
     *
     * @param pipeline pipeline whose transforms are replaced in place
     * @param streamingOptions options providing the streaming flag
     */
    public static void replaceTransforms(Pipeline pipeline, StreamingOptions streamingOptions) {
        boolean streaming = streamingOptions.isStreaming();
        pipeline.replaceAll(SparkTransformOverrides.getDefaultOverrides(streaming));
    }

    /**
     * Traverses the pipeline looking for unbounded PCollections and writes the resulting mode back
     * to the options. The traversal starts from the streaming flag already set on the options.
     *
     * @param pipeline pipeline to inspect
     * @param streamingOptions options read for the initial flag and updated with the detected one
     */
    public static void detectStreamingMode(Pipeline pipeline, StreamingOptions streamingOptions) {
        boolean initiallyStreaming = streamingOptions.isStreaming();
        StreamingModeDetector detector = new StreamingModeDetector(initiallyStreaming);
        pipeline.traverseTopologically(detector);
        boolean detectedStreaming = detector.streaming;
        streamingOptions.setStreaming(detectedStreaming);
    }

    /**
     * Returns the translator responsible for the given transform.
     *
     * @param transformt transform to look up a translator for
     * @return the translator, or {@code null} if there is none; callers in this file additionally
     *     check {@code canTranslate} on the returned translator before using it
     */
    @Nullable
    protected abstract <InT extends PInput, OutT extends POutput, TransformT extends PTransform<InT, OutT>> TransformTranslator<InT, OutT, TransformT> getTransformTranslator(TransformT transformt);

    /**
     * Translates the pipeline in two topological traversals: the first records the dependencies
     * between PCollections, the second runs the transform translators.
     *
     * @param pipeline pipeline to translate
     * @param sparkSession session used by the translators
     * @param sparkCommonPipelineOptions options passed on to the translating visitor
     * @return an evaluation context built from the leaf results and the session
     */
    public EvaluationContext translate(Pipeline pipeline, SparkSession sparkSession, SparkCommonPipelineOptions sparkCommonPipelineOptions) {
        LOG.debug("starting translation of the pipeline using {}", getClass().getName());

        // Pass 1: collect a translation result per PCollection, including its dependencies.
        DependencyVisitor dependencies = new DependencyVisitor();
        pipeline.traverseTopologically(dependencies);
        Map<PCollection<?>, TranslationResult<?>> results = dependencies.results;

        // Pass 2: translate the transforms using the results collected above.
        TranslatingVisitor translator = new TranslatingVisitor(sparkSession, sparkCommonPipelineOptions, results);
        pipeline.traverseTopologically(translator);

        Set<TranslationResult<?>> leafResults = translator.leaves;
        return new EvaluationContext(leafResults, sparkSession);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Broadcasts the given value using the Spark context of the session.
     *
     * @param sparkSession session whose context performs the broadcast
     * @param t value to broadcast
     * @return the broadcast handle for the value
     */
    public static <T> Broadcast<T> broadcast(SparkSession sparkSession, T t) {
        // ClassTag.AnyRef() is typed as ClassTag<Object>; the unchecked cast aligns it with T
        // so that the broadcast is typed Broadcast<T> rather than Broadcast<Object>.
        @SuppressWarnings("unchecked")
        ClassTag<T> classTag = (ClassTag<T>) (ClassTag<?>) ClassTag.AnyRef();
        return sparkSession.sparkContext().broadcast(t, classTag);
    }
}
