package org.talend.sdk.component.runtime.beam.spi;

import java.io.ObjectStreamException;
import java.io.Serializable;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.beam.runners.direct.DirectOptions;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.talend.sdk.component.api.record.Record;
import org.talend.sdk.component.api.service.source.ProducerFinder;
import org.talend.sdk.component.runtime.base.Delegated;
import org.talend.sdk.component.runtime.input.Mapper;
import org.talend.sdk.component.runtime.manager.service.ProducerFinderImpl;
import org.talend.sdk.component.runtime.serialization.SerializableService;

/* loaded from: input_file:org/talend/sdk/component/runtime/beam/spi/BeamProducerFinder.class */
public class BeamProducerFinder extends ProducerFinderImpl {
    /** Capacity of each bounded hand-off queue created by {@code find}. */
    private static final int QUEUE_SIZE = 200;

    /** Target parallelism configured on the Beam direct runner. */
    private static final int BEAM_PARALLELISM = 10;

    private static final Logger log = LoggerFactory.getLogger(BeamProducerFinder.class);

    /**
     * Hand-off queues shared between the Beam pipeline (producer side) and the iterators returned by
     * {@code find} (consumer side), keyed by the id generated for each iterator.
     */
    private static final Map<UUID, Queue<Record>> QUEUE = new ConcurrentHashMap<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/talend/sdk/component/runtime/beam/spi/BeamProducerFinder$MyDoFn.class */
    /**
     * {@link DoFn} that pushes every incoming {@link Record} into the queue registered in
     * {@code BeamProducerFinder.QUEUE} under {@link #queueId}. It produces no output element.
     */
    public static class MyDoFn extends DoFn<Record, Void> {
        /** Pause, in milliseconds, between two attempts to enqueue a record the queue refused. */
        private static final long RETRY_DELAY_MS = 20L;

        private final UUID queueId;

        /**
         * @param uuid key of the target queue in {@code BeamProducerFinder.QUEUE}
         */
        public MyDoFn(UUID uuid) {
            this.queueId = uuid;
        }

        /**
         * Offers the current element to the queue, retrying every {@value #RETRY_DELAY_MS} ms until the
         * queue accepts it.
         *
         * @param processContext Beam context giving access to the current element
         * @throws IllegalStateException if the thread is interrupted while waiting for the queue to
         *         accept the element
         */
        @DoFn.ProcessElement
        public void processElement(DoFn<Record, Void>.ProcessContext processContext) {
            final Queue<Record> queue = BeamProducerFinder.QUEUE.get(this.queueId);
            final Record record = processContext.element();
            boolean offer = queue.offer(record);
            BeamProducerFinder.log.debug("queue injected {}; ok={}; thread:{}", queue.size(), offer,
                    Thread.currentThread().getId());
            while (!offer) {
                sleep();
                offer = queue.offer(record);
                BeamProducerFinder.log.debug("\tqueue injected retry {}; ok={}; thread:{}", queue.size(), offer,
                        Thread.currentThread().getId());
            }
        }

        /**
         * Waits before the next enqueue attempt.
         *
         * @throws IllegalStateException if the thread is interrupted; the interrupt flag is restored first
         */
        private void sleep() {
            try {
                Thread.sleep(RETRY_DELAY_MS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                // With the flag set every later Thread.sleep returns immediately, so carrying on would
                // turn the retry loop into a hot spin that can never be cancelled.
                throw new IllegalStateException("Interrupted while waiting for room in the record queue", e);
            }
        }
    }

    /* loaded from: input_file:org/talend/sdk/component/runtime/beam/spi/BeamProducerFinder$QueueInput.class */
    /**
     * Iterator over the records emitted by a Beam {@link PTransform}. Creating an instance builds a
     * direct-runner pipeline that applies the transform followed by a {@code MyDoFn}, which pushes each
     * record into the queue registered in {@code BeamProducerFinder.QUEUE} under {@link #queueId}; the
     * iterator then drains that queue.
     */
    static class QueueInput implements Iterator<Record>, Serializable {
        /** Pause, in milliseconds, between two polls of an empty queue. */
        private static final long POLL_DELAY_MS = 30L;

