package org.talend.sdk.component.runtime.beam.chain.impl;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
import org.talend.sdk.component.runtime.beam.TalendFn;
import org.talend.sdk.component.runtime.beam.TalendIO;
import org.talend.sdk.component.runtime.beam.coder.registry.SchemaRegistryCoder;
import org.talend.sdk.component.runtime.beam.transform.AutoKVWrapper;
import org.talend.sdk.component.runtime.beam.transform.CoGroupByKeyResultMappingTransform;
import org.talend.sdk.component.runtime.beam.transform.RecordBranchFilter;
import org.talend.sdk.component.runtime.beam.transform.RecordBranchMapper;
import org.talend.sdk.component.runtime.beam.transform.RecordBranchUnwrapper;
import org.talend.sdk.component.runtime.beam.transform.RecordKVUnwrapper;
import org.talend.sdk.component.runtime.beam.transform.RecordNormalizer;
import org.talend.sdk.component.runtime.input.Mapper;
import org.talend.sdk.component.runtime.manager.chain.Job;
import org.talend.sdk.component.runtime.manager.chain.internal.JobImpl;
import org.talend.sdk.component.runtime.output.Processor;

/* loaded from: input_file:org/talend/sdk/component/runtime/beam/chain/impl/BeamExecutor.class */
/**
 * {@link Job.ExecutorBuilder} that translates the job held by a {@link JobImpl.JobExecutor}
 * into an Apache Beam {@link Pipeline} and runs it.
 *
 * <p>Source components are read through {@link TalendIO#read}, every other component is wired
 * to the collections produced by the components its incoming edges start from, and components
 * without outgoing edges are written through {@link TalendIO#write}.
 *
 * <p>Beam collections are handled as raw types in this class because the record element type
 * is not imported by this file; every {@code apply} result is therefore cast explicitly.
 */
public class BeamExecutor implements Job.ExecutorBuilder {
    /** Prefix of the system properties that are forwarded to Beam as {@code --name=value} arguments. */
    private static final String PIPELINE_OPTION_PREFIX = "talend.beam.job.";

    private final JobImpl.JobExecutor delegate;

    /**
     * @param jobExecutor the job description (levels, edges, properties, manager) to execute.
     */
    public BeamExecutor(JobImpl.JobExecutor jobExecutor) {
        this.delegate = jobExecutor;
    }

    /**
     * Forwards a job property to the wrapped executor.
     *
     * @param str the property name.
     * @param obj the property value.
     * @return this builder, to allow chaining.
     */
    @Override
    public Job.ExecutorBuilder property(String str, Object obj) {
        this.delegate.property(str, obj);
        return this;
    }

    /**
     * Builds the Beam pipeline for the job, runs it and blocks until it is no longer running.
     *
     * <p>Whatever the outcome, {@code JobImpl.LocalSequenceHolder.clean} is called with the id of
     * every component of the job before this method returns.
     *
     * @throws IllegalStateException if a mapper or a processor can not be found, if a non-source
     *         component has no usable input, or if the calling thread is interrupted while
     *         waiting for the pipeline.
     */
    @Override
    @SuppressWarnings("rawtypes") // see class comment: the element type is not available here
    public void run() {
        try {
            Map<String, Mapper> mappers = findMappers();
            Map<String, Processor> processors = findProcessors();
            Pipeline pipeline = Pipeline.create(createPipelineOptions());
            // output collection of each already translated component, by component id
            Map<String, PCollection> outputs = new HashMap<>();
            for (Job.Component component : components()) {
                if (component.isSource()) {
                    outputs.put(component.getId(), readSource(pipeline, component, mappers.get(component.getId())));
                } else {
                    applyProcessor(component, processors.get(component.getId()), outputs);
                }
            }
            awaitCompletion(pipeline.run());
        } finally {
            components().stream().map(Job.Component::getId).forEach(JobImpl.LocalSequenceHolder::clean);
        }
    }

    /** Flattens the levels of the job into a single list, in the iteration order of the levels. */
    private List<Job.Component> components() {
        return this.delegate.getLevels().values().stream()
                .flatMap(level -> level.stream())
                .collect(Collectors.toList());
    }

    /** Resolves the mapper of every source component, indexed by component id. */
    private Map<String, Mapper> findMappers() {
        return components().stream()
                .filter(Job.Component::isSource)
                .collect(Collectors.toMap(Job.Component::getId, component -> (Mapper) this.delegate.getManager()
                        .findMapper(component.getNode().getFamily(), component.getNode().getComponent(),
                                component.getNode().getVersion(), component.getNode().getConfiguration())
                        .orElseThrow(() -> new IllegalStateException("No mapper found for: " + component.getNode()))));
    }

