package org.talend.sdk.component.runtime.di.beam.components;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.POutput;
import org.talend.sdk.component.api.record.Record;
import org.talend.sdk.component.runtime.di.JobStateAware;
import org.talend.sdk.component.runtime.di.beam.DelegatingBoundedSource;
import org.talend.sdk.component.runtime.di.beam.DelegatingUnBoundedSource;
import org.talend.sdk.component.runtime.di.beam.InMemoryQueueIO;
import org.talend.sdk.component.runtime.di.beam.SettableSourceListener;
import org.talend.sdk.component.runtime.di.beam.SourceListener;

/* loaded from: input_file:org/talend/sdk/component/runtime/di/beam/components/DIPipeline.class */
public class DIPipeline extends Pipeline {
    private final Collection<PTransform<?, ?>> transformStack;
    private final Map<PTransform<?, ?>, JobStateAware.State> states;
    private String currentState;

    public DIPipeline(PipelineOptions pipelineOptions) {
        super(pipelineOptions);
        this.transformStack = new ArrayList();
        this.states = new ConcurrentHashMap();
    }

    public <OutputT extends POutput> OutputT apply(PTransform<? super PBegin, OutputT> pTransform) {
        this.transformStack.add(pTransform);
        try {
            return (OutputT) super.apply(wrapTransformIfNeeded(pTransform));
        } finally {
            this.transformStack.remove(pTransform);
        }
    }

    public <OutputT extends POutput> OutputT apply(String str, PTransform<? super PBegin, OutputT> pTransform) {
        this.transformStack.add(pTransform);
        try {
            OutputT outputt = (OutputT) super.apply(str, wrapTransformIfNeeded(pTransform));
            this.transformStack.remove(pTransform);
            return outputt;
        } catch (Throwable th) {
            this.transformStack.remove(pTransform);
            throw th;
        }
    }

    private <PT extends POutput> PTransform<? super PBegin, PT> wrapTransformIfNeeded(PTransform<? super PBegin, PT> pTransform) {
        if (Read.Bounded.class.isInstance(pTransform)) {
            DelegatingBoundedSource delegatingBoundedSource = new DelegatingBoundedSource(((Read.Bounded) Read.Bounded.class.cast(pTransform)).getSource(), null);
            setState(delegatingBoundedSource);
            return Read.from(delegatingBoundedSource);
        }
        if (!Read.Unbounded.class.isInstance(pTransform)) {
            return pTransform;
        }
        UnboundedSource source = ((Read.Unbounded) Read.Unbounded.class.cast(pTransform)).getSource();
        if (InMemoryQueueIO.UnboundedQueuedInput.class.isInstance(source)) {
            return pTransform;
        }
        DelegatingUnBoundedSource delegatingUnBoundedSource = new DelegatingUnBoundedSource(source, null);
        setState(delegatingUnBoundedSource);
        return Read.from(delegatingUnBoundedSource);
    }

    private void setState(SettableSourceListener settableSourceListener) {
        Stream<PTransform<?, ?>> stream = this.transformStack.stream();
        Map<PTransform<?, ?>, JobStateAware.State> map = this.states;
        map.getClass();
        Optional<PTransform<?, ?>> findFirst = stream.filter((v1) -> {
            return r1.containsKey(v1);
        }).findFirst();
        if (!findFirst.isPresent()) {
            throw new IllegalStateException("No state for transforms " + this.transformStack);
        }
        findFirst.ifPresent(pTransform -> {
            SourceListener.Tracker tracker = new SourceListener.Tracker();
            SourceListener.TRACKERS.put(tracker.getId(), tracker);
            settableSourceListener.setSourceListener(new SourceListener.StateReleaserSourceListener(tracker.getId(), (String) Objects.requireNonNull(this.currentState, "currentState is not set"), null, null));
        });
    }

    public void registerStateForTransform(PTransform<PBegin, PCollection<Record>> pTransform, JobStateAware.State state) {
        this.states.put(pTransform, state);
    }

    public void withState(String str, Runnable runnable) {
        this.currentState = str;
        try {
            runnable.run();
        } finally {
            this.currentState = null;
        }
    }
}
