package org.talend.sdk.component.runtime.beam.spi;

import java.io.ObjectStreamException;
import java.io.Serializable;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.talend.sdk.component.api.record.Record;
import org.talend.sdk.component.api.service.source.ProducerFinder;
import org.talend.sdk.component.runtime.base.Delegated;
import org.talend.sdk.component.runtime.base.LifecycleImpl;
import org.talend.sdk.component.runtime.input.Input;
import org.talend.sdk.component.runtime.input.Mapper;
import org.talend.sdk.component.runtime.manager.service.ProducerFinderImpl;
import org.talend.sdk.component.runtime.serialization.SerializableService;

/* loaded from: input_file:org/talend/sdk/component/runtime/beam/spi/BeamProducerFinder.class */
public class BeamProducerFinder extends ProducerFinderImpl {
    private static final Logger log = LoggerFactory.getLogger(BeamProducerFinder.class);
    static final int CAPACITY = Integer.parseInt(System.getProperty("talend.beam.wrapper.capacity", "100000"));
    static final Queue<Record> QUEUE = new ArrayBlockingQueue(CAPACITY, true);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/talend/sdk/component/runtime/beam/spi/BeamProducerFinder$PushRecord.class */
    public static class PushRecord extends DoFn<Record, Void> implements Serializable {
        PushRecord() {
        }

        @DoFn.ProcessElement
        public void processElement(@DoFn.Element Record record) {
            boolean offer = BeamProducerFinder.QUEUE.offer(record);
            while (!offer) {
                if (BeamProducerFinder.QUEUE.size() >= BeamProducerFinder.CAPACITY) {
                    String format = String.format("Wrapper queue if full (capacity: %d). Consider increasing it according data with talend.beam.wrapper.capacity property.", Integer.valueOf(BeamProducerFinder.CAPACITY));
                    BeamProducerFinder.log.error("[processElement] {}", format);
                    throw new IllegalStateException(format);
                }
                sleep();
                offer = BeamProducerFinder.QUEUE.offer(record);
            }
        }

        private void sleep() {
            try {
                Thread.sleep(100L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /* loaded from: input_file:org/talend/sdk/component/runtime/beam/spi/BeamProducerFinder$QueueInput.class */
    static class QueueInput extends LifecycleImpl implements Input, Iterator<Record> {
        private final PTransform<PBegin, PCollection<Record>> transform;
        private final PipelineResult result;
        private boolean started;
        private boolean end;
        private Record next;

        public QueueInput(Object obj, String str, String str2, String str3, PTransform<PBegin, PCollection<Record>> pTransform) {
            super(obj, str, str2, str3);
            this.transform = pTransform;
            this.result = runDataReadingPipeline();
        }

        @Override // java.util.Iterator
        public boolean hasNext() {
            if (this.next == null && !this.started) {
                this.next = findNext();
                this.started = true;
            }
            return this.next != null;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.Iterator
        public Record next() {
            if (!hasNext()) {
                return null;
            }
            Record record = this.next;
            this.next = findNext();
            return record;
        }

        private Record findNext() {
            Record record;
            Record poll = BeamProducerFinder.QUEUE.poll();
            while (true) {
                record = poll;
                if (record != null || this.end) {
                    break;
                }
                this.end = this.result.getState() != PipelineResult.State.RUNNING;
                sleep();
                poll = BeamProducerFinder.QUEUE.poll();
            }
            return record;
        }

        private PipelineResult runDataReadingPipeline() {
            ClassLoader classLoader = Pipeline.class.getClassLoader();
            ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
            try {
                Thread.currentThread().setContextClassLoader(classLoader);
                PipelineOptions create = PipelineOptionsFactory.create();
                ParDo.SingleOutput of = ParDo.of(new PushRecord());
                Pipeline create2 = Pipeline.create(create);
                create2.apply(this.transform).apply(of);
                PipelineResult run = create2.run();
                Thread.currentThread().setContextClassLoader(contextClassLoader);
                return run;
            } catch (Throwable th) {
                Thread.currentThread().setContextClassLoader(contextClassLoader);
                throw th;
            }
        }

        private void sleep() {
            try {
                Thread.sleep(100L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public Iterator<Record> find(String str, String str2, int i, Map<String, String> map) {
        Mapper findMapper = findMapper(getInstantiator(str, str2), i, map);
        try {
            return iterator(findMapper.create());
        } catch (Exception e) {
            log.warn("Component Kit Mapper instantiation failed, trying to wrap native beam mapper...");
            Object delegate = ((Delegated) Delegated.class.cast(findMapper)).getDelegate();
            if (PTransform.class.isInstance(delegate)) {
                return new QueueInput(delegate, str, str2, str, (PTransform) PTransform.class.cast(delegate));
            }
            throw new IllegalStateException(e);
        }
    }

    Object writeReplace() throws ObjectStreamException {
        return new SerializableService(this.plugin, ProducerFinder.class.getName());
    }
}