    /** Resolves the processor of every non-source component, indexed by component id. */
    private Map<String, Processor> findProcessors() {
        return components().stream()
                .filter(component -> !component.isSource())
                .collect(Collectors.toMap(Job.Component::getId, component -> (Processor) this.delegate.getManager()
                        .findProcessor(component.getNode().getFamily(), component.getNode().getComponent(),
                                component.getNode().getVersion(), component.getNode().getConfiguration())
                        .orElseThrow(() -> new IllegalStateException("No processor found for:" + component.getNode()))));
    }

    /**
     * Creates the collection read from a source component and normalizes its records.
     *
     * <p>For streaming mappers the reader receives {@code maxRecords} and {@code maxDurationMs},
     * taken from the job properties {@code streaming.maxRecords} (default {@code "1000"}) and
     * {@code streaming.maxDurationMs} (default {@code "60000"}).
     */
    @SuppressWarnings("rawtypes")
    private PCollection readSource(Pipeline pipeline, Job.Component component, Mapper mapper) {
        Map<String, String> readerOptions = new HashMap<>();
        if (mapper.isStream()) {
            readerOptions.put("maxRecords",
                    String.valueOf(this.delegate.getJobProperties().getOrDefault("streaming.maxRecords", "1000")));
            readerOptions.put("maxDurationMs",
                    String.valueOf(this.delegate.getJobProperties().getOrDefault("streaming.maxDurationMs", "60000")));
        }
        PCollection read = (PCollection) pipeline.apply(toName("TalendIO", component),
                TalendIO.read(mapper, readerOptions));
        return (PCollection) read.apply(toName("RecordNormalizer", component), RecordNormalizer.of(mapper.plugin()));
    }

    /**
     * Wires a non-source component to its inputs and either terminates the branch with an
     * output (no outgoing edge) or registers the processed collection in {@code outputs}.
     */
    @SuppressWarnings({ "unchecked", "rawtypes" })
    private void applyProcessor(Job.Component component, Processor processor, Map<String, PCollection> outputs) {
        // one keyed collection per incoming edge, indexed by the branch it targets
        Map<String, PCollection> inputs = getEdges(this.delegate.getEdges(), component, edge -> edge.getTo().getNode())
                .stream()
                .collect(Collectors.toMap(edge -> edge.getTo().getBranch(),
                        edge -> toKeyedInput(component, processor, edge, outputs)));
        if (inputs.isEmpty()) {
            // previously failed later with a NullPointerException on the (never created) tuple
            throw new IllegalStateException("No input found for component: " + component.getId());
        }

        PCollection records = inputs.size() == 1
                ? unwrapSingleInput(component, processor, inputs.values().iterator().next())
                : joinInputs(component, processor, inputs);

        boolean isOutput = getEdges(this.delegate.getEdges(), component, edge -> edge.getFrom().getNode()).isEmpty();
        if (isOutput) {
            records.apply(toName("Output", component), TalendIO.write(processor));
        } else {
            outputs.put(component.getId(),
                    (PCollection) records.apply(toName("Processor", component), TalendFn.asFn(processor)));
        }
    }

    /**
     * Builds the keyed input feeding {@code component} through {@code edge}: filters the upstream
     * collection on the source branch, renames the branch when source and target branches differ,
     * unwraps the target branch and finally wraps the records into key/value pairs.
     */
    @SuppressWarnings({ "unchecked", "rawtypes" })
    private PCollection toKeyedInput(Job.Component component, Processor processor, Job.Edge edge,
            Map<String, PCollection> outputs) {
        String upstreamId = edge.getFrom().getNode().getId();
        String fromBranch = edge.getFrom().getBranch();
        String toBranch = edge.getTo().getBranch();

        PCollection upstream = outputs.get(upstreamId);
        if (upstream == null) {
            // previously an anonymous NullPointerException
            throw new IllegalStateException(
                    "No collection available from component '" + upstreamId + "' for component '" + component.getId() + "'");
        }

        PCollection branch = (PCollection) upstream.apply(toName("RecordBranchFilter", component, edge),
                RecordBranchFilter.of(processor.plugin(), fromBranch));
        if (!fromBranch.equals(toBranch)) {
            branch = (PCollection) branch.apply(toName("RecordBranchMapper", component, edge),
                    RecordBranchMapper.of(processor.plugin(), fromBranch, toBranch));
        }
        PCollection unwrapped = (PCollection) branch.apply(toName("RecordBranchUnwrapper", component, edge),
                RecordBranchUnwrapper.of(processor.plugin(), toBranch));
        return (PCollection) unwrapped.apply(toName("AutoKVWrapper", component, edge),
                AutoKVWrapper.of(processor.plugin(), this.delegate.getKeyProvider(component.getId()),
                        component.getId(), fromBranch));
    }

