/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.beanstalk;

import com.surftools.BeanstalkClient.BeanstalkException;
import com.surftools.BeanstalkClient.Client;
import com.surftools.BeanstalkClient.Job;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Processor;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.component.beanstalk.BeanstalkCommand;
import org.apache.camel.component.beanstalk.BeanstalkEndpoint;
import org.apache.camel.component.beanstalk.processors.BuryCommand;
import org.apache.camel.component.beanstalk.processors.Command;
import org.apache.camel.component.beanstalk.processors.DeleteCommand;
import org.apache.camel.component.beanstalk.processors.ReleaseCommand;
import org.apache.camel.impl.ScheduledPollConsumer;
import org.apache.camel.spi.Synchronization;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BeanstalkConsumer
extends ScheduledPollConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(BeanstalkConsumer.class);
    private static final String[] STATS_KEY_STR = new String[]{"tube", "state"};
    private static final String[] STATS_KEY_INT = new String[]{"age", "time-left", "timeouts", "releases", "buries", "kicks"};
    private BeanstalkCommand onFailure;
    private boolean useBlockIO;
    private boolean awaitJob;
    private Client client;
    private ExecutorService executor;
    private Synchronization sync;
    private final Runnable initTask = new Runnable(){

        @Override
        public void run() {
            BeanstalkConsumer.this.client = BeanstalkConsumer.this.getEndpoint().getConnection().newReadingClient(BeanstalkConsumer.this.useBlockIO);
        }
    };
    private final Callable<Exchange> pollTask = new Callable<Exchange>(){
        final Integer noWait = 0;

        @Override
        public Exchange call() throws Exception {
            if (BeanstalkConsumer.this.client == null) {
                throw new RuntimeCamelException("Beanstalk client not initialized");
            }
            try {
                Job job = BeanstalkConsumer.this.client.reserve(this.noWait);
                if (job == null) {
                    return null;
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug(String.format("Received job ID %d (data length %d)", job.getJobId(), job.getData().length));
                }
                Exchange exchange = BeanstalkConsumer.this.getEndpoint().createExchange(ExchangePattern.InOnly);
                exchange.getIn().setHeader("beanstalk.jobId", (Object)job.getJobId());
                exchange.getIn().setBody((Object)job.getData(), byte[].class);
                Map<String, String> jobStats = BeanstalkConsumer.this.client.statsJob(job.getJobId());
                if (jobStats != null && !jobStats.isEmpty()) {
                    for (String key : STATS_KEY_STR) {
                        if (!jobStats.containsKey(key)) continue;
                        exchange.getIn().setHeader("beanstalk." + key, (Object)jobStats.get(key).trim());
                    }
                    if (jobStats.containsKey("pri")) {
                        exchange.getIn().setHeader("beanstalk.priority", (Object)Long.parseLong(jobStats.get("pri").trim()));
                    }
                    for (String key : STATS_KEY_INT) {
                        if (!jobStats.containsKey(key)) continue;
                        exchange.getIn().setHeader("beanstalk." + key, (Object)Integer.parseInt(jobStats.get(key).trim()));
                    }
                }
                if (!BeanstalkConsumer.this.awaitJob) {
                    BeanstalkConsumer.this.client.delete(job.getJobId());
                } else {
                    exchange.addOnCompletion(BeanstalkConsumer.this.sync);
                }
                return exchange;
            }
            catch (BeanstalkException e) {
                BeanstalkConsumer.this.getExceptionHandler().handleException("Beanstalk client error", (Throwable)e);
                BeanstalkConsumer.this.resetClient();
                return null;
            }
        }
    };

    public BeanstalkConsumer(BeanstalkEndpoint endpoint, Processor processor) {
        super((Endpoint)endpoint, processor);
    }

    protected int poll() throws Exception {
        Exchange exchange;
        int messagesPolled = 0;
        while (this.isPollAllowed() && (exchange = this.executor.submit(this.pollTask).get()) != null) {
            ++messagesPolled;
            this.getProcessor().process(exchange);
        }
        return messagesPolled;
    }

    public BeanstalkCommand getOnFailure() {
        return this.onFailure;
    }

    public void setOnFailure(BeanstalkCommand onFailure) {
        this.onFailure = onFailure;
    }

    public boolean isUseBlockIO() {
        return this.useBlockIO;
    }

    public void setUseBlockIO(boolean useBlockIO) {
        this.useBlockIO = useBlockIO;
    }

    public boolean isAwaitJob() {
        return this.awaitJob;
    }

    public void setAwaitJob(boolean awaitJob) {
        this.awaitJob = awaitJob;
    }

    public BeanstalkEndpoint getEndpoint() {
        return (BeanstalkEndpoint)super.getEndpoint();
    }

    protected void doStart() throws Exception {
        this.executor = this.getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadExecutor((Object)this, "Beanstalk-Consumer");
        this.executor.execute(this.initTask);
        this.sync = new Sync();
        super.doStart();
    }

    protected void doStop() throws Exception {
        super.doStop();
        if (this.executor != null) {
            this.getEndpoint().getCamelContext().getExecutorServiceManager().shutdown(this.executor);
        }
    }

    protected void resetClient() {
        if (this.client != null) {
            this.client.close();
        }
        this.initTask.run();
    }

    class Sync
    implements Synchronization {
        protected final Command successCommand;
        protected final Command failureCommand;

        Sync() {
            this.successCommand = new DeleteCommand(BeanstalkConsumer.this.getEndpoint());
            if ("bury".equals(BeanstalkConsumer.this.onFailure.name())) {
                this.failureCommand = new BuryCommand(BeanstalkConsumer.this.getEndpoint());
            } else if ("release".equals(BeanstalkConsumer.this.onFailure.name())) {
                this.failureCommand = new ReleaseCommand(BeanstalkConsumer.this.getEndpoint());
            } else if ("delete".equals(BeanstalkConsumer.this.onFailure.name())) {
                this.failureCommand = new DeleteCommand(BeanstalkConsumer.this.getEndpoint());
            } else {
                throw new IllegalArgumentException(String.format("Unknown failure command: %s", new Object[]{BeanstalkConsumer.this.onFailure}));
            }
        }

        public void onComplete(Exchange exchange) {
            try {
                BeanstalkConsumer.this.executor.submit(new RunCommand(this.successCommand, exchange)).get();
            }
            catch (Exception e) {
                LOG.error(String.format("Could not run completion of exchange %s", exchange), (Throwable)e);
            }
        }

        public void onFailure(Exchange exchange) {
            try {
                BeanstalkConsumer.this.executor.submit(new RunCommand(this.failureCommand, exchange)).get();
            }
            catch (Exception e) {
                LOG.error(String.format("%s could not run failure of exchange %s", this.failureCommand.getClass().getName(), exchange), (Throwable)e);
            }
        }

        class RunCommand
        implements Runnable {
            private final Command command;
            private final Exchange exchange;

            RunCommand(Command command, Exchange exchange) {
                this.command = command;
                this.exchange = exchange;
            }

            @Override
            public void run() {
                try {
                    try {
                        this.command.act(BeanstalkConsumer.this.client, this.exchange);
                    }
                    catch (BeanstalkException e) {
                        LOG.warn(String.format("Post-processing %s of exchange %s failed, retrying.", this.command.getClass().getName(), this.exchange), (Throwable)e);
                        BeanstalkConsumer.this.resetClient();
                        this.command.act(BeanstalkConsumer.this.client, this.exchange);
                    }
                }
                catch (Exception e) {
                    LOG.error(String.format("%s could not post-process exchange %s", this.command.getClass().getName(), this.exchange), (Throwable)e);
                    this.exchange.setException((Throwable)e);
                }
            }
        }
    }
}

