package org.apache.flume.channel;

import com.google.common.base.Preconditions;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.annotations.InterfaceAudience;
import org.apache.flume.annotations.InterfaceStability;
import org.apache.flume.annotations.Recyclable;
import org.apache.flume.instrumentation.ChannelCounter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Public
@InterfaceStability.Stable
@Recyclable
/* loaded from: input_file:org/apache/flume/channel/MemoryChannel.class */
public class MemoryChannel extends BasicChannelSemantics {
    private static final double byteCapacitySlotSize = 100.0d;
    private Object queueLock = new Object();

    @GuardedBy("queueLock")
    private LinkedBlockingDeque<Event> queue;
    private Semaphore queueRemaining;
    private Semaphore queueStored;
    private volatile Integer transCapacity;
    private volatile int keepAlive;
    private volatile int byteCapacity;
    private volatile int lastByteCapacity;
    private volatile int byteCapacityBufferPercentage;
    private Semaphore bytesRemaining;
    private ChannelCounter channelCounter;
    private static Logger LOGGER = LoggerFactory.getLogger(MemoryChannel.class);
    private static final Integer defaultCapacity = 100;
    private static final Integer defaultTransCapacity = 100;
    private static final Long defaultByteCapacity = Long.valueOf((long) (Runtime.getRuntime().maxMemory() * 0.8d));
    private static final Integer defaultByteCapacityBufferPercentage = 20;
    private static final Integer defaultKeepAlive = 3;

    /* loaded from: input_file:org/apache/flume/channel/MemoryChannel$MemoryTransaction.class */
    private class MemoryTransaction extends BasicTransactionSemantics {
        private LinkedBlockingDeque<Event> takeList;
        private LinkedBlockingDeque<Event> putList;
        private final ChannelCounter channelCounter;
        private int putByteCounter = 0;
        private int takeByteCounter = 0;

        public MemoryTransaction(int i, ChannelCounter channelCounter) {
            this.putList = new LinkedBlockingDeque<>(i);
            this.takeList = new LinkedBlockingDeque<>(i);
            this.channelCounter = channelCounter;
        }

        @Override // org.apache.flume.channel.BasicTransactionSemantics
        protected void doPut(Event event) throws InterruptedException {
            this.channelCounter.incrementEventPutAttemptCount();
            int ceil = (int) Math.ceil(MemoryChannel.this.estimateEventSize(event) / MemoryChannel.byteCapacitySlotSize);
            if (!this.putList.offer(event)) {
                throw new ChannelException("Put queue for MemoryTransaction of capacity " + this.putList.size() + " full, consider committing more frequently, increasing capacity or increasing thread count");
            }
            this.putByteCounter += ceil;
        }

        @Override // org.apache.flume.channel.BasicTransactionSemantics
        protected Event doTake() throws InterruptedException {
            Event event;
            this.channelCounter.incrementEventTakeAttemptCount();
            if (this.takeList.remainingCapacity() == 0) {
                throw new ChannelException("Take list for MemoryTransaction, capacity " + this.takeList.size() + " full, consider committing more frequently, increasing capacity, or increasing thread count");
            }
            if (!MemoryChannel.this.queueStored.tryAcquire(MemoryChannel.this.keepAlive, TimeUnit.SECONDS)) {
                return null;
            }
            synchronized (MemoryChannel.this.queueLock) {
                event = (Event) MemoryChannel.this.queue.poll();
            }
            Preconditions.checkNotNull(event, "Queue.poll returned NULL despite semaphore signalling existence of entry");
            this.takeList.put(event);
            this.takeByteCounter += (int) Math.ceil(MemoryChannel.this.estimateEventSize(event) / MemoryChannel.byteCapacitySlotSize);
            return event;
        }

