/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.disruptor;

import com.lmax.disruptor.InsufficientCapacityException;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicMarkableReference;
import java.util.concurrent.locks.LockSupport;
import org.apache.camel.Exchange;
import org.apache.camel.component.disruptor.AbstractLifecycleAwareExchangeEventHandler;
import org.apache.camel.component.disruptor.DisruptorComponent;
import org.apache.camel.component.disruptor.DisruptorConsumer;
import org.apache.camel.component.disruptor.DisruptorEndpoint;
import org.apache.camel.component.disruptor.DisruptorNotStartedException;
import org.apache.camel.component.disruptor.DisruptorProducerType;
import org.apache.camel.component.disruptor.DisruptorWaitStrategy;
import org.apache.camel.component.disruptor.ExchangeEvent;
import org.apache.camel.component.disruptor.ExchangeEventFactory;
import org.apache.camel.component.disruptor.LifecycleAwareExchangeEventHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Holds the (replaceable) {@link Disruptor} instance that is shared by all
 * {@link DisruptorEndpoint}s registered on this reference, together with the
 * executor that runs its event handlers.
 *
 * <p>The Disruptor is rebuilt from scratch on every {@link #reconfigure()}. While
 * that happens, publishers calling {@link #publish(Exchange)},
 * {@link #tryPublish(Exchange)} or {@link #getRemainingCapacity()} wait until the
 * new instance is available. Exchanges collected by the placeholder handler that
 * is installed when there are no consumers are kept in a temporary buffer and
 * re-published on the next start.
 *
 * <p>All lifecycle methods ({@link #reconfigure()}, {@link #addEndpoint},
 * {@link #removeEndpoint}, {@link #getEndpointCount()}) synchronize on this instance.
 */
public class DisruptorReference {
    private static final Logger LOGGER = LoggerFactory.getLogger(DisruptorReference.class);

    /** Upper bound, in seconds, for a single wait on a handler start/stop signal. */
    private static final long HANDLER_AWAIT_TIMEOUT_SECONDS = 10L;

    // Weakly referenced set of the endpoints currently using this reference.
    private final Set<DisruptorEndpoint> endpoints =
            Collections.newSetFromMap(new WeakHashMap<DisruptorEndpoint, Boolean>(4));
    private final DisruptorComponent component;
    private final String uri;
    private final String name;

    // The mark on the reference tells whether a reconfiguration is in progress:
    // (ref , mark ) : meaning
    // (null, false) : not started, or completely shut down
    // (null, true ) : being reconfigured, a new Disruptor is about to be set
    // (x   , false) : normally functioning Disruptor
    private final AtomicMarkableReference<Disruptor<ExchangeEvent>> disruptor =
            new AtomicMarkableReference<Disruptor<ExchangeEvent>>(null, false);

    // Handed to the Disruptor as its executor; only queues the commands so that they
    // can be run on the real executor once start() decides to.
    private final DelayedExecutor delayedExecutor = new DelayedExecutor();
    private final DisruptorProducerType producerType;
    private final int size;
    private final DisruptorWaitStrategy waitStrategy;

    // Parks exchanges gathered by the BlockingExchangeEventHandler until the next start().
    private final Queue<Exchange> temporaryExchangeBuffer;

    // Only touched from synchronized lifecycle methods.
    private ExecutorService executor;
    private LifecycleAwareExchangeEventHandler[] handlers = new LifecycleAwareExchangeEventHandler[0];
    private int uniqueConsumerCount;

    /**
     * Creates the reference and immediately builds and starts its first Disruptor.
     *
     * @param component    owning component, used to reach the executor service manager
     * @param uri          endpoint uri, used as the thread pool name and in {@link #toString()}
     * @param name         name of this reference
     * @param size         ring buffer size; also the capacity of the temporary exchange buffer
     * @param producerType producer type passed to the Disruptor
     * @param waitStrategy factory for the Disruptor wait strategy
     * @throws Exception if the initial Disruptor cannot be created or started
     */
    DisruptorReference(DisruptorComponent component, String uri, String name, int size,
                       DisruptorProducerType producerType, DisruptorWaitStrategy waitStrategy) throws Exception {
        this.component = component;
        this.uri = uri;
        this.name = name;
        this.size = size;
        this.producerType = producerType;
        this.waitStrategy = waitStrategy;
        this.temporaryExchangeBuffer = new ArrayBlockingQueue<Exchange>(size);
        reconfigure();
    }

    /**
     * @return {@code true} when no Disruptor instance is currently set (not started,
     *         shut down, or in the middle of a reconfiguration)
     */
    public boolean hasNullReference() {
        return disruptor.getReference() == null;
    }

    /**
     * Returns the active Disruptor, waiting for it while a reconfiguration is pending.
     *
     * @return the current, non-null Disruptor
     * @throws DisruptorNotStartedException if there is no Disruptor and none is pending
     */
    private Disruptor<ExchangeEvent> getCurrentDisruptor() throws DisruptorNotStartedException {
        Disruptor<ExchangeEvent> currentDisruptor = disruptor.getReference();
        if (currentDisruptor != null) {
            return currentDisruptor;
        }

        // No instance right now: the mark tells us whether one is on its way.
        boolean[] changeIsPending = new boolean[1];
        while (true) {
            currentDisruptor = disruptor.get(changeIsPending);
            if (currentDisruptor != null) {
                return currentDisruptor;
            }
            if (!changeIsPending[0]) {
                throw new DisruptorNotStartedException("Disruptor is not yet started or already shut down.");
            }
            // Reconfiguration in progress: keep polling, but yield the CPU briefly.
            LockSupport.parkNanos(1L);
        }
    }

    /**
     * Publishes the exchange without waiting for free ring buffer capacity.
     *
     * @param exchange the exchange to publish
     * @throws DisruptorNotStartedException  if no Disruptor is available or pending
     * @throws InsufficientCapacityException if the ring buffer has no free slot
     */
    public void tryPublish(Exchange exchange) throws DisruptorNotStartedException, InsufficientCapacityException {
        tryPublishExchangeOnRingBuffer(exchange, getCurrentDisruptor().getRingBuffer());
    }

    /**
     * Publishes the exchange on the ring buffer of the current Disruptor.
     *
     * @param exchange the exchange to publish
     * @throws DisruptorNotStartedException if no Disruptor is available or pending
     */
    public void publish(Exchange exchange) throws DisruptorNotStartedException {
        publishExchangeOnRingBuffer(exchange, getCurrentDisruptor().getRingBuffer());
    }

    /** Claims the next sequence, stores the exchange in that slot and publishes it. */
    private void publishExchangeOnRingBuffer(Exchange exchange, RingBuffer<ExchangeEvent> ringBuffer) {
        long sequence = ringBuffer.next();
        ringBuffer.get(sequence).setExchange(exchange, uniqueConsumerCount);
        ringBuffer.publish(sequence);
    }

    /** Same as {@link #publishExchangeOnRingBuffer} but claims the sequence with {@code tryNext()}. */
    private void tryPublishExchangeOnRingBuffer(Exchange exchange, RingBuffer<ExchangeEvent> ringBuffer)
            throws InsufficientCapacityException {
        long sequence = ringBuffer.tryNext();
        ringBuffer.get(sequence).setExchange(exchange, uniqueConsumerCount);
        ringBuffer.publish(sequence);
    }

    /**
     * Shuts down the current Disruptor (if any) and starts a freshly created one.
     *
     * @throws Exception if the new Disruptor cannot be created or started; in that case
     *                   the reference is left in the "not started" state
     */
    public synchronized void reconfigure() throws Exception {
        LOGGER.debug("Reconfiguring disruptor {}", this);
        shutdownDisruptor(true);
        boolean started = false;
        try {
            start();
            started = true;
        } finally {
            if (!started) {
                // Clear the "reconfiguring" mark, otherwise callers of getCurrentDisruptor()
                // would keep waiting for a Disruptor that is never going to be set.
                disruptor.set(null, false);
            }
        }
    }

    /**
     * Creates and starts a new Disruptor, waits for its handlers to report that they
     * are started, re-publishes buffered exchanges and finally makes the new instance
     * visible to publishers.
     */
    private void start() throws Exception {
        LOGGER.debug("Starting disruptor {}", this);
        Disruptor<ExchangeEvent> newDisruptor = createDisruptor();
        newDisruptor.start();

        if (executor != null) {
            // The Disruptor only queued its commands on the delayed executor; run them now.
            delayedExecutor.executeDelayedCommands(executor);
        }

        boolean interrupted = false;
        try {
            for (LifecycleAwareExchangeEventHandler handler : handlers) {
                boolean eventHandlerStarted = false;
                while (!eventHandlerStarted) {
                    try {
                        if (!handler.awaitStarted(HANDLER_AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                            // Give up on this handler after the timeout instead of hanging forever.
                            LOGGER.error("Disruptor/event handler failed to start properly, PLEASE REPORT");
                        }
                        eventHandlerStarted = true;
                    } catch (InterruptedException e) {
                        // Retry, but remember the interruption so it can be restored below.
                        interrupted = true;
                    }
                }
            }
            publishBufferedExchanges(newDisruptor);
            disruptor.set(newDisruptor, false);
        } finally {
            if (interrupted) {
                // Restored only after waiting is done; doing it inside the loop would make
                // the next await throw immediately.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Builds a new, not yet started Disruptor and wires in the event handlers supplied
     * by all registered endpoints. Also recomputes {@code uniqueConsumerCount}.
     */
    private Disruptor<ExchangeEvent> createDisruptor() throws Exception {
        Disruptor<ExchangeEvent> newDisruptor = new Disruptor<ExchangeEvent>(
                ExchangeEventFactory.INSTANCE, size, delayedExecutor,
                producerType.getProducerType(), waitStrategy.createWaitStrategyInstance());

        ArrayList<LifecycleAwareExchangeEventHandler> eventHandlers =
                new ArrayList<LifecycleAwareExchangeEventHandler>();
        uniqueConsumerCount = 0;
        for (DisruptorEndpoint endpoint : endpoints) {
            Map<DisruptorConsumer, Collection<LifecycleAwareExchangeEventHandler>> consumerEventHandlers =
                    endpoint.createConsumerEventHandlers();
            if (consumerEventHandlers == null) {
                continue;
            }
            uniqueConsumerCount += consumerEventHandlers.size();
            for (Collection<LifecycleAwareExchangeEventHandler> handlersOfConsumer : consumerEventHandlers.values()) {
                eventHandlers.addAll(handlersOfConsumer);
            }
        }

        LOGGER.debug("Disruptor created with {} event handlers", eventHandlers.size());
        handleEventsWith(newDisruptor,
                eventHandlers.toArray(new LifecycleAwareExchangeEventHandler[eventHandlers.size()]));
        return newDisruptor;
    }

    /**
     * Registers the handlers on the new Disruptor and sizes the executor to one thread
     * per handler. When no handlers are given, a single
     * {@link BlockingExchangeEventHandler} is installed instead.
     */
    private void handleEventsWith(Disruptor<ExchangeEvent> newDisruptor,
                                  LifecycleAwareExchangeEventHandler[] newHandlers) {
        if (newHandlers == null || newHandlers.length == 0) {
            handlers = new LifecycleAwareExchangeEventHandler[] {new BlockingExchangeEventHandler()};
        } else {
            handlers = newHandlers;
        }
        resizeThreadPoolExecutor(handlers.length);
        newDisruptor.handleEventsWith(handlers);
    }

    /** Moves every exchange from the temporary buffer onto the ring buffer of the new Disruptor. */
    private void publishBufferedExchanges(Disruptor<ExchangeEvent> newDisruptor) {
        // Drain first, publish afterwards, so the buffer is empty before anything is re-published.
        ArrayList<Exchange> exchanges = new ArrayList<Exchange>(temporaryExchangeBuffer.size());
        Exchange buffered;
        while ((buffered = temporaryExchangeBuffer.poll()) != null) {
            exchanges.add(buffered);
        }

        RingBuffer<ExchangeEvent> ringBuffer = newDisruptor.getRingBuffer();
        for (Exchange exchange : exchanges) {
            publishExchangeOnRingBuffer(exchange, ringBuffer);
        }
    }

    /**
     * Creates, resizes or shuts down the executor so that it offers {@code newSize} threads.
     *
     * @param newSize required number of threads; zero or less shuts an existing executor down
     */
    private void resizeThreadPoolExecutor(int newSize) {
        if (executor == null && newSize > 0) {
            LOGGER.debug("Creating new executor with {} threads", newSize);
            executor = component.getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, uri, newSize);
        } else if (executor != null && newSize <= 0) {
            LOGGER.debug("Shutting down executor");
            component.getCamelContext().getExecutorServiceManager().shutdown(executor);
            executor = null;
        } else if (executor instanceof ThreadPoolExecutor) {
            LOGGER.debug("Resizing existing executor to {} threads", newSize);
            ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
            // Order matters: the core size may never exceed the maximum size.
            if (newSize <= threadPoolExecutor.getCorePoolSize()) {
                threadPoolExecutor.setCorePoolSize(newSize);
                threadPoolExecutor.setMaximumPoolSize(newSize);
            } else {
                threadPoolExecutor.setMaximumPoolSize(newSize);
                threadPoolExecutor.setCorePoolSize(newSize);
            }
        } else if (newSize > 0) {
            // Existing executor cannot be resized in place: replace it.
            LOGGER.debug("Shutting down old and creating new executor with {} threads", newSize);
            component.getCamelContext().getExecutorServiceManager().shutdown(executor);
            executor = component.getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, uri, newSize);
        }
    }

    /**
     * Clears the Disruptor reference, shuts the old instance down and waits for its
     * handlers to report that they are stopped.
     *
     * @param isReconfiguring value for the reference mark; {@code true} makes publishers
     *                        wait for the next Disruptor instead of failing
     */
    private synchronized void shutdownDisruptor(boolean isReconfiguring) {
        LOGGER.debug("Shutting down disruptor {}, reconfiguring: {}", this, isReconfiguring);
        Disruptor<ExchangeEvent> currentDisruptor = disruptor.getReference();
        disruptor.set(null, isReconfiguring);
        if (currentDisruptor == null) {
            return;
        }

        if (handlers != null && handlers.length == 1 && handlers[0] instanceof BlockingExchangeEventHandler) {
            // Release the placeholder handler so it can move its backlog to the temporary buffer.
            ((BlockingExchangeEventHandler) handlers[0]).unblock();
        }
        currentDisruptor.shutdown();

        boolean interrupted = false;
        try {
            for (LifecycleAwareExchangeEventHandler eventHandler : handlers) {
                boolean eventHandlerFinished = false;
                while (!eventHandlerFinished) {
                    try {
                        if (!eventHandler.awaitStopped(HANDLER_AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                            // Give up on this handler after the timeout instead of hanging forever.
                            LOGGER.error("Disruptor/event handler failed to shut down properly, PLEASE REPORT");
                        }
                        eventHandlerFinished = true;
                    } catch (InterruptedException e) {
                        // Retry, but remember the interruption so it can be restored below.
                        interrupted = true;
                    }
                }
            }
            handlers = new LifecycleAwareExchangeEventHandler[0];
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Shuts the executor down, if there is one. */
    private synchronized void shutdownExecutor() {
        resizeThreadPoolExecutor(0);
    }

    /** @return the name given to this reference at construction */
    public String getName() {
        return name;
    }

    /**
     * @return the remaining capacity reported by the current ring buffer
     * @throws DisruptorNotStartedException if no Disruptor is available or pending
     */
    public long getRemainingCapacity() throws DisruptorNotStartedException {
        return getCurrentDisruptor().getRingBuffer().remainingCapacity();
    }

    /** @return the wait strategy given at construction */
    public DisruptorWaitStrategy getWaitStrategy() {
        return waitStrategy;
    }

    /** @return the producer type given at construction */
    DisruptorProducerType getProducerType() {
        return producerType;
    }

    /** @return the ring buffer size given at construction */
    public int getBufferSize() {
        return size;
    }

    /**
     * @return the number of used ring buffer slots plus the number of exchanges in the
     *         temporary buffer; only the latter when no Disruptor is available
     */
    public int getPendingExchangeCount() {
        try {
            if (!hasNullReference()) {
                return (int) ((long) getBufferSize() - getRemainingCapacity() + (long) temporaryExchangeBuffer.size());
            }
        } catch (DisruptorNotStartedException ignored) {
            // The Disruptor went away in the meantime: fall back to the temporary buffer only.
        }
        return temporaryExchangeBuffer.size();
    }

    /**
     * Registers an endpoint on this reference. Does not reconfigure the Disruptor.
     *
     * @param disruptorEndpoint the endpoint to add
     */
    public synchronized void addEndpoint(DisruptorEndpoint disruptorEndpoint) {
        LOGGER.debug("Adding Endpoint: {}", disruptorEndpoint);
        endpoints.add(disruptorEndpoint);
        LOGGER.debug("Endpoint added: {}, new total endpoints {}", disruptorEndpoint, endpoints.size());
    }

    /**
     * Unregisters an endpoint. When exactly one endpoint is registered at the time of
     * the call, the Disruptor and the executor are shut down first.
     *
     * @param disruptorEndpoint the endpoint to remove
     */
    public synchronized void removeEndpoint(DisruptorEndpoint disruptorEndpoint) {
        LOGGER.debug("Removing Endpoint: {}", disruptorEndpoint);
        if (getEndpointCount() == 1) {
            LOGGER.debug("Last Endpoint removed, shutdown disruptor");
            shutdownDisruptor(false);
            shutdownExecutor();
        }
        endpoints.remove(disruptorEndpoint);
        LOGGER.debug("Endpoint removed: {}, new total endpoints {}", disruptorEndpoint, getEndpointCount());
    }

    /** @return the number of endpoints currently registered */
    public synchronized int getEndpointCount() {
        return endpoints.size();
    }

    @Override
    public String toString() {
        return "DisruptorReference{uri='" + uri + '\'' + ", endpoint count=" + endpoints.size()
                + ", handler count=" + handlers.length + '}';
    }

    /**
     * Executor that does not run anything itself: commands are queued until
     * {@link #executeDelayedCommands(Executor)} hands them to a real executor.
     */
    private static class DelayedExecutor implements Executor {
        private final Queue<Runnable> delayedCommands = new LinkedList<Runnable>();

        private DelayedExecutor() {
        }

        /** Queues the command for later execution. */
        @Override
        public void execute(Runnable command) {
            delayedCommands.offer(command);
        }

        /**
         * Submits all queued commands, in the order they were queued, to the given executor.
         *
         * @param actualExecutor executor that will run the queued commands
         */
        public void executeDelayedCommands(Executor actualExecutor) {
            Runnable command;
            while ((command = delayedCommands.poll()) != null) {
                actualExecutor.execute(command);
            }
        }
    }

    /**
     * Placeholder handler used when no consumer handlers exist. It blocks on the first
     * event until {@link #unblock()} is called and then moves each event's exchange to
     * the temporary buffer of the enclosing reference, unless the exchange is flagged
     * with the {@code disruptor.ignoreExchange} property.
     */
    private class BlockingExchangeEventHandler extends AbstractLifecycleAwareExchangeEventHandler {
        private final CountDownLatch blockingLatch = new CountDownLatch(1);

        private BlockingExchangeEventHandler() {
        }

        @Override
        public void onEvent(ExchangeEvent event, long sequence, boolean endOfBatch) throws Exception {
            blockingLatch.await();
            Exchange exchange = event.getSynchronizedExchange().cancelAndGetOriginalExchange();
            boolean ignore = ((Boolean) exchange.getProperty("disruptor.ignoreExchange", false, Boolean.TYPE))
                    .booleanValue();
            if (ignore) {
                LOGGER.trace("Ignoring exchange {}", exchange);
            } else if (!temporaryExchangeBuffer.offer(exchange)) {
                // offer() returns false when the bounded buffer is full; make the loss visible.
                LOGGER.warn("Temporary exchange buffer is full, dropping exchange {}", exchange);
            }
        }

        /** Releases the latch so that {@link #onEvent} stops blocking. */
        public void unblock() {
            blockingLatch.countDown();
        }
    }
}

