/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.web3j;

import io.reactivex.disposables.Disposable;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.web3j.Web3jConfiguration;
import org.apache.camel.component.web3j.Web3jEndpoint;
import org.apache.camel.component.web3j.Web3jHelper;
import org.apache.camel.support.DefaultConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.web3j.protocol.Web3j;
import org.web3j.protocol.core.methods.request.EthFilter;
import org.web3j.protocol.core.methods.response.EthBlock;
import org.web3j.protocol.core.methods.response.Log;
import org.web3j.protocol.core.methods.response.Transaction;

public class Web3jConsumer
extends DefaultConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(Web3jConsumer.class);
    private final Web3jConfiguration configuration;
    private Web3j web3j;
    private Disposable subscription;
    private Web3jEndpoint endpoint;

    public Web3jConsumer(Web3jEndpoint endpoint, Processor processor, Web3jConfiguration configuration) {
        super(endpoint, processor);
        this.endpoint = endpoint;
        this.configuration = configuration;
    }

    @Override
    public Web3jEndpoint getEndpoint() {
        return (Web3jEndpoint)super.getEndpoint();
    }

    @Override
    protected void doStart() throws Exception {
        super.doStart();
        this.web3j = this.getEndpoint().getWeb3j();
        LOG.info("Subscribing to: {}", (Object)this.endpoint.getNodeAddress());
        switch (this.configuration.getOperation()) {
            case "ETH_LOG_OBSERVABLE": {
                EthFilter ethFilter = Web3jEndpoint.buildEthFilter(Web3jHelper.toDefaultBlockParameter(this.configuration.getFromBlock()), Web3jHelper.toDefaultBlockParameter(this.configuration.getToBlock()), this.configuration.getAddresses(), this.configuration.getTopics());
                this.subscription = this.web3j.ethLogFlowable(ethFilter).subscribe(this::ethLogObservable, t -> this.processError((Throwable)t, "ETH_LOG_OBSERVABLE"), () -> this.processDone("ETH_LOG_OBSERVABLE"));
                break;
            }
            case "ETH_BLOCK_HASH_OBSERVABLE": {
                this.subscription = this.web3j.ethBlockHashFlowable().subscribe(this::ethBlockHashObservable, t -> this.processError((Throwable)t, "ETH_BLOCK_HASH_OBSERVABLE"), () -> this.processDone("ETH_BLOCK_HASH_OBSERVABLE"));
                break;
            }
            case "ETH_PENDING_TRANSACTION_HASH_OBSERVABLE": {
                this.subscription = this.web3j.ethPendingTransactionHashFlowable().subscribe(this::ethPendingTransactionHashObservable, t -> this.processError((Throwable)t, "ETH_PENDING_TRANSACTION_HASH_OBSERVABLE"), () -> this.processDone("ETH_PENDING_TRANSACTION_HASH_OBSERVABLE"));
                break;
            }
            case "TRANSACTION_OBSERVABLE": {
                this.subscription = this.web3j.transactionFlowable().subscribe(this::processTransaction, t -> this.processError((Throwable)t, "TRANSACTION_OBSERVABLE"), () -> this.processDone("TRANSACTION_OBSERVABLE"));
                break;
            }
            case "PENDING_TRANSACTION_OBSERVABLE": {
                this.subscription = this.web3j.pendingTransactionFlowable().subscribe(this::processTransaction, t -> this.processError((Throwable)t, "PENDING_TRANSACTION_OBSERVABLE"), () -> this.processDone("PENDING_TRANSACTION_OBSERVABLE"));
                break;
            }
            case "BLOCK_OBSERVABLE": {
                this.subscription = this.web3j.blockFlowable(this.configuration.isFullTransactionObjects()).subscribe(this::blockObservable, t -> this.processError((Throwable)t, "BLOCK_OBSERVABLE"), () -> this.processDone("BLOCK_OBSERVABLE"));
                break;
            }
            case "REPLAY_BLOCKS_OBSERVABLE": {
                this.subscription = this.web3j.replayPastBlocksFlowable(Web3jHelper.toDefaultBlockParameter(this.configuration.getFromBlock()), Web3jHelper.toDefaultBlockParameter(this.configuration.getToBlock()), this.configuration.isFullTransactionObjects()).subscribe(this::blockObservable, t -> this.processError((Throwable)t, "REPLAY_BLOCKS_OBSERVABLE"), () -> this.processDone("REPLAY_BLOCKS_OBSERVABLE"));
                break;
            }
            case "REPLAY_TRANSACTIONS_OBSERVABLE": {
                this.subscription = this.web3j.replayPastTransactionsFlowable(Web3jHelper.toDefaultBlockParameter(this.configuration.getFromBlock()), Web3jHelper.toDefaultBlockParameter(this.configuration.getToBlock())).subscribe(this::processTransaction, t -> this.processError((Throwable)t, "REPLAY_TRANSACTIONS_OBSERVABLE"), () -> this.processDone("REPLAY_TRANSACTIONS_OBSERVABLE"));
                break;
            }
            default: {
                throw new IllegalArgumentException("Unsupported operation " + this.configuration.getOperation());
            }
        }
        LOG.info("Subscribed: {}", (Object)this.configuration);
    }

    private void ethBlockHashObservable(String x) {
        LOG.debug("processEthBlock {}", (Object)x);
        Exchange exchange = this.getEndpoint().createExchange();
        exchange.getIn().setBody(x);
        this.processEvent(exchange);
    }

    private void ethPendingTransactionHashObservable(String x) {
        LOG.debug("processEthBlock {}", (Object)x);
        Exchange exchange = this.getEndpoint().createExchange();
        exchange.getIn().setBody(x);
        this.processEvent(exchange);
    }

    private void blockObservable(EthBlock x) {
        EthBlock.Block block = x.getBlock();
        LOG.debug("processEthBlock {}", (Object)block);
        Exchange exchange = this.getEndpoint().createExchange();
        exchange.getIn().setBody(block);
        this.processEvent(exchange);
    }

    private void processTransaction(Transaction x) {
        LOG.debug("processTransaction {}", (Object)x);
        Exchange exchange = this.getEndpoint().createExchange();
        exchange.getIn().setBody(x);
        this.processEvent(exchange);
    }

    private void ethLogObservable(Log x) {
        LOG.debug("processLogObservable {}", (Object)x);
        Exchange exchange = this.getEndpoint().createExchange();
        exchange.getIn().setBody(x);
        this.processEvent(exchange);
    }

    public void processEvent(Exchange exchange) {
        LOG.debug("processEvent {}", (Object)exchange);
        try {
            this.getProcessor().process(exchange);
        }
        catch (Exception e) {
            LOG.error("Error processing event ", (Throwable)e);
        }
    }

    private void processDone(String operation) {
        LOG.debug("processDone for operation: {}", (Object)operation);
        Exchange exchange = this.getEndpoint().createExchange();
        exchange.getIn().setHeader("status", "done");
        exchange.getIn().setHeader("operation", operation);
        this.processEvent(exchange);
    }

    private void processError(Throwable throwable, String operation) {
        LOG.debug("processError for operation: {} {}", new Object[]{operation, throwable.getMessage(), throwable});
        Exchange exchange = this.getEndpoint().createExchange();
        exchange.setException(throwable);
        this.processEvent(exchange);
    }

    @Override
    protected void doStop() throws Exception {
        if (this.subscription != null && !this.subscription.isDisposed()) {
            this.subscription.dispose();
        }
        super.doStop();
    }
}

