/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.google.pubsub;

import com.google.api.core.AbstractApiService;
import com.google.api.core.ApiFuture;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.common.base.Strings;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.ReceivedMessage;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.google.pubsub.GooglePubsubConstants;
import org.apache.camel.component.google.pubsub.GooglePubsubEndpoint;
import org.apache.camel.component.google.pubsub.consumer.AcknowledgeCompletion;
import org.apache.camel.component.google.pubsub.consumer.AcknowledgeSync;
import org.apache.camel.component.google.pubsub.consumer.CamelMessageReceiver;
import org.apache.camel.support.DefaultConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GooglePubsubConsumer
extends DefaultConsumer {
    private final Logger localLog;
    private final GooglePubsubEndpoint endpoint;
    private final Processor processor;
    private ExecutorService executor;
    private final List<Subscriber> subscribers;
    private final Set<ApiFuture<PullResponse>> pendingSynchronousPullResponses;

    GooglePubsubConsumer(GooglePubsubEndpoint endpoint, Processor processor) {
        super(endpoint, processor);
        this.endpoint = endpoint;
        this.processor = processor;
        this.subscribers = Collections.synchronizedList(new LinkedList());
        this.pendingSynchronousPullResponses = Collections.synchronizedSet(new HashSet());
        String loggerId = endpoint.getLoggerId();
        if (Strings.isNullOrEmpty(loggerId)) {
            loggerId = this.getClass().getName();
        }
        this.localLog = LoggerFactory.getLogger((String)loggerId);
    }

    @Override
    protected void doStart() throws Exception {
        super.doStart();
        this.localLog.info("Starting Google PubSub consumer for {}/{}", (Object)this.endpoint.getProjectId(), (Object)this.endpoint.getDestinationName());
        this.executor = this.endpoint.createExecutor();
        for (int i = 0; i < this.endpoint.getConcurrentConsumers(); ++i) {
            this.executor.submit(new SubscriberWrapper());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void doStop() throws Exception {
        super.doStop();
        this.localLog.info("Stopping Google PubSub consumer for {}/{}", (Object)this.endpoint.getProjectId(), (Object)this.endpoint.getDestinationName());
        List<Subscriber> list = this.subscribers;
        synchronized (list) {
            if (!this.subscribers.isEmpty()) {
                this.localLog.info("Stopping subscribers for {}/{}", (Object)this.endpoint.getProjectId(), (Object)this.endpoint.getDestinationName());
                this.subscribers.forEach(AbstractApiService::stopAsync);
            }
        }
        this.safeCancelSynchronousPullResponses();
        if (this.executor != null) {
            if (this.getEndpoint() != null && this.getEndpoint().getCamelContext() != null) {
                this.getEndpoint().getCamelContext().getExecutorServiceManager().shutdownGraceful(this.executor);
            } else {
                this.executor.shutdownNow();
            }
        }
        this.executor = null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void safeCancelSynchronousPullResponses() {
        Set<ApiFuture<PullResponse>> set = this.pendingSynchronousPullResponses;
        synchronized (set) {
            for (ApiFuture<PullResponse> pullResponseApiFuture : this.pendingSynchronousPullResponses) {
                try {
                    pullResponseApiFuture.cancel(true);
                }
                catch (Exception e) {
                    this.localLog.warn("Exception while cancelling pending synchronous pull response", (Throwable)e);
                }
            }
            this.pendingSynchronousPullResponses.clear();
        }
    }

    private class SubscriberWrapper
    implements Runnable {
        private final String subscriptionName;

        SubscriberWrapper() {
            this.subscriptionName = ProjectSubscriptionName.format(GooglePubsubConsumer.this.endpoint.getProjectId(), GooglePubsubConsumer.this.endpoint.getDestinationName());
        }

        @Override
        public void run() {
            try {
                if (GooglePubsubConsumer.this.localLog.isDebugEnabled()) {
                    GooglePubsubConsumer.this.localLog.debug("Subscribing to {}", (Object)this.subscriptionName);
                }
                if (GooglePubsubConsumer.this.endpoint.isSynchronousPull()) {
                    this.synchronousPull(this.subscriptionName);
                } else {
                    this.asynchronousPull(this.subscriptionName);
                }
                GooglePubsubConsumer.this.localLog.debug("Exit run for subscription {}", (Object)this.subscriptionName);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                GooglePubsubConsumer.this.localLog.error("Failure getting messages from PubSub", (Throwable)e);
            }
            catch (Exception e) {
                GooglePubsubConsumer.this.localLog.error("Failure getting messages from PubSub", (Throwable)e);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void asynchronousPull(String subscriptionName) throws IOException {
            while (GooglePubsubConsumer.this.isRunAllowed() && !GooglePubsubConsumer.this.isSuspendingOrSuspended()) {
                CamelMessageReceiver messageReceiver = new CamelMessageReceiver(GooglePubsubConsumer.this, GooglePubsubConsumer.this.endpoint, GooglePubsubConsumer.this.processor);
                Subscriber subscriber = GooglePubsubConsumer.this.endpoint.getComponent().getSubscriber(subscriptionName, messageReceiver, GooglePubsubConsumer.this.endpoint);
                try {
                    GooglePubsubConsumer.this.subscribers.add(subscriber);
                    subscriber.startAsync().awaitRunning();
                    subscriber.awaitTerminated();
                }
                catch (Exception e) {
                    GooglePubsubConsumer.this.localLog.error("Failure getting messages from PubSub", (Throwable)e);
                }
                finally {
                    GooglePubsubConsumer.this.localLog.debug("Stopping async subscriber {}", (Object)subscriptionName);
                    subscriber.stopAsync();
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void synchronousPull(String subscriptionName) throws ExecutionException, InterruptedException {
            while (GooglePubsubConsumer.this.isRunAllowed() && !GooglePubsubConsumer.this.isSuspendingOrSuspended()) {
                ApiFuture<PullResponse> synchronousPullResponseFuture = null;
                try {
                    SubscriberStub subscriber = GooglePubsubConsumer.this.endpoint.getComponent().getSubscriberStub(GooglePubsubConsumer.this.endpoint);
                    try {
                        PullRequest pullRequest = PullRequest.newBuilder().setMaxMessages(GooglePubsubConsumer.this.endpoint.getMaxMessagesPerPoll()).setReturnImmediately(false).setSubscription(subscriptionName).build();
                        synchronousPullResponseFuture = subscriber.pullCallable().futureCall(pullRequest);
                        GooglePubsubConsumer.this.pendingSynchronousPullResponses.add(synchronousPullResponseFuture);
                        PullResponse pullResponse = (PullResponse)synchronousPullResponseFuture.get();
                        for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
                            PubsubMessage pubsubMessage = message.getMessage();
                            Exchange exchange = GooglePubsubConsumer.this.createExchange(true);
                            exchange.getIn().setBody(pubsubMessage.getData().toByteArray());
                            exchange.getIn().setHeader("CamelGooglePubsubMsgAckId", message.getAckId());
                            exchange.getIn().setHeader("CamelGooglePubsubMessageId", pubsubMessage.getMessageId());
                            exchange.getIn().setHeader("CamelGooglePubsubPublishTime", pubsubMessage.getPublishTime());
                            exchange.getIn().setHeader("CamelGooglePubsubAttributes", pubsubMessage.getAttributesMap());
                            AcknowledgeSync acknowledge = new AcknowledgeSync(() -> GooglePubsubConsumer.this.endpoint.getComponent().getSubscriberStub(GooglePubsubConsumer.this.endpoint), subscriptionName);
                            if (GooglePubsubConsumer.this.endpoint.getAckMode() != GooglePubsubConstants.AckMode.NONE) {
                                exchange.getExchangeExtension().addOnCompletion(new AcknowledgeCompletion(acknowledge));
                            } else {
                                exchange.getIn().setHeader("CamelGooglePubsubAcknowledge", acknowledge);
                            }
                            try {
                                GooglePubsubConsumer.this.processor.process(exchange);
                            }
                            catch (Exception e) {
                                GooglePubsubConsumer.this.getExceptionHandler().handleException(e);
                            }
                        }
                    }
                    finally {
                        if (subscriber == null) continue;
                        subscriber.close();
                    }
                }
                catch (CancellationException e) {
                    GooglePubsubConsumer.this.localLog.debug("PubSub synchronous pull request cancelled", (Throwable)e);
                }
                catch (IOException e) {
                    GooglePubsubConsumer.this.localLog.error("I/O exception while getting messages from PubSub. Reconnecting.", (Throwable)e);
                }
                catch (ExecutionException e) {
                    if (e.getCause() instanceof ApiException && ((ApiException)e.getCause()).isRetryable()) {
                        GooglePubsubConsumer.this.localLog.error("Retryable API exception in getting messages from PubSub", e.getCause());
                        continue;
                    }
                    throw e;
                }
                finally {
                    if (synchronousPullResponseFuture == null) continue;
                    GooglePubsubConsumer.this.pendingSynchronousPullResponses.remove(synchronousPullResponseFuture);
                }
            }
        }
    }
}

