package com.amazon.ws.emr.hadoop.fs.concurrent;

import com.amazon.ws.emr.hadoop.fs.shaded.com.google.common.base.Optional;
import com.amazon.ws.emr.hadoop.fs.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/* loaded from: input_file:com/amazon/ws/emr/hadoop/fs/concurrent/ProducerConsumerExecutor.class */
public class ProducerConsumerExecutor {
    public synchronized <T> void execute(Collection<Producer<T>> collection, Collection<Consumer<T>> collection2) {
        doExecute(collection, collection2);
    }

    private <T> void doExecute(Collection<Producer<T>> collection, Collection<Consumer<T>> collection2) {
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(collection.size() + collection2.size(), new ThreadFactoryBuilder().setDaemon(true).setNameFormat(String.format("Executor:%d:thread:%%d", Integer.valueOf(System.identityHashCode(this)))).build());
        try {
            ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(1000);
            ArrayList arrayList = new ArrayList();
            Iterator<Producer<T>> it = collection.iterator();
            while (it.hasNext()) {
                arrayList.add(newFixedThreadPool.submit(new BlockingQueueProducer(arrayBlockingQueue, it.next())));
            }
            ArrayList arrayList2 = new ArrayList();
            Iterator<Consumer<T>> it2 = collection2.iterator();
            while (it2.hasNext()) {
                arrayList2.add(newFixedThreadPool.submit(new BlockingQueueConsumer(arrayBlockingQueue, it2.next())));
            }
            waitForExecutionToFinish(arrayBlockingQueue, arrayList, arrayList2);
            newFixedThreadPool.shutdownNow();
        } catch (Throwable th) {
            newFixedThreadPool.shutdownNow();
            throw th;
        }
    }

    private void waitForExecutionToFinish(BlockingQueue blockingQueue, List<Future> list, List<Future> list2) {
        try {
            Iterator<Future> it = list.iterator();
            while (it.hasNext()) {
                it.next().get();
            }
            for (Future future : list2) {
                blockingQueue.put(Optional.absent());
            }
            Iterator<Future> it2 = list2.iterator();
            while (it2.hasNext()) {
                it2.next().get();
            }
        } catch (InterruptedException | ExecutionException e) {
            cancelAll(list);
            cancelAll(list2);
            throw new RuntimeException(e);
        }
    }

    private void cancelAll(Collection<Future> collection) {
        Iterator<Future> it = collection.iterator();
        while (it.hasNext()) {
            it.next().cancel(true);
        }
    }
}