        /* JADX WARN: Code restructure failed: missing block: B:15:0x00ae, code lost:
        
            if (r0 > 0) goto L41;
         */
        /* JADX WARN: Code restructure failed: missing block: B:16:0x00d9, code lost:
        
            r8.putList.clear();
            r8.takeList.clear();
         */
        /* JADX WARN: Code restructure failed: missing block: B:32:0x00b8, code lost:
        
            if (r8.putList.isEmpty() != false) goto L42;
         */
        /* JADX WARN: Code restructure failed: missing block: B:34:0x00cc, code lost:
        
            if (r8.this$0.queue.offer(r8.putList.removeFirst()) != false) goto L44;
         */
        /* JADX WARN: Code restructure failed: missing block: B:37:0x00d8, code lost:
        
            throw new java.lang.RuntimeException("Queue add failed, this shouldn't be able to happen");
         */
        @Override // org.apache.flume.channel.BasicTransactionSemantics
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        protected void doCommit() throws java.lang.InterruptedException {
            /*
                Method dump skipped, instructions count: 342
                To view this dump add '--comments-level debug' option
            */
            throw new UnsupportedOperationException("Method not decompiled: org.apache.flume.channel.MemoryChannel.MemoryTransaction.doCommit():void");
        }

        @Override // org.apache.flume.channel.BasicTransactionSemantics
        protected void doRollback() {
            int size = this.takeList.size();
            synchronized (MemoryChannel.this.queueLock) {
                Preconditions.checkState(MemoryChannel.this.queue.remainingCapacity() >= this.takeList.size(), "Not enough space in memory channel queue to rollback takes. This should never happen, please report");
                while (!this.takeList.isEmpty()) {
                    MemoryChannel.this.queue.addFirst(this.takeList.removeLast());
                }
                this.putList.clear();
            }
            MemoryChannel.this.bytesRemaining.release(this.putByteCounter);
            this.putByteCounter = 0;
            this.takeByteCounter = 0;
            MemoryChannel.this.queueStored.release(size);
            this.channelCounter.setChannelSize(MemoryChannel.this.queue.size());
        }
    }

    @Override // org.apache.flume.channel.AbstractChannel, org.apache.flume.conf.Configurable
    public void configure(Context context) {
        Integer num;
        try {
            num = context.getInteger("capacity", defaultCapacity);
        } catch (NumberFormatException e) {
            num = defaultCapacity;
            LOGGER.warn("Invalid capacity specified, initializing channel to default capacity of {}", defaultCapacity);
        }
        if (num.intValue() <= 0) {
            num = defaultCapacity;
            LOGGER.warn("Invalid capacity specified, initializing channel to default capacity of {}", defaultCapacity);
        }
        try {
            this.transCapacity = context.getInteger("transactionCapacity", defaultTransCapacity);
        } catch (NumberFormatException e2) {
            this.transCapacity = defaultTransCapacity;
            LOGGER.warn("Invalid transation capacity specified, initializing channel to default capacity of {}", defaultTransCapacity);
        }
        if (this.transCapacity.intValue() <= 0) {
            this.transCapacity = defaultTransCapacity;
            LOGGER.warn("Invalid transation capacity specified, initializing channel to default capacity of {}", defaultTransCapacity);
        }
        Preconditions.checkState(this.transCapacity.intValue() <= num.intValue(), "Transaction Capacity of Memory Channel cannot be higher than the capacity.");
        try {
            this.byteCapacityBufferPercentage = context.getInteger("byteCapacityBufferPercentage", defaultByteCapacityBufferPercentage).intValue();
        } catch (NumberFormatException e3) {
            this.byteCapacityBufferPercentage = defaultByteCapacityBufferPercentage.intValue();
        }
        try {
            this.byteCapacity = (int) ((context.getLong("byteCapacity", defaultByteCapacity).longValue() * (1.0d - (this.byteCapacityBufferPercentage * 0.01d))) / byteCapacitySlotSize);
            if (this.byteCapacity < 1) {
                this.byteCapacity = Integer.MAX_VALUE;
            }
        } catch (NumberFormatException e4) {
            this.byteCapacity = (int) ((defaultByteCapacity.longValue() * (1.0d - (this.byteCapacityBufferPercentage * 0.01d))) / byteCapacitySlotSize);
        }
        try {
            this.keepAlive = context.getInteger("keep-alive", defaultKeepAlive).intValue();
        } catch (NumberFormatException e5) {
            this.keepAlive = defaultKeepAlive.intValue();
        }
        if (this.queue != null) {
            try {
                resizeQueue(num.intValue());
            } catch (InterruptedException e6) {
                Thread.currentThread().interrupt();
            }
        } else {
            synchronized (this.queueLock) {
                this.queue = new LinkedBlockingDeque<>(num.intValue());
                this.queueRemaining = new Semaphore(num.intValue());
                this.queueStored = new Semaphore(0);
            }
        }
        if (this.bytesRemaining == null) {
            this.bytesRemaining = new Semaphore(this.byteCapacity);
            this.lastByteCapacity = this.byteCapacity;
        } else if (this.byteCapacity > this.lastByteCapacity) {
            this.bytesRemaining.release(this.byteCapacity - this.lastByteCapacity);
            this.lastByteCapacity = this.byteCapacity;
        } else {
            try {
                if (this.bytesRemaining.tryAcquire(this.lastByteCapacity - this.byteCapacity, this.keepAlive, TimeUnit.SECONDS)) {
                    this.lastByteCapacity = this.byteCapacity;
                } else {
                    LOGGER.warn("Couldn't acquire permits to downsize the byte capacity, resizing has been aborted");
                }
            } catch (InterruptedException e7) {
                Thread.currentThread().interrupt();
            }
        }
        if (this.channelCounter == null) {
            this.channelCounter = new ChannelCounter(getName());
        }
    }

