package org.apache.camel.component.etcd;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import mousio.client.promises.ResponsePromise;
import mousio.etcd4j.requests.EtcdKeyGetRequest;
import mousio.etcd4j.responses.EtcdException;
import mousio.etcd4j.responses.EtcdKeysResponse;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/camel/component/etcd/EtcdWatchConsumer.class */
public class EtcdWatchConsumer extends AbstractEtcdConsumer implements ResponsePromise.IsSimplePromiseResponseHandler<EtcdKeysResponse> {
    private static final Logger LOGGER = LoggerFactory.getLogger(EtcdWatchConsumer.class);
    private final EtcdWatchEndpoint endpoint;
    private final EtcdConfiguration configuration;
    private final AtomicLong index;

    public EtcdWatchConsumer(EtcdWatchEndpoint etcdWatchEndpoint, Processor processor, EtcdConfiguration etcdConfiguration, EtcdNamespace etcdNamespace, String str) {
        super(etcdWatchEndpoint, processor, etcdConfiguration, etcdNamespace, str);
        this.endpoint = etcdWatchEndpoint;
        this.configuration = etcdConfiguration;
        this.index = new AtomicLong(etcdConfiguration.getFromIndex().longValue());
    }

    protected void doStart() throws Exception {
        super.doStart();
        watch();
    }

    @Override // org.apache.camel.component.etcd.AbstractEtcdConsumer
    protected void doStop() throws Exception {
        super.doStop();
    }

    @Override // mousio.client.promises.ResponsePromise.IsSimplePromiseResponseHandler
    public void onResponse(ResponsePromise<EtcdKeysResponse> responsePromise) {
        if (isRunAllowed()) {
            Exchange exchange = null;
            Throwable exception = responsePromise.getException();
            if (exception == null || !(exception instanceof EtcdException)) {
                try {
                    EtcdKeysResponse etcdKeysResponse = responsePromise.get();
                    exchange = this.endpoint.createExchange();
                    exchange.getIn().setHeader(EtcdConstants.ETCD_NAMESPACE, getNamespace());
                    exchange.getIn().setHeader(EtcdConstants.ETCD_PATH, etcdKeysResponse.node.key);
                    exchange.getIn().setBody(etcdKeysResponse);
                    this.index.set(etcdKeysResponse.node.modifiedIndex.longValue() + 1);
                } catch (TimeoutException e) {
                    LOGGER.debug("Timeout watching for {}", getPath());
                    if (this.configuration.isSendEmptyExchangeOnTimeout()) {
                        exchange = this.endpoint.createExchange();
                        exchange.getIn().setHeader(EtcdConstants.ETCD_NAMESPACE, getNamespace());
                        exchange.getIn().setHeader(EtcdConstants.ETCD_TIMEOUT, true);
                        exchange.getIn().setHeader(EtcdConstants.ETCD_PATH, getPath());
                        exchange.getIn().setBody((Object) null);
                    }
                    exception = null;
                } catch (Exception e2) {
                    exception = e2;
                }
                if (exchange != null) {
                    try {
                        exception = null;
                        getProcessor().process(exchange);
                    } catch (Exception e3) {
                        getExceptionHandler().handleException("Error processing exchange", exchange, e3);
                    }
                }
            } else {
                EtcdException etcdException = (EtcdException) exception;
                if (EtcdHelper.isOutdatedIndexException(etcdException)) {
                    LOGGER.debug("Outdated index, key: {}, cause={}", getPath(), etcdException.etcdCause);
                    this.index.set(etcdException.index.longValue() + 1);
                    exception = null;
                }
            }
            if (exception != null) {
                handleException("Error processing etcd response", exception);
            }
            try {
                watch();
            } catch (Exception e4) {
                handleException("Error watching key " + getPath(), e4);
            }
        }
    }

    private void watch() throws Exception {
        if (isRunAllowed()) {
            EtcdKeyGetRequest waitForChange = getClient().get(getPath()).waitForChange(this.index.get());
            if (this.configuration.isRecursive()) {
                waitForChange.recursive();
            }
            if (this.configuration.getTimeout() != null) {
                waitForChange.timeout(this.configuration.getTimeout().longValue(), TimeUnit.MILLISECONDS);
            }
            waitForChange.send().addListener(this);
        }
    }
}
