package org.apache.hadoop.hive.llap;

import com.google.protobuf.Message;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hive.llap.AsyncPbRpcProxy;
import org.apache.hadoop.util.concurrent.AsyncGet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hadoop/hive/llap/AsyncResponseHandler.class */
public class AsyncResponseHandler {
    private static final Logger LOG = LoggerFactory.getLogger(AsyncResponseHandler.class);
    private final AsyncPbRpcProxy.RequestManager requestManager;
    private final AtomicBoolean isShutdown = new AtomicBoolean(false);
    private final ExecutorService responseWaitingService = Executors.newSingleThreadExecutor();
    private final LinkedBlockingDeque<AsyncPbRpcProxy.AsyncCallableRequest<Message, Message>> incomingResponseFutures = new LinkedBlockingDeque<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hadoop/hive/llap/AsyncResponseHandler$AsyncResponseHandlerRunnable.class */
    public final class AsyncResponseHandlerRunnable implements Runnable {
        private final List<AsyncPbRpcProxy.AsyncCallableRequest<Message, Message>> responseFuturesQueue;

        private AsyncResponseHandlerRunnable() {
            this.responseFuturesQueue = new ArrayList();
        }

        @Override // java.lang.Runnable
        public void run() {
            loop0: while (!AsyncResponseHandler.this.isShutdown.get()) {
                try {
                    if (this.responseFuturesQueue.isEmpty()) {
                        this.responseFuturesQueue.add((AsyncPbRpcProxy.AsyncCallableRequest) AsyncResponseHandler.this.incomingResponseFutures.take());
                    }
                } catch (InterruptedException e) {
                    AsyncResponseHandler.LOG.warn("Async response handler was interrupted", e);
                }
                Iterator<AsyncPbRpcProxy.AsyncCallableRequest<Message, Message>> it = this.responseFuturesQueue.iterator();
                while (it.hasNext()) {
                    AsyncPbRpcProxy.AsyncCallableRequest<Message, Message> next = it.next();
                    AsyncGet<Message, Exception> responseFuture = next.getResponseFuture();
                    if (responseFuture != null && responseFuture.isDone()) {
                        try {
                            it.remove();
                            LlapNodeId nodeId = next.getNodeId();
                            try {
                                try {
                                    Throwable th = (Message) responseFuture.get(-1L, TimeUnit.MILLISECONDS);
                                    if (th instanceof Throwable) {
                                        next.getCallback().indicateError(th);
                                    } else {
                                        next.getCallback().setResponse(th);
                                    }
                                    AsyncResponseHandler.this.requestManager.requestFinished(nodeId);
                                } catch (Exception e2) {
                                    next.getCallback().indicateError(e2);
                                    AsyncResponseHandler.this.requestManager.requestFinished(nodeId);
                                }
                            } catch (Throwable th2) {
                                AsyncResponseHandler.this.requestManager.requestFinished(nodeId);
                                throw th2;
                                break loop0;
                            }
                        } catch (Throwable th3) {
                            AsyncResponseHandler.LOG.warn("ResponseDispatcher caught", th3);
                        }
                    }
                }
                while (!AsyncResponseHandler.this.incomingResponseFutures.isEmpty()) {
                    this.responseFuturesQueue.add((AsyncPbRpcProxy.AsyncCallableRequest) AsyncResponseHandler.this.incomingResponseFutures.poll());
                }
            }
            AsyncResponseHandler.LOG.info("Async response handler exiting");
        }
    }

    public AsyncResponseHandler(AsyncPbRpcProxy.RequestManager requestManager) {
        this.requestManager = requestManager;
    }

    public void start() {
        this.responseWaitingService.submit(new AsyncResponseHandlerRunnable());
    }

    public void addToAsyncResponseFutureQueue(AsyncPbRpcProxy.AsyncCallableRequest<Message, Message> asyncCallableRequest) {
        this.incomingResponseFutures.add(asyncCallableRequest);
    }

    public void shutdownNow() {
        this.isShutdown.set(true);
        this.responseWaitingService.shutdownNow();
    }
}