        private final PTransform<PBegin, PCollection<Record>> transform;
        private final PipelineResult result;
        private boolean started;
        private boolean end;
        private Record next;
        private final UUID queueId;
        private Thread th;

        /**
         * Stores the transform and queue id, then starts the reading pipeline.
         *
         * @param obj unused
         * @param str unused
         * @param str2 unused
         * @param str3 unused
         * @param pTransform Beam source whose records are iterated
         * @param uuid key of the queue already registered in {@code BeamProducerFinder.QUEUE}
         */
        public QueueInput(Object obj, String str, String str2, String str3, PTransform<PBegin, PCollection<Record>> pTransform, UUID uuid) {
            this.transform = pTransform;
            this.queueId = uuid;
            // Must come after the two assignments above: the pipeline reads both fields, and a field
            // initializer would run before this constructor body, i.e. while they are still null.
            this.result = runDataReadingPipeline();
        }

        /**
         * Reports whether another record is available, fetching the first one on the first call.
         * Once no record is left, the queue is removed from {@code BeamProducerFinder.QUEUE}.
         */
        @Override // java.util.Iterator
        public boolean hasNext() {
            if (this.next == null && !this.started) {
                this.next = findNext();
                this.started = true;
            }
            if (this.next == null) {
                BeamProducerFinder.QUEUE.remove(this.queueId);
            }
            return this.next != null;
        }

        /**
         * Returns the current record and pre-fetches the following one.
         *
         * @return the next record, or {@code null} when the iterator is exhausted (this method does not
         *         throw {@code NoSuchElementException})
         */
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.Iterator
        public Record next() {
            if (!hasNext()) {
                return null;
            }
            final Record current = this.next;
            this.next = findNext();
            return current;
        }

        /**
         * Polls the queue until a record shows up or the pipeline has left the RUNNING state and the
         * queue is empty.
         *
         * @return the next record, or {@code null} when no more records will arrive
         * @throws IllegalStateException if the thread is interrupted while waiting for a record
         */
        private Record findNext() {
            final Queue<Record> queue = BeamProducerFinder.QUEUE.get(this.queueId);
            Record record = queue.poll();
            // Keep draining instead of blocking on result.waitUntilFinish(): producers wait for room
            // in the queue, so a consumer that stops polling can prevent the pipeline from finishing.
            while (record == null && !this.end) {
                this.end = this.result.getState() != PipelineResult.State.RUNNING;
                BeamProducerFinder.log.debug("findNext NULL, retry : end={}; size:{}", this.end, queue.size());
                sleep();
                // Polling once more after 'end' is observed picks up records enqueued just before the
                // state change.
                record = queue.poll();
            }
            return record;
        }

        /**
         * Builds the pipeline (transform followed by {@code MyDoFn}) with the direct runner in
         * non-blocking mode and starts it from a dedicated thread. The thread context class loader is
         * switched to the one of {@link Pipeline} for the duration of the call and restored afterwards.
         *
         * @return the handle of the started pipeline, never {@code null}
         * @throws IllegalStateException if the calling thread is interrupted or no result is produced
         */
        private PipelineResult runDataReadingPipeline() {
            final Thread current = Thread.currentThread();
            final ClassLoader previous = current.getContextClassLoader();
            try {
                current.setContextClassLoader(Pipeline.class.getClassLoader());
                final DirectOptions options = PipelineOptionsFactory.as(DirectOptions.class);
                options.setRunner(DirectRunner.class);
                options.setTargetParallelism(BeamProducerFinder.BEAM_PARALLELISM);
                options.setBlockOnRun(false);
                final Pipeline pipeline = Pipeline.create(options);
                pipeline.apply(this.transform).apply(ParDo.of(new MyDoFn(this.queueId)));

                final PipelineResult[] holder = new PipelineResult[1];
                final RuntimeException[] failure = new RuntimeException[1];
                this.th = new Thread(() -> {
                    try {
                        holder[0] = pipeline.run();
                    } catch (RuntimeException e) {
                        failure[0] = e;
                    }
                });
                this.th.start();
                try {
                    // join() publishes the thread's writes to this thread and, unlike polling the
                    // array slot, also returns when run() fails.
                    this.th.join();
                } catch (InterruptedException e) {
                    current.interrupt();
                    throw new IllegalStateException("Interrupted while starting the Beam pipeline", e);
                }
                if (failure[0] != null) {
                    throw failure[0];
                }
                if (holder[0] == null) {
                    throw new IllegalStateException("The Beam pipeline did not provide a result");
                }
                return holder[0];
            } finally {
                current.setContextClassLoader(previous);
            }
        }

