package com.couchbase.client.dcp.conductor;

import com.couchbase.client.core.config.NodeInfo;
import com.couchbase.client.core.logging.RedactableArgument;
import com.couchbase.client.core.state.LifecycleState;
import com.couchbase.client.core.state.NotConnectedException;
import com.couchbase.client.core.time.Delay;
import com.couchbase.client.dcp.buffer.DcpBucketConfig;
import com.couchbase.client.dcp.config.ClientEnvironment;
import com.couchbase.client.dcp.error.RollbackException;
import com.couchbase.client.dcp.events.FailedToAddNodeEvent;
import com.couchbase.client.dcp.events.FailedToMovePartitionEvent;
import com.couchbase.client.dcp.events.FailedToRemoveNodeEvent;
import com.couchbase.client.dcp.highlevel.StreamOffset;
import com.couchbase.client.dcp.metrics.DcpClientMetrics;
import com.couchbase.client.dcp.state.PartitionState;
import com.couchbase.client.dcp.state.SessionState;
import com.couchbase.client.dcp.util.retry.RetryBuilder;
import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.deps.io.netty.util.internal.ConcurrentSet;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Completable;
import rx.CompletableSubscriber;
import rx.Observable;
import rx.Single;
import rx.Subscription;

/* loaded from: input_file:com/couchbase/client/dcp/conductor/Conductor.class */
public class Conductor {
    private static final Logger LOGGER = LoggerFactory.getLogger(Conductor.class);
    private final ConfigProvider configProvider;
    private final ClientEnvironment env;
    private final boolean ownsConfigProvider;
    private final DcpClientMetrics metrics;
    private final Set<DcpChannel> channels = new ConcurrentSet();
    private volatile boolean stopped = true;
    private final AtomicReference<DcpBucketConfig> currentConfig = new AtomicReference<>();
    private final SessionState sessionState = new SessionState();

    public Conductor(ClientEnvironment clientEnvironment, ConfigProvider configProvider, DcpClientMetrics dcpClientMetrics) {
        this.metrics = (DcpClientMetrics) Objects.requireNonNull(dcpClientMetrics);
        this.env = clientEnvironment;
        this.configProvider = configProvider == null ? new HttpStreamingConfigProvider(clientEnvironment) : configProvider;
        this.ownsConfigProvider = configProvider == null;
        this.configProvider.configs().forEach(dcpBucketConfig -> {
            LOGGER.trace("Applying new configuration, new rev is {}.", Long.valueOf(dcpBucketConfig.rev()));
            this.currentConfig.set(dcpBucketConfig);
            reconfigure(dcpBucketConfig);
        });
    }

    public SessionState sessionState() {
        return this.sessionState;
    }

