/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.pubsub.v1;

import com.google.api.core.AbstractApiService;
import com.google.api.core.ApiClock;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.core.ApiService;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.core.Distribution;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.ApiExceptionFactory;
import com.google.api.gax.rpc.ClientStream;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.StreamController;
import com.google.cloud.pubsub.v1.AckRequestData;
import com.google.cloud.pubsub.v1.AckResponse;
import com.google.cloud.pubsub.v1.MessageDispatcher;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.MessageReceiverWithAckResponse;
import com.google.cloud.pubsub.v1.ModackRequestData;
import com.google.cloud.pubsub.v1.StatusUtil;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.Waiter;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Any;
import com.google.protobuf.Empty;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ModifyAckDeadlineRequest;
import com.google.pubsub.v1.StreamingPullRequest;
import com.google.pubsub.v1.StreamingPullResponse;
import com.google.rpc.ErrorInfo;
import io.grpc.Status;
import io.grpc.protobuf.StatusProto;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;

final class StreamingSubscriberConnection
extends AbstractApiService
implements MessageDispatcher.AckProcessor {
    // Class-wide java.util.logging logger, named after this class.
    private static final Logger logger = Logger.getLogger(StreamingSubscriberConnection.class.getName());
    // Starting delay for re-opening the streaming pull after a retryable failure.
    private static final Duration INITIAL_CHANNEL_RECONNECT_BACKOFF = Duration.ofMillis(100L);
    // Upper bound for the stream re-open delay; the delay doubles per failure up to this value.
    private static final Duration MAX_CHANNEL_RECONNECT_BACKOFF = Duration.ofSeconds(10L);
    // Starting delay (milliseconds) before a failed ack/modack batch is re-sent.
    private static final long INITIAL_ACK_OPERATIONS_RECONNECT_BACKOFF_MILLIS = 100L;
    // Upper bound (milliseconds) for the ack/modack re-send delay.
    private static final long MAX_ACK_OPERATIONS_RECONNECT_BACKOFF_MILLIS = Duration.ofSeconds(10L).toMillis();
    // Maximum number of ack ids placed in one Acknowledge / ModifyAckDeadline request.
    private static final int MAX_PER_REQUEST_CHANGES = 1000;
    // Per-ack-id ErrorInfo metadata value that marks an ack id as permanently invalid.
    // NOTE(review): declared as an instance field although the value is a compile-time constant.
    private final String PERMANENT_FAILURE_INVALID_ACK_ID_METADATA = "PERMANENT_FAILURE_INVALID_ACK_ID";
    // Prefix of per-ack-id ErrorInfo metadata values that mark a retryable failure.
    // NOTE(review): declared as an instance field although the value is a compile-time constant.
    private final String TRANSIENT_FAILURE_METADATA_PREFIX = "TRANSIENT_";
    // Stream ack deadline sent in the initial StreamingPullRequest; chosen in the constructor.
    // NOTE(review): the name is misspelled ("initital"); left unchanged because other members use it.
    private Duration inititalStreamAckDeadline;
    // Extra headers attached to the streaming call ("x-goog-request-params" -> "subscription=...").
    private final Map<String, List<String>> streamMetadata;
    // Stub used for the streaming pull as well as the unary Acknowledge / ModifyAckDeadline calls.
    private final SubscriberStub subscriberStub;
    // Channel affinity passed to the call context when the stream is opened.
    private final int channelAffinity;
    // Subscription name placed in every request built by this class.
    private final String subscription;
    // Executor used to schedule delayed stream re-opens and delayed ack/modack re-sends.
    private final ScheduledExecutorService systemExecutor;
    // Receives the pulled messages; this connection is registered as its AckProcessor.
    private final MessageDispatcher messageDispatcher;
    // Source of the max-outstanding limits sent in the initial request (unless legacy flow control is on).
    private final FlowControlSettings flowControlSettings;
    // When true, the initial request carries 0 for both max-outstanding limits.
    private final boolean useLegacyFlowControl;
    // Ack/modack requests that carry a message future and whose RPC outcome has not been applied yet.
    private final Set<AckRequestData> pendingRequests = ConcurrentHashMap.newKeySet();
    // Current stream re-open delay; reset to the initial value whenever a response arrives.
    private final AtomicLong channelReconnectBackoffMillis = new AtomicLong(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis());
    // Counts in-flight ack/modack RPCs; runShutdown() blocks on it.
    private final Waiter ackOperationsWaiter = new Waiter();
    // Held while clientStream is replaced or closed, and while more responses are requested.
    private final Lock lock = new ReentrantLock();
    // Request side of the stream opened by the latest initialize() call.
    private ClientStream<StreamingPullRequest> clientStream;
    // Mirrors the exactly-once flag reported in the most recent StreamingPullResponse.
    private AtomicBoolean exactlyOnceDeliveryEnabled = new AtomicBoolean(false);
    // Random UUID generated once per connection and sent as the client id when a stream is opened.
    private final String clientId = UUID.randomUUID().toString();

    /**
     * Copies the builder's settings into this connection and builds the
     * {@link MessageDispatcher} that will use this connection as its ack processor.
     *
     * <p>The initial stream ack deadline is the subscriber default when the builder reports
     * that the default max-duration-per-extension is in use; otherwise it is the configured
     * max duration clamped to
     * [{@code Subscriber.MIN_STREAM_ACK_DEADLINE}, {@code Subscriber.MAX_STREAM_ACK_DEADLINE}].
     *
     * @param builder source of all configuration; no field is validated here
     */
    private StreamingSubscriberConnection(Builder builder) {
        subscription = builder.subscription;
        systemExecutor = builder.systemExecutor;

        Duration ackDeadline;
        if (builder.maxDurationPerAckExtensionDefaultUsed) {
            ackDeadline = Subscriber.STREAM_ACK_DEADLINE_DEFAULT;
        } else if (builder.maxDurationPerAckExtension.compareTo(Subscriber.MIN_STREAM_ACK_DEADLINE) < 0) {
            ackDeadline = Subscriber.MIN_STREAM_ACK_DEADLINE;
        } else if (builder.maxDurationPerAckExtension.compareTo(Subscriber.MAX_STREAM_ACK_DEADLINE) > 0) {
            ackDeadline = Subscriber.MAX_STREAM_ACK_DEADLINE;
        } else {
            ackDeadline = builder.maxDurationPerAckExtension;
        }
        inititalStreamAckDeadline = ackDeadline;

        // Header attached to the streaming call, naming the subscription.
        List<String> requestParams = ImmutableList.of("subscription=" + subscription);
        streamMetadata = ImmutableMap.of("x-goog-request-params", requestParams);
        subscriberStub = builder.subscriberStub;
        channelAffinity = builder.channelAffinity;

        // Exactly one of the two receiver kinds is set, depending on which Builder constructor ran.
        MessageDispatcher.Builder dispatcherBuilder;
        if (builder.receiver != null) {
            dispatcherBuilder = MessageDispatcher.newBuilder(builder.receiver);
        } else {
            dispatcherBuilder = MessageDispatcher.newBuilder(builder.receiverWithAckResponse);
        }
        messageDispatcher = dispatcherBuilder
                .setAckProcessor(this)
                .setAckExpirationPadding(builder.ackExpirationPadding)
                .setMaxAckExtensionPeriod(builder.maxAckExtensionPeriod)
                .setMinDurationPerAckExtension(builder.minDurationPerAckExtension)
                .setMinDurationPerAckExtensionDefaultUsed(builder.minDurationPerAckExtensionDefaultUsed)
                .setMaxDurationPerAckExtension(builder.maxDurationPerAckExtension)
                .setMaxDurationPerAckExtensionDefaultUsed(builder.maxDurationPerAckExtensionDefaultUsed)
                .setAckLatencyDistribution(builder.ackLatencyDistribution)
                .setFlowController(builder.flowController)
                .setExecutor(builder.executor)
                .setSystemExecutor(builder.systemExecutor)
                .setApiClock(builder.clock)
                .build();

        flowControlSettings = builder.flowControlSettings;
        useLegacyFlowControl = builder.useLegacyFlowControl;
    }

    /**
     * Records whether exactly-once delivery is enabled.
     *
     * @param isExactlyOnceDeliveryEnabled the new flag value
     * @return this connection, so calls can be chained
     */
    public StreamingSubscriberConnection setExactlyOnceDeliveryEnabled(boolean isExactlyOnceDeliveryEnabled) {
        exactlyOnceDeliveryEnabled.set(isExactlyOnceDeliveryEnabled);
        return this;
    }

    /**
     * Returns the exactly-once delivery flag as last stored by
     * {@link #setExactlyOnceDeliveryEnabled(boolean)} (initially {@code false}).
     */
    public boolean getExactlyOnceDeliveryEnabled() {
        return exactlyOnceDeliveryEnabled.get();
    }

    /**
     * Service start hook: starts the message dispatcher, opens the first streaming pull,
     * then reports the service as started.
     */
    @Override
    protected void doStart() {
        logger.config("Starting subscriber.");
        messageDispatcher.start();
        initialize();
        notifyStarted();
    }

    /**
     * Service stop hook: closes the request side of the current stream with a
     * {@code CANCELLED} error, runs the shared shutdown steps, then reports the service
     * as stopped.
     */
    @Override
    protected void doStop() {
        // The lock keeps a concurrent initialize() from swapping clientStream mid-close.
        lock.lock();
        try {
            clientStream.closeSendWithError(Status.CANCELLED.asException());
        } finally {
            lock.unlock();
        }
        runShutdown();
        notifyStopped();
    }

    /**
     * Shutdown steps shared by a normal stop and a non-retryable stream failure: stops the
     * dispatcher, then blocks until the in-flight ack/modack counter reports completion.
     */
    private void runShutdown() {
        messageDispatcher.stop();
        ackOperationsWaiter.waitComplete();
    }

    /**
     * Opens a streaming pull, sends the initial request and arranges for the stream to be
     * re-opened when it terminates.
     *
     * <p>Termination is reported through a future completed by the response observer:
     * <ul>
     *   <li>normal completion: reset the backoff and re-open immediately (if still alive);</li>
     *   <li>retryable failure: re-open after the current backoff, doubling the stored backoff
     *       up to {@code MAX_CHANNEL_RECONNECT_BACKOFF};</li>
     *   <li>non-retryable failure: shut down, resolve outstanding ack requests and fail the
     *       service.</li>
     * </ul>
     */
    private void initialize() {
        final SettableApiFuture<Void> streamTermination = SettableApiFuture.create();
        final StreamingPullResponseObserver observer = new StreamingPullResponseObserver(streamTermination);
        final ClientStream<StreamingPullRequest> newStream = subscriberStub.streamingPullCallable().splitCall(
                observer,
                GrpcCallContext.createDefault().withChannelAffinity(channelAffinity).withExtraHeaders(streamMetadata));

        logger.log(Level.FINER, "Initializing stream to subscription {0}", subscription);

        final StreamingPullRequest.Builder openRequest = StreamingPullRequest.newBuilder()
                .setSubscription(subscription)
                .setStreamAckDeadlineSeconds(Math.toIntExact(inititalStreamAckDeadline.getSeconds()))
                .setClientId(clientId);
        // With legacy flow control both limits are sent as 0; otherwise unset limits become 0.
        if (useLegacyFlowControl) {
            openRequest.setMaxOutstandingMessages(0L);
            openRequest.setMaxOutstandingBytes(0L);
        } else {
            openRequest.setMaxOutstandingMessages(valueOrZero(flowControlSettings.getMaxOutstandingElementCount()));
            openRequest.setMaxOutstandingBytes(valueOrZero(flowControlSettings.getMaxOutstandingRequestBytes()));
        }
        newStream.send(openRequest.build());

        // Publish the new stream under the lock so doStop() closes the current one.
        lock.lock();
        try {
            clientStream = newStream;
        } finally {
            lock.unlock();
        }

        ApiFutures.addCallback(streamTermination, new ApiFutureCallback<Void>() {

            @Override
            public void onSuccess(@Nullable Void result) {
                if (isAlive()) {
                    channelReconnectBackoffMillis.set(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis());
                    initialize();
                }
            }

            @Override
            public void onFailure(Throwable cause) {
                if (!isAlive()) {
                    logger.log(Level.FINE, "pull failure after service no longer running", cause);
                    return;
                }
                if (StatusUtil.isRetryable(cause)) {
                    logger.log(Level.FINE, "stream closed with retryable exception; will reconnect", cause);
                    // Wait for the current backoff, and store the doubled (capped) value for next time.
                    final long delayMillis = channelReconnectBackoffMillis.get();
                    channelReconnectBackoffMillis.set(Math.min(delayMillis * 2L, MAX_CHANNEL_RECONNECT_BACKOFF.toMillis()));
                    final Runnable reopen = () -> initialize();
                    systemExecutor.schedule(reopen, delayMillis, TimeUnit.MILLISECONDS);
                    return;
                }
                final ApiException gaxException = ApiExceptionFactory.createException(
                        cause, GrpcStatusCode.of(Status.fromThrowable(cause).getCode()), false);
                logger.log(Level.SEVERE, "terminated streaming with exception", gaxException);
                runShutdown();
                setFailureFutureOutstandingMessages(cause);
                notifyFailed(gaxException);
            }
        }, MoreExecutors.directExecutor());
    }

    /**
     * Null-safe accessor for optional numeric settings.
     *
     * @param value a possibly-{@code null} boxed long
     * @return {@code value} when non-null, otherwise zero
     */
    private Long valueOrZero(Long value) {
        if (value == null) {
            return 0L;
        }
        return value;
    }

    /**
     * Reports whether the service is in the {@code STARTING} or {@code RUNNING} state.
     * The state is read once so both comparisons see the same value.
     */
    private boolean isAlive() {
        final ApiService.State current = state();
        if (current == ApiService.State.RUNNING) {
            return true;
        }
        return current == ApiService.State.STARTING;
    }

    /**
     * Applies {@code ackResponse} to every ack request currently tracked as pending and then
     * empties the pending set.
     *
     * @param ackResponse the response to set on each outstanding request
     */
    public void setResponseOutstandingMessages(AckResponse ackResponse) {
        logger.log(Level.WARNING, "Setting response: {0} on outstanding messages", ackResponse.toString());
        pendingRequests.forEach(outstanding -> outstanding.setResponse(ackResponse, false));
        pendingRequests.clear();
    }

    /**
     * Resolves every outstanding ack request after the stream has failed permanently.
     *
     * <p>Without exactly-once delivery the requests are resolved as {@code SUCCESSFUL}.
     * With exactly-once delivery the response is derived from the failure:
     * {@code FAILED_PRECONDITION} and {@code PERMISSION_DENIED} map to the matching
     * {@link AckResponse}; any other status code, and any throwable that is not an
     * {@link ApiException}, maps to {@code OTHER}.
     *
     * @param t2 the failure that terminated the stream
     */
    private void setFailureFutureOutstandingMessages(Throwable t2) {
        final AckResponse ackResponse;
        if (!getExactlyOnceDeliveryEnabled()) {
            ackResponse = AckResponse.SUCCESSFUL;
        } else if (!(t2 instanceof ApiException)) {
            // Previously this case assigned a dead local and then fell through to the cast
            // below, throwing ClassCastException instead of reporting OTHER.
            ackResponse = AckResponse.OTHER;
        } else {
            final ApiException apiException = (ApiException) t2;
            switch (apiException.getStatusCode().getCode()) {
                case FAILED_PRECONDITION: {
                    ackResponse = AckResponse.FAILED_PRECONDITION;
                    break;
                }
                case PERMISSION_DENIED: {
                    ackResponse = AckResponse.PERMISSION_DENIED;
                    break;
                }
                default: {
                    ackResponse = AckResponse.OTHER;
                    break;
                }
            }
        }
        setResponseOutstandingMessages(ackResponse);
    }

    /**
     * Sends the given acks, starting the retry schedule at the initial ack-operations backoff.
     *
     * @param ackRequestDataList the acks to send
     */
    @Override
    public void sendAckOperations(List<AckRequestData> ackRequestDataList) {
        sendAckOperations(ackRequestDataList, INITIAL_ACK_OPERATIONS_RECONNECT_BACKOFF_MILLIS);
    }

    /**
     * Sends the given ack-deadline modifications, starting the retry schedule at the initial
     * ack-operations backoff.
     *
     * @param modackRequestDataList the deadline modifications to send
     */
    @Override
    public void sendModackOperations(List<ModackRequestData> modackRequestDataList) {
        sendModackOperations(modackRequestDataList, INITIAL_ACK_OPERATIONS_RECONNECT_BACKOFF_MILLIS);
    }

    /**
     * Issues Acknowledge RPCs for the given requests, at most
     * {@code MAX_PER_REQUEST_CHANGES} ack ids per RPC.
     *
     * <p>Requests that carry a message future are added to the pending set before the RPC is
     * issued. Each RPC gets a callback covering exactly its own batch; the in-flight counter
     * is raised once, by the number of RPCs issued, after all of them have been started.
     *
     * @param ackRequestDataList the acks to send
     * @param currentBackoffMillis delay the callback uses if this batch must be re-sent
     */
    private void sendAckOperations(List<AckRequestData> ackRequestDataList, long currentBackoffMillis) {
        int rpcsIssued = 0;
        for (List<AckRequestData> batch : Lists.partition(ackRequestDataList, MAX_PER_REQUEST_CHANGES)) {
            List<String> batchAckIds = new ArrayList<>();
            for (AckRequestData request : batch) {
                batchAckIds.add(request.getAckId());
                if (request.hasMessageFuture()) {
                    pendingRequests.add(request);
                }
            }
            ApiFutureCallback<Empty> onDone = getCallback(batch, 0, false, currentBackoffMillis);
            AcknowledgeRequest rpcRequest = AcknowledgeRequest.newBuilder()
                    .setSubscription(subscription)
                    .addAllAckIds(batchAckIds)
                    .build();
            ApiFuture<Empty> rpc = subscriberStub.acknowledgeCallable().futureCall(rpcRequest);
            ApiFutures.addCallback(rpc, onDone, MoreExecutors.directExecutor());
            rpcsIssued++;
        }
        ackOperationsWaiter.incrementPendingCount(rpcsIssued);
    }

    /**
     * Issues ModifyAckDeadline RPCs for the given requests, at most
     * {@code MAX_PER_REQUEST_CHANGES} ack ids per RPC.
     *
     * <p>Requests that carry a message future are added to the pending set before the RPC is
     * issued. The in-flight counter is raised once, by the number of RPCs issued, after all of
     * them have been started.
     *
     * @param modackRequestDataList deadline modifications, each with its own extension value
     * @param currentBackoffMillis delay the callback uses if a batch must be re-sent
     */
    private void sendModackOperations(List<ModackRequestData> modackRequestDataList, long currentBackoffMillis) {
        int pendingOperations = 0;
        for (ModackRequestData modackRequestData : modackRequestDataList) {
            final int deadlineExtensionSeconds = modackRequestData.getDeadlineExtensionSeconds();
            for (List<AckRequestData> ackRequestDataInRequestList : Lists.partition(modackRequestData.getAckRequestData(), MAX_PER_REQUEST_CHANGES)) {
                List<String> ackIdsInRequest = new ArrayList<>();
                for (AckRequestData ackRequestData : ackRequestDataInRequestList) {
                    ackIdsInRequest.add(ackRequestData.getAckId());
                    if (ackRequestData.hasMessageFuture()) {
                        pendingRequests.add(ackRequestData);
                    }
                }
                // The callback must cover only the ack ids carried by THIS RPC. Passing the full
                // list (as before) let one partition's outcome be applied to, and retried for,
                // ack ids that were sent in a different RPC. This matches sendAckOperations.
                ApiFutureCallback<Empty> callback = getCallback(ackRequestDataInRequestList, deadlineExtensionSeconds, true, currentBackoffMillis);
                ApiFuture<Empty> modackFuture = subscriberStub.modifyAckDeadlineCallable().futureCall(
                        ModifyAckDeadlineRequest.newBuilder()
                                .setSubscription(subscription)
                                .addAllAckIds(ackIdsInRequest)
                                .setAckDeadlineSeconds(deadlineExtensionSeconds)
                                .build());
                ApiFutures.addCallback(modackFuture, callback, MoreExecutors.directExecutor());
                ++pendingOperations;
            }
        }
        ackOperationsWaiter.incrementPendingCount(pendingOperations);
    }

    /**
     * Extracts the {@code ErrorInfo} metadata carried in the status details of a failure.
     *
     * @param t2 the failure to inspect
     * @return the metadata map of the last {@code ErrorInfo} detail found, or a new empty map
     *     when the throwable yields no status or no {@code ErrorInfo} detail
     * @throws InvalidProtocolBufferException if an {@code ErrorInfo} detail cannot be unpacked
     */
    private Map<String, String> getMetadataMapFromThrowable(Throwable t2) throws InvalidProtocolBufferException {
        Map<String, String> result = new HashMap<String, String>();
        final com.google.rpc.Status rpcStatus = StatusProto.fromThrowable(t2);
        if (rpcStatus == null) {
            return result;
        }
        for (Any detail : rpcStatus.getDetailsList()) {
            if (detail.is(ErrorInfo.class)) {
                // A later ErrorInfo replaces an earlier one.
                result = detail.unpack(ErrorInfo.class).getMetadataMap();
            }
        }
        return result;
    }

    /**
     * Builds the completion callback for one Acknowledge or ModifyAckDeadline RPC.
     *
     * <p>On success every request in {@code ackRequestDataList} is resolved as
     * {@code SUCCESSFUL}, the dispatcher is notified and the request leaves the pending set.
     *
     * <p>On failure nothing more happens unless exactly-once delivery is enabled. With
     * exactly-once delivery the per-ack-id {@code ErrorInfo} metadata of the failure decides
     * the outcome of each request:
     * <ul>
     *   <li>value starting with {@code TRANSIENT_}: queued for a re-send;</li>
     *   <li>{@code PERMANENT_FAILURE_INVALID_ACK_ID}: resolved as {@code INVALID};</li>
     *   <li>any other value: resolved as {@code OTHER};</li>
     *   <li>ack id absent from the metadata: resolved as {@code SUCCESSFUL}.</li>
     * </ul>
     * If the metadata cannot be parsed, the whole list is queued for a re-send. Queued
     * requests are re-sent after {@code currentBackoffMillis}, and that re-send runs with a
     * doubled backoff capped at {@code MAX_ACK_OPERATIONS_RECONNECT_BACKOFF_MILLIS}.
     *
     * @param ackRequestDataList the requests carried by the RPC this callback is attached to
     * @param deadlineExtensionSeconds deadline extension of a modack; pass 0 for an ack
     * @param isModack {@code true} for ModifyAckDeadline, {@code false} for Acknowledge
     * @param currentBackoffMillis delay before a re-send of transiently failed requests
     * @return the callback to register on the RPC future
     */
    private ApiFutureCallback<Empty> getCallback(final List<AckRequestData> ackRequestDataList, final int deadlineExtensionSeconds, final boolean isModack, final long currentBackoffMillis) {
        // Passed through to AckRequestData.setResponse: true for acks and for modacks with a
        // zero deadline extension, false for modacks with a non-zero extension.
        final boolean setResponseOnSuccess = !isModack || deadlineExtensionSeconds == 0;
        return new ApiFutureCallback<Empty>() {

            @Override
            public void onSuccess(Empty empty) {
                ackOperationsWaiter.incrementPendingCount(-1);
                for (AckRequestData ackRequestData : ackRequestDataList) {
                    ackRequestData.setResponse(AckResponse.SUCCESSFUL, setResponseOnSuccess);
                    messageDispatcher.notifyAckSuccess(ackRequestData);
                    pendingRequests.remove(ackRequestData);
                }
            }

            @Override
            public void onFailure(Throwable t2) {
                ackOperationsWaiter.incrementPendingCount(-1);
                Level level = isAlive() ? Level.WARNING : Level.FINER;
                logger.log(level, "failed to send operations", t2);
                if (!getExactlyOnceDeliveryEnabled()) {
                    return;
                }
                final List<AckRequestData> ackRequestDataArrayRetryList = new ArrayList<>();
                try {
                    final Map<String, String> metadataMap = getMetadataMapFromThrowable(t2);
                    for (AckRequestData ackRequestData : ackRequestDataList) {
                        String errorMessage = metadataMap.get(ackRequestData.getAckId());
                        if (!metadataMap.containsKey(ackRequestData.getAckId())) {
                            // The service reported no error for this ack id.
                            ackRequestData.setResponse(AckResponse.SUCCESSFUL, setResponseOnSuccess);
                            messageDispatcher.notifyAckSuccess(ackRequestData);
                        } else if (errorMessage.startsWith(TRANSIENT_FAILURE_METADATA_PREFIX)) {
                            logger.log(Level.INFO, "Transient error message, will resend: {0}", errorMessage);
                            ackRequestDataArrayRetryList.add(ackRequestData);
                        } else if (errorMessage.equals(PERMANENT_FAILURE_INVALID_ACK_ID_METADATA)) {
                            logger.log(Level.INFO, "Permanent error invalid ack id message, will not resend: {0}", errorMessage);
                            ackRequestData.setResponse(AckResponse.INVALID, setResponseOnSuccess);
                            messageDispatcher.notifyAckFailed(ackRequestData);
                        } else {
                            logger.log(Level.INFO, "Unknown error message, will not resend: {0}", errorMessage);
                            ackRequestData.setResponse(AckResponse.OTHER, setResponseOnSuccess);
                            messageDispatcher.notifyAckFailed(ackRequestData);
                        }
                        // Retried requests are re-added to the pending set when they are re-sent.
                        pendingRequests.remove(ackRequestData);
                    }
                } catch (InvalidProtocolBufferException e) {
                    // Cast to Object so the throwable is bound to {0}; the (Level, String, Throwable)
                    // overload would leave the placeholder unformatted.
                    logger.log(Level.WARNING, "Exception occurred when parsing throwable {0} for errorInfo", (Object) t2);
                    ackRequestDataArrayRetryList.addAll(ackRequestDataList);
                }
                if (ackRequestDataArrayRetryList.isEmpty()) {
                    return;
                }
                final long newBackoffMillis = Math.min(currentBackoffMillis * 2L, MAX_ACK_OPERATIONS_RECONNECT_BACKOFF_MILLIS);
                final Runnable resend = () -> {
                    if (isModack) {
                        ModackRequestData modackRequestData = new ModackRequestData(deadlineExtensionSeconds, ackRequestDataArrayRetryList);
                        sendModackOperations(Collections.singletonList(modackRequestData), newBackoffMillis);
                    } else {
                        sendAckOperations(ackRequestDataArrayRetryList, newBackoffMillis);
                    }
                };
                systemExecutor.schedule(resend, currentBackoffMillis, TimeUnit.MILLISECONDS);
            }
        };
    }

    /**
     * Creates a builder for a connection that delivers messages to a {@link MessageReceiver}.
     *
     * @param receiver the receiver stored in the builder
     * @return a new builder holding {@code receiver}
     */
    public static Builder newBuilder(MessageReceiver receiver) {
        return new Builder(receiver);
    }

    /**
     * Creates a builder for a connection that delivers messages to a
     * {@link MessageReceiverWithAckResponse}.
     *
     * @param receiverWithAckResponse the receiver stored in the builder
     * @return a new builder holding {@code receiverWithAckResponse}
     */
    public static Builder newBuilder(MessageReceiverWithAckResponse receiverWithAckResponse) {
        return new Builder(receiverWithAckResponse);
    }

    /**
     * Mutable holder for the settings of a {@link StreamingSubscriberConnection}.
     *
     * <p>Exactly one of the two receiver fields is set, depending on which constructor was
     * used; the other stays {@code null}. Every setter stores its argument without validation
     * and returns this builder for chaining. {@link #build()} performs no checks either, so
     * unset reference fields reach the connection constructor as {@code null}.
     */
    public static final class Builder {
        // Set only by the MessageReceiver constructor.
        private MessageReceiver receiver;
        // Set only by the MessageReceiverWithAckResponse constructor.
        private MessageReceiverWithAckResponse receiverWithAckResponse;
        private String subscription;
        private Duration ackExpirationPadding;
        private Duration maxAckExtensionPeriod;
        private Duration minDurationPerAckExtension;
        private boolean minDurationPerAckExtensionDefaultUsed;
        private Duration maxDurationPerAckExtension;
        private boolean maxDurationPerAckExtensionDefaultUsed;
        private Distribution ackLatencyDistribution;
        private SubscriberStub subscriberStub;
        private int channelAffinity;
        private FlowController flowController;
        private FlowControlSettings flowControlSettings;
        private boolean useLegacyFlowControl;
        private ScheduledExecutorService executor;
        private ScheduledExecutorService systemExecutor;
        private ApiClock clock;

        /** Creates a builder whose connection dispatches to a plain {@link MessageReceiver}. */
        protected Builder(MessageReceiver receiver) {
            this.receiver = receiver;
        }

        /** Creates a builder whose connection dispatches to a {@link MessageReceiverWithAckResponse}. */
        protected Builder(MessageReceiverWithAckResponse receiverWithAckResponse) {
            this.receiverWithAckResponse = receiverWithAckResponse;
        }

        /** Sets the subscription name used in the stream header and in every request. */
        public Builder setSubscription(String subscription) {
            this.subscription = subscription;
            return this;
        }

        /** Sets the ack expiration padding forwarded to the message dispatcher. */
        public Builder setAckExpirationPadding(Duration ackExpirationPadding) {
            this.ackExpirationPadding = ackExpirationPadding;
            return this;
        }

        /** Sets the maximum ack extension period forwarded to the message dispatcher. */
        public Builder setMaxAckExtensionPeriod(Duration maxAckExtensionPeriod) {
            this.maxAckExtensionPeriod = maxAckExtensionPeriod;
            return this;
        }

        /** Sets the minimum duration per ack extension forwarded to the message dispatcher. */
        public Builder setMinDurationPerAckExtension(Duration minDurationPerAckExtension) {
            this.minDurationPerAckExtension = minDurationPerAckExtension;
            return this;
        }

        /** Sets the flag, forwarded to the message dispatcher, saying the minimum duration is the default. */
        public Builder setMinDurationPerAckExtensionDefaultUsed(boolean minDurationPerAckExtensionDefaultUsed) {
            this.minDurationPerAckExtensionDefaultUsed = minDurationPerAckExtensionDefaultUsed;
            return this;
        }

        /**
         * Sets the maximum duration per ack extension. It is forwarded to the message dispatcher
         * and, unless the default-used flag is set, also determines the initial stream ack deadline.
         */
        public Builder setMaxDurationPerAckExtension(Duration maxDurationPerAckExtension) {
            this.maxDurationPerAckExtension = maxDurationPerAckExtension;
            return this;
        }

        /**
         * Sets the flag saying the maximum duration is the default. When {@code true} the
         * connection uses the subscriber's default stream ack deadline instead of the configured one.
         */
        public Builder setMaxDurationPerAckExtensionDefaultUsed(boolean maxDurationPerAckExtensionDefaultUsed) {
            this.maxDurationPerAckExtensionDefaultUsed = maxDurationPerAckExtensionDefaultUsed;
            return this;
        }

        /** Sets the ack latency distribution forwarded to the message dispatcher. */
        public Builder setAckLatencyDistribution(Distribution ackLatencyDistribution) {
            this.ackLatencyDistribution = ackLatencyDistribution;
            return this;
        }

        /** Sets the stub used for the streaming pull and the ack / modack calls. */
        public Builder setSubscriberStub(SubscriberStub subscriberStub) {
            this.subscriberStub = subscriberStub;
            return this;
        }

        /** Sets the channel affinity applied to the streaming call context. */
        public Builder setChannelAffinity(int channelAffinity) {
            this.channelAffinity = channelAffinity;
            return this;
        }

        /** Sets the flow controller forwarded to the message dispatcher. */
        public Builder setFlowController(FlowController flowController) {
            this.flowController = flowController;
            return this;
        }

        /** Sets the flow control settings whose limits are sent when a stream is opened. */
        public Builder setFlowControlSettings(FlowControlSettings flowControlSettings) {
            this.flowControlSettings = flowControlSettings;
            return this;
        }

        /** Sets whether legacy flow control is used; when {@code true} the stream limits are sent as 0. */
        public Builder setUseLegacyFlowControl(boolean useLegacyFlowControl) {
            this.useLegacyFlowControl = useLegacyFlowControl;
            return this;
        }

        /** Sets the executor forwarded to the message dispatcher. */
        public Builder setExecutor(ScheduledExecutorService executor) {
            this.executor = executor;
            return this;
        }

        /** Sets the executor used for delayed reconnects and re-sends; also forwarded to the dispatcher. */
        public Builder setSystemExecutor(ScheduledExecutorService systemExecutor) {
            this.systemExecutor = systemExecutor;
            return this;
        }

        /** Sets the clock forwarded to the message dispatcher. */
        public Builder setClock(ApiClock clock) {
            this.clock = clock;
            return this;
        }

        /** Creates the connection from the current builder state; performs no validation. */
        public StreamingSubscriberConnection build() {
            return new StreamingSubscriberConnection(this);
        }
    }

    /**
     * Observer for one streaming-pull call. Responses are pulled one at a time (automatic
     * inbound flow control is disabled), and the end of the stream is reported by completing
     * {@code errorFuture}: with {@code null} on normal completion, exceptionally on error.
     */
    private class StreamingPullResponseObserver implements ResponseObserver<StreamingPullResponse> {
        // Completed when the stream ends; watched by initialize() to decide on reconnection.
        final SettableApiFuture<Void> errorFuture;
        // Controller handed over in onStart; used to request each subsequent response.
        StreamController thisController;

        StreamingPullResponseObserver(SettableApiFuture<Void> errorFuture) {
            this.errorFuture = errorFuture;
        }

        /** Stores the controller, switches to manual flow control and asks for the first response. */
        @Override
        public void onStart(StreamController controller) {
            thisController = controller;
            controller.disableAutoInboundFlowControl();
            controller.request(1);
        }

        /**
         * Handles one response: resets the reconnect backoff, propagates the subscription
         * properties, hands the messages to the dispatcher and, if the service is still alive
         * and the stream has not ended, requests the next response.
         */
        @Override
        public void onResponse(StreamingPullResponse response) {
            channelReconnectBackoffMillis.set(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis());

            final boolean exactlyOnce = response.getSubscriptionProperties().getExactlyOnceDeliveryEnabled();
            final boolean ordered = response.getSubscriptionProperties().getMessageOrderingEnabled();
            setExactlyOnceDeliveryEnabled(exactlyOnce);
            messageDispatcher.setExactlyOnceDeliveryEnabled(exactlyOnce);
            messageDispatcher.setMessageOrderingEnabled(ordered);
            messageDispatcher.processReceivedMessages(response.getReceivedMessagesList());

            if (!isAlive() || errorFuture.isDone()) {
                return;
            }
            lock.lock();
            try {
                thisController.request(1);
            } catch (Exception e) {
                // Best effort: a failed request is logged, not propagated.
                logger.log(Level.WARNING, "cannot request more messages", e);
            } finally {
                lock.unlock();
            }
        }

        /** Completes the termination future exceptionally with the stream error. */
        @Override
        public void onError(Throwable t2) {
            errorFuture.setException(t2);
        }

        /** Completes the termination future normally. */
        @Override
        public void onComplete() {
            logger.fine("Streaming pull terminated successfully!");
            errorFuture.set(null);
        }
    }
}