    /** Single input: drops the key added for joins and normalizes the records. */
    @SuppressWarnings({ "unchecked", "rawtypes" })
    private PCollection unwrapSingleInput(Job.Component component, Processor processor, PCollection keyed) {
        PCollection unwrapped = ((PCollection) keyed.apply(toName("RecordKVUnwrapper", component),
                ParDo.of(new RecordKVUnwrapper()))).setCoder(SchemaRegistryCoder.of());
        return (PCollection) unwrapped.apply(toName("RecordNormalizer", component),
                RecordNormalizer.of(processor.plugin()));
    }

    /**
     * Several inputs: groups them by key, one tuple tag per target branch, and maps the grouped
     * result back to records. {@code inputs} must not be empty.
     */
    @SuppressWarnings({ "unchecked", "rawtypes" })
    private PCollection joinInputs(Job.Component component, Processor processor, Map<String, PCollection> inputs) {
        KeyedPCollectionTuple tuple = null;
        for (Map.Entry<String, PCollection> input : inputs.entrySet()) {
            TupleTag tag = new TupleTag(input.getKey());
            tuple = tuple == null
                    ? KeyedPCollectionTuple.of(tag, input.getValue())
                    : tuple.and(tag, input.getValue());
        }
        PCollection grouped = (PCollection) tuple.apply(toName("CoGroupByKey", component), CoGroupByKey.create());
        return (PCollection) grouped.apply(toName("CoGroupByKeyResultMappingTransform", component),
                new CoGroupByKeyResultMappingTransform(processor.plugin(), true));
    }

    /**
     * Blocks until the pipeline has left the {@code RUNNING} state.
     *
     * @throws IllegalStateException if the calling thread is interrupted while polling; the
     *         interrupt flag is restored before throwing.
     */
    private void awaitCompletion(PipelineResult result) {
        result.waitUntilFinish();
        // NOTE(review): presumably some runners return from waitUntilFinish() while the job is
        // still RUNNING, hence the additional polling - confirm against the runners in use.
        while (PipelineResult.State.RUNNING.equals(result.getState())) {
            try {
                Thread.sleep(100L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interruption visible to callers
                throw new IllegalStateException("the job was aborted", e);
            }
        }
    }

    /** Name of a transform applied for {@code component} on a given {@code edge}. */
    private String toName(String str, Job.Component component, Job.Edge edge) {
        // the prefix is passed as an argument so that a '%' in it can not corrupt the format
        return String.format("%s/step=%s,from=%s(%s)-to=%s(%s)", str, component.getId(),
                edge.getFrom().getNode().getId(), edge.getFrom().getBranch(),
                edge.getTo().getNode().getId(), edge.getTo().getBranch());
    }

    /** Name of a transform applied for {@code component}. */
    private String toName(String str, Job.Component component) {
        return String.format("%s/%s", str, component.getId());
    }

    /**
     * Selects the edges whose end extracted by {@code function} equals {@code component}.
     *
     * @param list the edges to filter.
     * @param component the component to match.
     * @param function extracts the end of the edge (origin or destination) to compare.
     * @return the matching edges, in their original order.
     */
    private List<Job.Edge> getEdges(List<Job.Edge> list, Job.Component component, Function<Job.Edge, Job.Component> function) {
        return list.stream()
                .filter(edge -> function.apply(edge).equals(component))
                .collect(Collectors.toList());
    }

    /**
     * Creates the pipeline options from the system properties starting with
     * {@code talend.beam.job.}: each one is passed as {@code --<name without prefix>=<value>}.
     */
    private PipelineOptions createPipelineOptions() {
        String[] args = System.getProperties().stringPropertyNames().stream()
                .filter(name -> name.startsWith(PIPELINE_OPTION_PREFIX))
                .map(name -> "--" + name.substring(PIPELINE_OPTION_PREFIX.length()) + "=" + System.getProperty(name))
                .toArray(String[]::new);
        return PipelineOptionsFactory.fromArgs(args).create();
    }
}
