/*
 * Decompiled with CFR 0.152.
 */
package kafka.coordinator.transaction;

import com.typesafe.scalalogging.Logger;
import java.io.Serializable;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import kafka.coordinator.transaction.ProducerIdManager;
import kafka.coordinator.transaction.ProducerIdManager$;
import kafka.server.BrokerToControllerChannelManager;
import kafka.server.ControllerRequestCompletionHandler;
import kafka.utils.Logging;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.message.AllocateProducerIdsRequestData;
import org.apache.kafka.common.message.AllocateProducerIdsResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AllocateProducerIdsRequest;
import org.apache.kafka.common.requests.AllocateProducerIdsResponse;
import org.apache.kafka.server.common.ProducerIdsBlock;
import scala.Function0;
import scala.MatchError;
import scala.reflect.ScalaSignature;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;

@ScalaSignature(bytes="\u0006\u0001\u0005\u0005b\u0001B\u000b\u0017\u0001uA\u0001B\f\u0001\u0003\u0002\u0003\u0006Ia\f\u0005\te\u0001\u0011\t\u0011)A\u0005g!A\u0011\b\u0001B\u0001B\u0003%!\b\u0003\u0005A\u0001\t\u0005\t\u0015!\u00030\u0011\u0015\t\u0005\u0001\"\u0001C\u0011\u001dA\u0005A1A\u0005\n%Ca!\u001a\u0001!\u0002\u0013Q\u0005b\u00024\u0001\u0005\u0004%Ia\u001a\u0005\u0007]\u0002\u0001\u000b\u0011\u00025\t\u000f=\u0004\u0001\u0019!C\u0005a\"9\u0011\u000f\u0001a\u0001\n\u0013\u0011\bB\u0002=\u0001A\u0003&\u0011\fC\u0004z\u0001\u0001\u0007I\u0011\u0002>\t\u000fm\u0004\u0001\u0019!C\u0005y\"1a\u0010\u0001Q!\nYBaa \u0001\u0005B\u0005\u0005\u0001bBA\u0002\u0001\u0011%\u0011Q\u0001\u0005\t\u0003\u000f\u0001A\u0011\u0001\f\u0002\u0006!A\u0011\u0011\u0002\u0001\u0005\u0002Y\tY\u0001\u0003\u0005\u0002 \u0001!\tAFA\u0003\u0005Q\u0011\u0006k\u0011)s_\u0012,8-\u001a:JI6\u000bg.Y4fe*\u0011q\u0003G\u0001\fiJ\fgn]1di&|gN\u0003\u0002\u001a5\u0005Y1m\\8sI&t\u0017\r^8s\u0015\u0005Y\u0012!B6bM.\f7\u0001A\n\u0005\u0001y!\u0003\u0006\u0005\u0002 E5\t\u0001EC\u0001\"\u0003\u0015\u00198-\u00197b\u0013\t\u0019\u0003E\u0001\u0004B]f\u0014VM\u001a\t\u0003K\u0019j\u0011AF\u0005\u0003OY\u0011\u0011\u0003\u0015:pIV\u001cWM]%e\u001b\u0006t\u0017mZ3s!\tIC&D\u0001+\u0015\tY#$A\u0003vi&d7/\u0003\u0002.U\t9Aj\\4hS:<\u0017\u0001\u00032s_.,'/\u00133\u0011\u0005}\u0001\u0014BA\u0019!\u0005\rIe\u000e^\u0001\u0014EJ|7.\u001a:Fa>\u001c\u0007nU;qa2LWM\u001d\t\u0004?Q2\u0014BA\u001b!\u0005%1UO\\2uS>t\u0007\u0007\u0005\u0002 o%\u0011\u0001\b\t\u0002\u0005\u0019>tw-A\td_:$(o\u001c7mKJ\u001c\u0005.\u00198oK2\u0004\"a\u000f 
\u000e\u0003qR!!\u0010\u000e\u0002\rM,'O^3s\u0013\tyDH\u0001\u0011Ce>\\WM\u001d+p\u0007>tGO]8mY\u0016\u00148\t[1o]\u0016dW*\u00198bO\u0016\u0014\u0018!C7bq^\u000b\u0017\u000e^'t\u0003\u0019a\u0014N\\5u}Q)1\tR#G\u000fB\u0011Q\u0005\u0001\u0005\u0006]\u0015\u0001\ra\f\u0005\u0006e\u0015\u0001\ra\r\u0005\u0006s\u0015\u0001\rA\u000f\u0005\u0006\u0001\u0016\u0001\raL\u0001\u0014]\u0016DH\u000f\u0015:pIV\u001cWM]%e\u00052|7m[\u000b\u0002\u0015B\u00191J\u0015+\u000e\u00031S!!\u0014(\u0002\u0015\r|gnY;se\u0016tGO\u0003\u0002P!\u0006!Q\u000f^5m\u0015\u0005\t\u0016\u0001\u00026bm\u0006L!a\u0015'\u0003%\u0005\u0013(/Y=CY>\u001c7.\u001b8h#V,W/\u001a\t\u0004+^KV\"\u0001,\u000b\u0005=\u0003\u0013B\u0001-W\u0005\r!&/\u001f\t\u00035\u000el\u0011a\u0017\u0006\u00039v\u000baaY8n[>t'BA\u001f_\u0015\tYrL\u0003\u0002aC\u00061\u0011\r]1dQ\u0016T\u0011AY\u0001\u0004_J<\u0017B\u00013\\\u0005A\u0001&o\u001c3vG\u0016\u0014\u0018\nZ:CY>\u001c7.\u0001\u000boKb$\bK]8ek\u000e,'/\u00133CY>\u001c7\u000eI\u0001\u0010e\u0016\fX/Z:u\u0013:4E.[4iiV\t\u0001\u000e\u0005\u0002jY6\t!N\u0003\u0002l\u0019\u00061\u0011\r^8nS\u000eL!!\u001c6\u0003\u001b\u0005#x.\\5d\u0005>|G.Z1o\u0003A\u0011X-];fgRLeN\u00127jO\"$\b%\u0001\fdkJ\u0014XM\u001c;Qe>$WoY3s\u0013\u0012\u0014En\\2l+\u0005I\u0016AG2veJ,g\u000e\u001e)s_\u0012,8-\u001a:JI\ncwnY6`I\u0015\fHCA:w!\tyB/\u0003\u0002vA\t!QK\\5u\u0011\u001d98\"!AA\u0002e\u000b1\u0001\u001f\u00132\u0003]\u0019WO\u001d:f]R\u0004&o\u001c3vG\u0016\u0014\u0018\n\u001a\"m_\u000e\\\u0007%\u0001\boKb$\bK]8ek\u000e,'/\u00133\u0016\u0003Y\n!C\\3yiB\u0013x\u000eZ;dKJLEm\u0018\u0013fcR\u00111/ 
\u0005\bo:\t\t\u00111\u00017\u0003=qW\r\u001f;Qe>$WoY3s\u0013\u0012\u0004\u0013AE4f]\u0016\u0014\u0018\r^3Qe>$WoY3s\u0013\u0012$\u0012AN\u0001\u0016[\u0006L(-\u001a*fcV,7\u000f\u001e(fqR\u0014En\\2l)\u0005\u0019\u0018aC:f]\u0012\u0014V-];fgR\f\u0011\u0005[1oI2,\u0017\t\u001c7pG\u0006$X\r\u0015:pIV\u001cWM]%egJ+7\u000f]8og\u0016$2a]A\u0007\u0011\u001d\tya\u0005a\u0001\u0003#\t\u0001B]3ta>t7/\u001a\t\u0005\u0003'\tY\"\u0004\u0002\u0002\u0016)!\u0011qCA\r\u0003!\u0011X-];fgR\u001c(B\u0001/_\u0013\u0011\ti\"!\u0006\u00037\u0005cGn\\2bi\u0016\u0004&o\u001c3vG\u0016\u0014\u0018\nZ:SKN\u0004xN\\:f\u00035A\u0017M\u001c3mKRKW.Z8vi\u0002")
public class RPCProducerIdManager
implements ProducerIdManager,
Logging {
    // Broker ID placed in AllocateProducerIds requests and in every ProducerIdsBlock built from a response.
    private final int brokerId;
    // Supplies the broker epoch sent with each AllocateProducerIds request.
    private final Function0<Object> brokerEpochSupplier;
    // Channel through which sendRequest() submits requests to the controller.
    private final BrokerToControllerChannelManager controllerChannel;
    // Upper bound, in milliseconds, that generateProducerId() waits on nextProducerIdBlock.
    private final int maxWaitMs;
    // Hand-off queue (created with capacity 1) holding the outcome of the latest block request.
    private final ArrayBlockingQueue<Try<ProducerIdsBlock>> nextProducerIdBlock;
    // Set to true when a request is sent; reset to false by the response and timeout handlers.
    private final AtomicBoolean requestInFlight;
    // Block that IDs are currently handed out from; initialised to ProducerIdsBlock.EMPTY.
    // NOTE(review): written inside the synchronized generateProducerId() but read without the
    // monitor in handleAllocateProducerIdsResponse() - confirm cross-thread visibility is acceptable.
    private ProducerIdsBlock currentProducerIdBlock;
    // ID cursor advanced and returned by generateProducerId(); -1 until the first call.
    private long nextProducerId;
    // Lazily created logger; populated by logger$lzycompute().
    private Logger logger;
    // Prefix accessed through logIdent() / logIdent_$eq(); assigned in the constructor.
    private String logIdent;
    // Becomes true once the logger field has been initialised.
    private volatile boolean bitmap$0;

    /** Delegates to the static {@code Logging.loggerName$} helper, passing this instance. */
    @Override
    public String loggerName() {
        return Logging.loggerName$(this);
    }

    /**
     * Delegates to the static {@code Logging.msgWithLogIdent$} helper.
     *
     * @param msg the message handed to the helper
     * @return whatever the helper returns for this instance and {@code msg}
     */
    @Override
    public String msgWithLogIdent(String msg) {
        return Logging.msgWithLogIdent$(this, msg);
    }

    /**
     * Delegates to the static {@code Logging.trace$} helper.
     *
     * @param msg supplier of the message text
     */
    @Override
    public void trace(Function0<String> msg) {
        Logging.trace$(this, msg);
    }

    /**
     * Delegates to the static {@code Logging.trace$} helper, including a throwable supplier.
     *
     * @param msg supplier of the message text
     * @param e   supplier of the throwable passed along with the message
     */
    @Override
    public void trace(Function0<String> msg, Function0<Throwable> e) {
        Logging.trace$(this, msg, e);
    }

    /** Delegates to the static {@code Logging.isDebugEnabled$} helper, passing this instance. */
    @Override
    public boolean isDebugEnabled() {
        return Logging.isDebugEnabled$(this);
    }

    /** Delegates to the static {@code Logging.isTraceEnabled$} helper, passing this instance. */
    @Override
    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    /**
     * Delegates to the static {@code Logging.debug$} helper.
     *
     * @param msg supplier of the message text
     */
    @Override
    public void debug(Function0<String> msg) {
        Logging.debug$(this, msg);
    }

    /**
     * Delegates to the static {@code Logging.debug$} helper, including a throwable supplier.
     *
     * @param msg supplier of the message text
     * @param e   supplier of the throwable passed along with the message
     */
    @Override
    public void debug(Function0<String> msg, Function0<Throwable> e) {
        Logging.debug$(this, msg, e);
    }

    /**
     * Delegates to the static {@code Logging.info$} helper.
     *
     * @param msg supplier of the message text
     */
    @Override
    public void info(Function0<String> msg) {
        Logging.info$(this, msg);
    }

    /**
     * Delegates to the static {@code Logging.info$} helper, including a throwable supplier.
     *
     * @param msg supplier of the message text
     * @param e   supplier of the throwable passed along with the message
     */
    @Override
    public void info(Function0<String> msg, Function0<Throwable> e) {
        Logging.info$(this, msg, e);
    }

    /**
     * Delegates to the static {@code Logging.warn$} helper.
     *
     * @param msg supplier of the message text
     */
    @Override
    public void warn(Function0<String> msg) {
        Logging.warn$(this, msg);
    }

    /**
     * Delegates to the static {@code Logging.warn$} helper, including a throwable supplier.
     *
     * @param msg supplier of the message text
     * @param e   supplier of the throwable passed along with the message
     */
    @Override
    public void warn(Function0<String> msg, Function0<Throwable> e) {
        Logging.warn$(this, msg, e);
    }

    /**
     * Delegates to the static {@code Logging.error$} helper.
     *
     * @param msg supplier of the message text
     */
    @Override
    public void error(Function0<String> msg) {
        Logging.error$(this, msg);
    }

    /**
     * Delegates to the static {@code Logging.error$} helper, including a throwable supplier.
     *
     * @param msg supplier of the message text
     * @param e   supplier of the throwable passed along with the message
     */
    @Override
    public void error(Function0<String> msg, Function0<Throwable> e) {
        Logging.error$(this, msg, e);
    }

    /**
     * Delegates to the static {@code Logging.fatal$} helper.
     *
     * @param msg supplier of the message text
     */
    @Override
    public void fatal(Function0<String> msg) {
        Logging.fatal$(this, msg);
    }

    /**
     * Delegates to the static {@code Logging.fatal$} helper, including a throwable supplier.
     *
     * @param msg supplier of the message text
     * @param e   supplier of the throwable passed along with the message
     */
    @Override
    public void fatal(Function0<String> msg, Function0<Throwable> e) {
        Logging.fatal$(this, msg, e);
    }

    /** Delegates to the static {@code ProducerIdManager.shutdown$} helper; no manager-specific work is done here. */
    @Override
    public void shutdown() {
        ProducerIdManager.shutdown$(this);
    }

    /**
     * Slow path of {@link #logger()}: creates the logger under this instance's monitor
     * unless another caller has already done so.
     *
     * @return the initialised logger
     */
    private Logger logger$lzycompute() {
        synchronized (this) {
            if (this.bitmap$0) {
                // Initialised by another caller while we waited for the monitor.
                return this.logger;
            }
            this.logger = Logging.logger$(this);
            // Set the flag only after the field is assigned.
            this.bitmap$0 = true;
            return this.logger;
        }
    }

    /**
     * Returns the logger, creating it on first use.
     *
     * @return the cached logger when the init flag is set, otherwise the result of the slow path
     */
    @Override
    public Logger logger() {
        return this.bitmap$0 ? this.logger : this.logger$lzycompute();
    }

    /** Returns the current log identifier string. */
    @Override
    public String logIdent() {
        return this.logIdent;
    }

    /**
     * Replaces the log identifier string.
     *
     * @param x$1 the new identifier
     */
    @Override
    public void logIdent_$eq(String x$1) {
        this.logIdent = x$1;
    }

    /** Returns the queue that the response/timeout handlers fill and {@code generateProducerId()} polls. */
    private ArrayBlockingQueue<Try<ProducerIdsBlock>> nextProducerIdBlock() {
        return this.nextProducerIdBlock;
    }

    /** Returns the flag that records whether a block request is currently outstanding. */
    private AtomicBoolean requestInFlight() {
        return this.requestInFlight;
    }

    /** Returns the block that producer IDs are currently being taken from. */
    private ProducerIdsBlock currentProducerIdBlock() {
        return this.currentProducerIdBlock;
    }

    /**
     * Replaces the block that producer IDs are taken from.
     *
     * @param x$1 the new current block
     */
    private void currentProducerIdBlock_$eq(ProducerIdsBlock x$1) {
        this.currentProducerIdBlock = x$1;
    }

    /** Returns the ID cursor; {@code -1} until {@code generateProducerId()} has been called once. */
    private long nextProducerId() {
        return this.nextProducerId;
    }

    /**
     * Overwrites the ID cursor.
     *
     * @param x$1 the new cursor value
     */
    private void nextProducerId_$eq(long x$1) {
        this.nextProducerId = x$1;
    }

    /**
     * Hands out the next producer ID while holding this instance's monitor.
     *
     * <p>The first call requests an initial block. Later calls advance the cursor and
     * request the following block once the cursor reaches
     * {@code firstProducerId + size * PidPrefetchThreshold} of the current block. When the
     * cursor has moved past the current block's last ID, the call waits up to
     * {@code maxWaitMs} milliseconds for the outcome of the pending request.
     *
     * @return the generated producer ID
     * @throws org.apache.kafka.common.errors.ApiException the {@code REQUEST_TIMED_OUT}
     *         exception when no outcome arrives in time (NOTE(review): exact exception class
     *         comes from {@code Errors.exception(String)} - confirm)
     */
    @Override
    public synchronized long generateProducerId() {
        if (this.nextProducerId() != -1L) {
            // Regular path: move the cursor forward and prefetch once the threshold is reached.
            this.nextProducerId_$eq(this.nextProducerId() + 1L);
            double cursor = (double)this.nextProducerId();
            double prefetchMark = (double)this.currentProducerIdBlock().firstProducerId() + (double)this.currentProducerIdBlock().size() * ProducerIdManager$.MODULE$.PidPrefetchThreshold();
            if (cursor >= prefetchMark) {
                this.maybeRequestNextBlock();
            }
        } else {
            // Very first call: ask for the initial block and start the cursor at zero.
            this.maybeRequestNextBlock();
            this.nextProducerId_$eq(0L);
        }

        // Common case: the cursor still lies inside the current block.
        if (this.nextProducerId() <= this.currentProducerIdBlock().lastProducerId()) {
            return this.nextProducerId();
        }

        // Current block exhausted: wait (bounded) for the outcome of the pending request.
        Try<ProducerIdsBlock> outcome = this.nextProducerIdBlock().poll(this.maxWaitMs, TimeUnit.MILLISECONDS);
        if (outcome == null) {
            throw Errors.REQUEST_TIMED_OUT.exception("Timed out waiting for next producer ID block");
        }
        if (outcome instanceof Failure) {
            // Surface the failure recorded by the response/timeout handler.
            throw ((Failure)outcome).exception();
        }
        if (!(outcome instanceof Success)) {
            throw new MatchError(outcome);
        }
        ProducerIdsBlock fetched = (ProducerIdsBlock)((Success)outcome).value();
        this.currentProducerIdBlock_$eq(fetched);
        this.nextProducerId_$eq(fetched.firstProducerId());
        return this.nextProducerId();
    }

    /**
     * Sends a block request unless an outcome is already queued or a request is already
     * outstanding. The in-flight flag is only flipped when the queue is empty.
     */
    private void maybeRequestNextBlock() {
        if (!this.nextProducerIdBlock().isEmpty()) {
            // An outcome is waiting to be consumed; nothing to request yet.
            return;
        }
        if (this.requestInFlight().compareAndSet(false, true)) {
            this.sendRequest();
        }
    }

    /**
     * Builds an AllocateProducerIds request carrying the current broker epoch and broker ID
     * and submits it on the controller channel. The completion handler routes the response
     * body to {@code handleAllocateProducerIdsResponse} and a timeout to {@code handleTimeout}.
     */
    public void sendRequest() {
        AllocateProducerIdsRequestData payload = new AllocateProducerIdsRequestData()
            .setBrokerEpoch(this.brokerEpochSupplier.apply$mcJ$sp())
            .setBrokerId(this.brokerId);
        AllocateProducerIdsRequest.Builder builder = new AllocateProducerIdsRequest.Builder(payload);
        this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Requesting next Producer ID block");

        ControllerRequestCompletionHandler callback = new ControllerRequestCompletionHandler(){

            public void onComplete(ClientResponse response) {
                AllocateProducerIdsResponse body = (AllocateProducerIdsResponse)response.responseBody();
                RPCProducerIdManager.this.handleAllocateProducerIdsResponse(body);
            }

            public void onTimeout() {
                RPCProducerIdManager.this.handleTimeout();
            }
        };
        this.controllerChannel.sendRequest((AbstractRequest.Builder<? extends AbstractRequest>)builder, callback);
    }

    /**
     * Processes the controller's answer to an AllocateProducerIds request.
     *
     * <p>The in-flight flag is cleared first. Depending on the response, the method then
     * queues the new block as a {@code Success}, queues a {@code Failure}, or retries:
     * <ul>
     *   <li>missing response body: queue a {@code Failure};</li>
     *   <li>{@code NONE}: sanity-check the range, then queue the block or a {@code Failure};</li>
     *   <li>{@code STALE_BROKER_EPOCH} / {@code BROKER_ID_NOT_REGISTERED}: request again;</li>
     *   <li>any other error: queue that error's exception as a {@code Failure}.</li>
     * </ul>
     *
     * @param response the response body; may be {@code null} because the completion handler
     *                 passes {@code ClientResponse.responseBody()} through unchecked
     */
    public void handleAllocateProducerIdsResponse(AllocateProducerIdsResponse response) {
        this.requestInFlight().set(false);
        if (response == null) {
            // Without this guard response.data() throws a NullPointerException on the callback
            // thread and nothing is queued, so the waiting caller can only time out.
            // NOTE(review): a body is presumably absent on authentication failure or version mismatch - confirm.
            this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Received an AllocateProducerIds response without a body from the controller, giving up.");
            this.nextProducerIdBlock().put((Try<ProducerIdsBlock>)new Failure((Throwable)new KafkaException("AllocateProducerIds response from the controller had no body")));
            return;
        }
        AllocateProducerIdsResponseData data = response.data();
        Errors errors = Errors.forCode((short)data.errorCode());
        if (Errors.NONE.equals(errors)) {
            this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(43).append("Got next producer ID block from controller ").append(data).toString());
            // The new block must not start before the end of the block currently in use.
            // NOTE(review): a start equal to lastProducerId passes this check although
            // generateProducerId() treats lastProducerId as still usable - confirm whether <= is intended.
            if (data.producerIdStart() < this.currentProducerIdBlock().lastProducerId()) {
                this.nextProducerIdBlock().put((Try<ProducerIdsBlock>)new Failure((Throwable)new KafkaException(new StringBuilder(73).append("Producer ID block is not monotonic with current block: current=").append(this.currentProducerIdBlock()).append(" response=").append(data).toString())));
                return;
            }
            // Reject negative values and ranges whose end would overflow a long.
            if (data.producerIdStart() < 0L || data.producerIdLen() < 0 || data.producerIdStart() > Long.MAX_VALUE - (long)data.producerIdLen()) {
                this.nextProducerIdBlock().put((Try<ProducerIdsBlock>)new Failure((Throwable)new KafkaException(new StringBuilder(45).append("Producer ID block includes invalid ID range: ").append(data).toString())));
                return;
            }
            this.nextProducerIdBlock().put((Try<ProducerIdsBlock>)new Success((Object)new ProducerIdsBlock(this.brokerId, data.producerIdStart(), data.producerIdLen())));
            return;
        }
        if (Errors.STALE_BROKER_EPOCH.equals(errors)) {
            this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Our broker epoch was stale, trying again.");
            this.maybeRequestNextBlock();
            return;
        }
        if (Errors.BROKER_ID_NOT_REGISTERED.equals(errors)) {
            this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Our broker ID is not yet known by the controller, trying again.");
            this.maybeRequestNextBlock();
            return;
        }
        if (errors != null) {
            // Any other error is not retried; hand its exception to the waiting caller.
            this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Had an unknown error from the controller, giving up.");
            this.nextProducerIdBlock().put((Try<ProducerIdsBlock>)new Failure((Throwable)errors.exception()));
            return;
        }
        throw new MatchError(null);
    }

    /**
     * Reacts to a timed-out AllocateProducerIds request: logs a warning, clears the in-flight
     * flag, queues a {@code REQUEST_TIMED_OUT} failure and then attempts another request.
     */
    public void handleTimeout() {
        Function0<String> warning = (Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Timed out when requesting AllocateProducerIds from the controller.";
        this.warn(warning);
        this.requestInFlight().set(false);
        Try<ProducerIdsBlock> timedOut = (Try<ProducerIdsBlock>)new Failure((Throwable)Errors.REQUEST_TIMED_OUT.exception());
        this.nextProducerIdBlock().put(timedOut);
        // Only sends when the queue is empty again (see maybeRequestNextBlock).
        this.maybeRequestNextBlock();
    }

    public RPCProducerIdManager(int brokerId, Function0<Object> brokerEpochSupplier, BrokerToControllerChannelManager controllerChannel, int maxWaitMs) {
        this.brokerId = brokerId;
        this.brokerEpochSupplier = brokerEpochSupplier;
        this.controllerChannel = controllerChannel;
        this.maxWaitMs = maxWaitMs;
        ProducerIdManager.$init$(this);
        Logging.$init$(this);
        this.logIdent_$eq(new StringBuilder(27).append("[RPC ProducerId Manager ").append(brokerId).append("]: ").toString());
        this.nextProducerIdBlock = new ArrayBlockingQueue(1);
        this.requestInFlight = new AtomicBoolean(false);
        this.currentProducerIdBlock = ProducerIdsBlock.EMPTY;
        this.nextProducerId = -1L;
    }
}

