package org.apache.camel.component.etcd3;

import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.KeyValue;
import io.etcd.jetcd.Watch;
import io.etcd.jetcd.options.WatchOption;
import io.etcd.jetcd.watch.WatchEvent;
import io.etcd.jetcd.watch.WatchResponse;
import java.nio.charset.Charset;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.support.DefaultConsumer;
import org.apache.camel.util.StringHelper;

/* loaded from: input_file:org/apache/camel/component/etcd3/Etcd3Consumer.class */
class Etcd3Consumer extends DefaultConsumer implements Watch.Listener {
    private final Etcd3Configuration configuration;
    private final String path;
    private final Client client;
    private final Watch watch;
    private final AtomicLong revision;
    private final Charset keyCharset;
    private final AtomicReference<Watch.Watcher> watcher;

    /* JADX INFO: Access modifiers changed from: package-private */
    public Etcd3Consumer(Etcd3Endpoint etcd3Endpoint, Processor processor, Etcd3Configuration etcd3Configuration, String str) {
        super(etcd3Endpoint, processor);
        this.watcher = new AtomicReference<>();
        this.configuration = etcd3Configuration;
        this.path = StringHelper.notEmpty(str, "path");
        this.client = etcd3Configuration.createClient();
        this.watch = this.client.getWatchClient();
        this.revision = new AtomicLong(etcd3Configuration.getFromIndex());
        this.keyCharset = Charset.forName(etcd3Configuration.getKeyCharset());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.camel.support.DefaultConsumer, org.apache.camel.support.service.BaseService
    public void doStart() throws Exception {
        doWatch();
        super.doStart();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.camel.support.DefaultConsumer, org.apache.camel.support.service.BaseService
    public void doStop() throws Exception {
        try {
            this.client.close();
        } finally {
            super.doStop();
        }
    }

    private void doWatch() {
        if (isRunAllowed()) {
            this.watcher.getAndUpdate(watcher -> {
                if (watcher != null) {
                    watcher.close();
                }
                return this.watch.watch(ByteSequence.from(this.path, this.keyCharset), WatchOption.newBuilder().isPrefix(this.configuration.isPrefix()).withRevision(this.revision.get()).build(), this);
            });
        }
    }

    @Override // io.etcd.jetcd.Watch.Listener
    public void onNext(WatchResponse watchResponse) {
        for (WatchEvent watchEvent : watchResponse.getEvents()) {
            Exchange createExchange = createExchange(false);
            KeyValue keyValue = watchEvent.getKeyValue();
            createExchange.getIn().setHeader(Etcd3Constants.ETCD_PATH, keyValue.getKey().toString(this.keyCharset));
            createExchange.getIn().setBody(watchEvent);
            this.revision.getAndUpdate(j -> {
                return Math.max(j, keyValue.getModRevision() + 1);
            });
            try {
                getProcessor().process(createExchange);
            } catch (Exception e) {
                getExceptionHandler().handleException("Error processing exchange", createExchange, e);
            }
            releaseExchange(createExchange, false);
        }
    }

    @Override // io.etcd.jetcd.Watch.Listener
    public void onError(Throwable th) {
        handleException("Error processing etcd response", th);
    }

    @Override // io.etcd.jetcd.Watch.Listener
    public void onCompleted() {
        doWatch();
    }
}