    public Completable connect() {
        this.stopped = false;
        return this.configProvider.start().timeout(this.env.connectTimeout(), TimeUnit.SECONDS).doOnError(th -> {
            LOGGER.warn("Cannot connect configuration provider.");
        }).concatWith(this.configProvider.configs().filter(dcpBucketConfig -> {
            return Boolean.valueOf(dcpBucketConfig.numberOfPartitions() != 0);
        }).first().toCompletable().timeout(this.env.bootstrapTimeout(), TimeUnit.SECONDS).doOnError(th2 -> {
            LOGGER.warn("Did not receive initial configuration from provider.");
        }));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ConfigProvider configProvider() {
        return this.configProvider;
    }

    public boolean disconnected() {
        if (!this.configProvider.isState(LifecycleState.DISCONNECTED)) {
            return false;
        }
        Iterator<DcpChannel> it = this.channels.iterator();
        while (it.hasNext()) {
            if (!it.next().isState(LifecycleState.DISCONNECTED)) {
                return false;
            }
        }
        return true;
    }

    public Completable stop() {
        LOGGER.debug("Instructed to shutdown.");
        this.stopped = true;
        Completable completable = Observable.from(this.channels).flatMapCompletable((v0) -> {
            return v0.disconnect();
        }).toCompletable();
        if (this.ownsConfigProvider) {
            completable = completable.andThen(this.configProvider.stop());
        }
        return completable.doOnCompleted(() -> {
            LOGGER.info("Shutdown complete.");
        });
    }

    public int numberOfPartitions() {
        return this.currentConfig.get().numberOfPartitions();
    }

    public Observable<ByteBuf> getSeqnos() {
        return Observable.from(this.channels).flatMap(this::getSeqnosForChannel);
    }

    private Observable<ByteBuf> getSeqnosForChannel(DcpChannel dcpChannel) {
        return Observable.just(dcpChannel).flatMapSingle((v0) -> {
            return v0.getSeqnos();
        }).retryWhen(RetryBuilder.anyOf(NotConnectedException.class).max(Integer.MAX_VALUE).delay(Delay.fixed(200L, TimeUnit.MILLISECONDS)).doOnRetry((num, th, l, timeUnit) -> {
            LOGGER.debug("Rescheduling get Seqnos for channel {}, not connected (yet).", dcpChannel);
        }).build());
    }

    public Single<ByteBuf> getFailoverLog(short s) {
        return Observable.just(Short.valueOf(s)).map(sh -> {
            return activeChannelByPartition(s);
        }).flatMapSingle(dcpChannel -> {
            return dcpChannel.getFailoverLog(s);
        }).retryWhen(RetryBuilder.anyOf(NotConnectedException.class).max(Integer.MAX_VALUE).delay(Delay.fixed(200L, TimeUnit.MILLISECONDS)).doOnRetry((num, th, l, timeUnit) -> {
            LOGGER.debug("Rescheduling Get Failover Log for vbid {}, not connected (yet).", Short.valueOf(s));
        }).build()).toSingle();
    }

    public Completable startStreamForPartition(short s, StreamOffset streamOffset, long j) {
        return Observable.just(Short.valueOf(s)).map(sh -> {
            return activeChannelByPartition(s);
        }).flatMapCompletable(dcpChannel -> {
            return dcpChannel.openStream(s, streamOffset, j);
        }).retryWhen(RetryBuilder.anyOf(NotConnectedException.class).max(Integer.MAX_VALUE).delay(Delay.fixed(200L, TimeUnit.MILLISECONDS)).doOnRetry((num, th, l, timeUnit) -> {
            LOGGER.debug("Rescheduling Stream Start for vbid {}, not connected (yet).", Short.valueOf(s));
        }).build()).toCompletable();
    }

    public Completable stopStreamForPartition(short s) {
        return streamIsOpen(s) ? activeChannelByPartition(s).closeStream(s) : Completable.complete();
    }

    public boolean streamIsOpen(short s) {
        return activeChannelByPartition(s).streamIsOpen(s);
    }

    private DcpChannel activeChannelByPartition(short s) {
        InetSocketAddress address = this.currentConfig.get().getActiveNodeKvAddress(s).toAddress();
        for (DcpChannel dcpChannel : this.channels) {
            if (dcpChannel.address().equals(address)) {
                return dcpChannel;
            }
        }
        throw new IllegalStateException("No DcpChannel found for partition " + ((int) s));
    }

    private void reconfigure(DcpBucketConfig dcpBucketConfig) {
        this.metrics.incrementReconfigure();
        List<NodeInfo> dataNodes = dcpBucketConfig.getDataNodes(!this.env.persistencePollingEnabled());
        Map map = (Map) this.channels.stream().collect(Collectors.toMap((v0) -> {
            return v0.address();
        }, dcpChannel -> {
            return dcpChannel;
        }));
        Stream<NodeInfo> stream = dataNodes.stream();
        dcpBucketConfig.getClass();
        Set<InetSocketAddress> set = (Set) stream.map(dcpBucketConfig::getAddress).collect(Collectors.toSet());
        for (InetSocketAddress inetSocketAddress : set) {
            if (!map.containsKey(inetSocketAddress)) {
                this.metrics.incrementAddChannel();
                add(inetSocketAddress);
            }
        }
        for (Map.Entry entry : map.entrySet()) {
            if (!set.contains(entry.getKey())) {
                this.metrics.incrementRemoveChannel();
                remove((DcpChannel) entry.getValue());
            }
        }
    }

    private void add(final InetSocketAddress inetSocketAddress) {
        LOGGER.info("Adding DCP Channel against {}", RedactableArgument.system(inetSocketAddress));
        DcpChannel dcpChannel = new DcpChannel(inetSocketAddress, this.env, this);
        if (!this.channels.add(dcpChannel)) {
            throw new IllegalStateException("Tried to add duplicate channel: " + RedactableArgument.system(dcpChannel));
        }
        dcpChannel.connect().retryWhen(RetryBuilder.anyMatches(th -> {
            return Boolean.valueOf(!this.stopped);
        }).max(this.env.dcpChannelsReconnectMaxAttempts()).delay(this.env.dcpChannelsReconnectDelay()).doOnRetry((num, th2, l, timeUnit) -> {
            LOGGER.debug("Rescheduling Node reconnect for DCP channel {}", inetSocketAddress);
        }).build()).subscribe(new CompletableSubscriber() { // from class: com.couchbase.client.dcp.conductor.Conductor.1
            public void onCompleted() {
                Conductor.LOGGER.debug("Completed Node connect for DCP channel {}", inetSocketAddress);
            }

            public void onError(Throwable th3) {
                Conductor.LOGGER.warn("Got error during connect (maybe retried) for node {}", RedactableArgument.system(inetSocketAddress), th3);
                if (Conductor.this.env.eventBus() != null) {
                    Conductor.this.env.eventBus().publish(new FailedToAddNodeEvent(inetSocketAddress, th3));
                }
            }

            public void onSubscribe(Subscription subscription) {
            }
        });
    }

    private void remove(final DcpChannel dcpChannel) {
        if (!this.channels.remove(dcpChannel)) {
            throw new IllegalStateException("Tried to remove unknown channel: " + RedactableArgument.system(dcpChannel));
        }
        LOGGER.info("Removing DCP Channel against {}", RedactableArgument.system(dcpChannel));
        short s = 0;
        while (true) {
            short s2 = s;
            if (s2 >= dcpChannel.streamIsOpen.length()) {
                dcpChannel.disconnect().subscribe(new CompletableSubscriber() { // from class: com.couchbase.client.dcp.conductor.Conductor.2
                    public void onCompleted() {
                        Conductor.LOGGER.debug("Channel remove notified as complete for {}", dcpChannel.address());
                    }

                    public void onError(Throwable th) {
                        Conductor.LOGGER.warn("Got error during Node removal for node {}", RedactableArgument.system(dcpChannel.address()), th);
                        if (Conductor.this.env.eventBus() != null) {
                            Conductor.this.env.eventBus().publish(new FailedToRemoveNodeEvent(dcpChannel.address(), th));
                        }
                    }

                    public void onSubscribe(Subscription subscription) {
                    }
                });
                return;
            } else {
                if (dcpChannel.streamIsOpen(s2)) {
                    maybeMovePartition(s2);
                }
                s = (short) (s2 + 1);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void maybeMovePartition(final short s) {
        Observable.timer(50L, TimeUnit.MILLISECONDS).filter(l -> {
            PartitionState partitionState = this.sessionState.get(s);
            boolean isAtEnd = partitionState.isAtEnd();
            if (isAtEnd) {
                LOGGER.debug("Reached desired high seqno {} for vbucket {}, not reopening stream.", Long.valueOf(partitionState.getEndSeqno()), Short.valueOf(s));
            }
            return Boolean.valueOf(!isAtEnd);
        }).flatMapCompletable(l2 -> {
            PartitionState partitionState = this.sessionState.get(s);
            return startStreamForPartition(s, partitionState.getOffset(), partitionState.getEndSeqno()).retryWhen(RetryBuilder.anyOf(NotMyVbucketException.class).max(Integer.MAX_VALUE).delay(Delay.fixed(200L, TimeUnit.MILLISECONDS)).build());
        }).toCompletable().subscribe(new CompletableSubscriber() { // from class: com.couchbase.client.dcp.conductor.Conductor.3
            public void onCompleted() {
                Conductor.LOGGER.trace("Completed Partition Move for partition {}", Short.valueOf(s));
            }

            public void onError(Throwable th) {
                if (th instanceof RollbackException) {
                    Conductor.LOGGER.warn("Rollback during Partition Move for partition {}", Short.valueOf(s));
                } else {
                    Conductor.LOGGER.warn("Error during Partition Move for partition {}", Short.valueOf(s), th);
                }
                if (Conductor.this.env.eventBus() != null) {
                    Conductor.this.env.eventBus().publish(new FailedToMovePartitionEvent(s, th));
                }
            }

            public void onSubscribe(Subscription subscription) {
                Conductor.LOGGER.debug("Subscribing for Partition Move for partition {}", Short.valueOf(s));
            }
        });
    }
}