        /**
         * Waits before the next poll of the queue.
         *
         * @throws IllegalStateException if the thread is interrupted; the interrupt flag is restored first
         */
        private void sleep() {
            try {
                Thread.sleep(POLL_DELAY_MS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                // Continuing would make every later sleep return immediately and spin the poll loop.
                throw new IllegalStateException("Interrupted while waiting for records", e);
            }
        }
    }

    /**
     * Returns an iterator over the records of the requested input. The mapper is first used directly
     * through {@code iterator(findMapper.create())}; when that fails and the mapper is a
     * {@link Delegated} wrapping a Beam {@link PTransform}, the transform is run through a
     * {@code QueueInput} fed by a freshly registered bounded queue.
     *
     * @param str first argument handed to {@code getInstantiator}
     * @param str2 second argument handed to {@code getInstantiator}
     * @param i version argument handed to {@code findMapper}
     * @param map configuration handed to {@code findMapper}
     * @return an iterator over the records produced by the input
     * @throws IllegalStateException wrapping the initial failure when the mapper does not wrap a
     *         {@link PTransform}
     */
    public Iterator<Record> find(String str, String str2, int i, Map<String, String> map) {
        final Mapper findMapper = findMapper(getInstantiator(str, str2), i, map);
        try {
            return iterator(findMapper.create());
        } catch (Exception e) {
            log.warn("Component Kit Mapper instantiation failed, trying to wrap native beam mapper...");
            // Keep the cause reachable even when the fallback below succeeds.
            log.debug("Component Kit Mapper instantiation failure", e);
            // Test the type before casting so a mapper that is not a Delegated reports the original
            // failure instead of a ClassCastException hiding it.
            final Object delegate = findMapper instanceof Delegated ? ((Delegated) findMapper).getDelegate() : null;
            if (!(delegate instanceof PTransform)) {
                throw new IllegalStateException(e);
            }
            @SuppressWarnings("unchecked") // generic arguments of the delegate cannot be checked at runtime
            final PTransform<PBegin, PCollection<Record>> transform = (PTransform<PBegin, PCollection<Record>>) delegate;
            final UUID queueId = UUID.randomUUID();
            QUEUE.put(queueId, new ArrayBlockingQueue<>(QUEUE_SIZE, true));
            try {
                return new QueueInput(delegate, str, str2, str, transform, queueId);
            } catch (RuntimeException | Error failure) {
                // No iterator will ever drain or unregister this queue: drop it from the static map.
                QUEUE.remove(queueId);
                throw failure;
            }
        }
    }

    /**
     * Java serialization hook: supplies the object written in place of this finder, namely a
     * {@link SerializableService} built from {@code plugin} and the {@link ProducerFinder} class name.
     *
     * @return the replacement object to serialize
     * @throws ObjectStreamException declared by the serialization hook signature
     */
    Object writeReplace() throws ObjectStreamException {
        final String serviceType = ProducerFinder.class.getName();
        return new SerializableService(this.plugin, serviceType);
    }
}