    private void resizeQueue(int i) throws InterruptedException {
        int size;
        synchronized (this.queueLock) {
            size = this.queue.size() + this.queue.remainingCapacity();
        }
        if (size == i) {
            return;
        }
        if (size <= i) {
            synchronized (this.queueLock) {
                LinkedBlockingDeque<Event> linkedBlockingDeque = new LinkedBlockingDeque<>(i);
                linkedBlockingDeque.addAll(this.queue);
                this.queue = linkedBlockingDeque;
            }
            this.queueRemaining.release(i - size);
            return;
        }
        if (!this.queueRemaining.tryAcquire(size - i, this.keepAlive, TimeUnit.SECONDS)) {
            LOGGER.warn("Couldn't acquire permits to downsize the queue, resizing has been aborted");
            return;
        }
        synchronized (this.queueLock) {
            LinkedBlockingDeque<Event> linkedBlockingDeque2 = new LinkedBlockingDeque<>(i);
            linkedBlockingDeque2.addAll(this.queue);
            this.queue = linkedBlockingDeque2;
        }
    }

    @Override // org.apache.flume.channel.AbstractChannel, org.apache.flume.lifecycle.LifecycleAware
    public synchronized void start() {
        this.channelCounter.start();
        this.channelCounter.setChannelSize(this.queue.size());
        this.channelCounter.setChannelCapacity(Long.valueOf(this.queue.size() + this.queue.remainingCapacity()).longValue());
        super.start();
    }

    @Override // org.apache.flume.channel.AbstractChannel, org.apache.flume.lifecycle.LifecycleAware
    public synchronized void stop() {
        this.channelCounter.setChannelSize(this.queue.size());
        this.channelCounter.stop();
        super.stop();
    }

    @Override // org.apache.flume.channel.BasicChannelSemantics
    protected BasicTransactionSemantics createTransaction() {
        return new MemoryTransaction(this.transCapacity.intValue(), this.channelCounter);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public long estimateEventSize(Event event) {
        byte[] body = event.getBody();
        if (body == null || body.length == 0) {
            return 1L;
        }
        return body.length;
    }
}
