package org.apache.flink.streaming.api.operators;

import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Supplier;
import javax.annotation.concurrent.GuardedBy;
import one.profiler.Events;
import org.apache.flink.annotation.Internal;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/api/operators/MapPartitionIterator.class */
public class MapPartitionIterator<IN> implements Iterator<IN> {
    public static final int DEFAULT_MAX_CACHE_NUM = 100;
    private final Lock lock = new ReentrantLock();

    @GuardedBy(Events.LOCK)
    private final Queue<IN> cacheQueue = new LinkedList();
    private final Condition cacheNotEmpty = this.lock.newCondition();
    private final Condition cacheNotFull = this.lock.newCondition();
    private final Condition udfFinish = this.lock.newCondition();

    @GuardedBy(Events.LOCK)
    private boolean udfFinished = false;

    @GuardedBy(Events.LOCK)
    private boolean closed = false;
    private final ExecutorService udfExecutor = Executors.newSingleThreadExecutor(new ExecutorThreadFactory("TaskUDFExecutor"));

    public MapPartitionIterator(Consumer<Iterator<IN>> consumer) {
        this.udfExecutor.execute(() -> {
            consumer.accept(this);
            runWithLock(() -> {
                this.udfFinished = true;
                this.udfFinish.signalAll();
                this.cacheNotFull.signalAll();
            });
        });
    }

    @Override // java.util.Iterator
    public boolean hasNext() {
        return ((Boolean) supplyWithLock(() -> {
            if (this.cacheQueue.size() > 0) {
                return true;
            }
            if (this.closed) {
                return false;
            }
            waitCacheNotEmpty();
            return Boolean.valueOf(hasNext());
        })).booleanValue();
    }

    @Override // java.util.Iterator
    public IN next() {
        return (IN) supplyWithLock(() -> {
            if (this.cacheQueue.size() > 0) {
                if (!this.closed && this.cacheQueue.size() == 100) {
                    this.cacheNotFull.signalAll();
                }
                return this.cacheQueue.poll();
            }
            if (this.closed) {
                return null;
            }
            waitCacheNotEmpty();
            return this.cacheQueue.poll();
        });
    }

    public void addRecord(IN in) {
        runWithLock(() -> {
            Preconditions.checkState(!this.closed);
            if (this.udfFinished) {
                return;
            }
            if (this.cacheQueue.size() >= 100) {
                waitCacheNotFull();
                addRecord(in);
            } else {
                this.cacheQueue.add(in);
                if (this.cacheQueue.size() == 1) {
                    this.cacheNotEmpty.signalAll();
                }
            }
        });
    }

    public void close() {
        runWithLock(() -> {
            this.closed = true;
            if (!this.udfFinished) {
                this.cacheNotEmpty.signalAll();
                waitUDFFinished();
            }
            this.udfExecutor.shutdown();
        });
    }

    private void waitCacheNotEmpty() {
        try {
            this.cacheNotEmpty.await();
        } catch (InterruptedException e) {
            ExceptionUtils.rethrow(e);
        }
    }

    private void waitCacheNotFull() {
        try {
            this.cacheNotFull.await();
        } catch (InterruptedException e) {
            ExceptionUtils.rethrow(e);
        }
    }

    private void waitUDFFinished() {
        try {
            this.udfFinish.await();
        } catch (InterruptedException e) {
            ExceptionUtils.rethrow(e);
        }
    }

    private void runWithLock(Runnable runnable) {
        try {
            this.lock.lock();
            runnable.run();
        } finally {
            this.lock.unlock();
        }
    }

    private <ANY> ANY supplyWithLock(Supplier<ANY> supplier) {
        try {
            this.lock.lock();
            return supplier.get();
        } finally {
            this.lock.unlock();
        }
    }
}
