package akka.remote.artery;

import akka.event.LoggingAdapter;
import akka.remote.UniqueAddress;
import akka.remote.artery.SystemMessageDelivery;
import akka.stream.stage.GraphStageLogic;
import akka.stream.stage.InHandler;
import akka.stream.stage.OutHandler;
import akka.stream.stage.StageLogging;
import akka.util.OptionVal;
import akka.util.OptionVal$;
import akka.util.OptionVal$Some$;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.StringContext;
import scala.collection.immutable.Map;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: SystemMessageDelivery.scala */
/* loaded from: input_file:akka/remote/artery/SystemMessageAcker$$anon$2.class */
/**
 * Stage logic created by {@link SystemMessageAcker}.
 *
 * <p>Every element grabbed from the inlet is an {@code InboundEnvelope}. Envelopes whose message is
 * not a {@code SystemMessageDelivery.SystemMessageEnvelope} are pushed downstream unchanged. For
 * system message envelopes the sequence number is compared with the one expected from the sender
 * ({@code ackReplyTo}), which starts at 1 for a sender not seen before:
 * <ul>
 *   <li>equal: an {@code Ack} is sent, the expected number is advanced by one and the wrapped
 *       message is pushed downstream;</li>
 *   <li>lower: the message is dropped as a duplicate and the highest acknowledged number is
 *       acked again;</li>
 *   <li>higher: the message is dropped and a {@code Nack} carrying the highest acknowledged number
 *       is sent.</li>
 * </ul>
 *
 * <p>The {@code akka$stream$stage$StageLogging$$...} members and the {@code Cclass} delegations are
 * the compiled form of the Scala trait mix-ins and must keep their names.
 */
public final class SystemMessageAcker$$anon$2 extends GraphStageLogic implements InHandler, OutHandler, StageLogging {
    /** Next expected sequence number per sender; the values are boxed {@code long}s. */
    private Map<UniqueAddress, Object> sequenceNumbers;
    /** Number of negative acknowledgements that have been logged so far. */
    private int nackCount;
    /** Enclosing stage; provides the ports and the inbound context. */
    private final /* synthetic */ SystemMessageAcker $outer;
    /** Backing field of the {@code StageLogging} trait. */
    private LoggingAdapter akka$stream$stage$StageLogging$$_log;

    /** Trait field getter required by {@code StageLogging}. */
    @Override // akka.stream.stage.StageLogging
    public LoggingAdapter akka$stream$stage$StageLogging$$_log() {
        return this.akka$stream$stage$StageLogging$$_log;
    }

    /** Trait field setter required by {@code StageLogging}. */
    @Override // akka.stream.stage.StageLogging
    public void akka$stream$stage$StageLogging$$_log_$eq(LoggingAdapter loggingAdapter) {
        this.akka$stream$stage$StageLogging$$_log = loggingAdapter;
    }

    /** Returns the logger supplied by the {@code StageLogging} trait implementation. */
    @Override // akka.stream.stage.StageLogging
    public LoggingAdapter log() {
        return StageLogging.Cclass.log(this);
    }

    /** Delegates to the default {@code OutHandler} behaviour. */
    @Override // akka.stream.stage.OutHandler
    public void onDownstreamFinish() throws Exception {
        OutHandler.Cclass.onDownstreamFinish(this);
    }

    /** Delegates to the default {@code InHandler} behaviour. */
    @Override // akka.stream.stage.InHandler
    public void onUpstreamFinish() throws Exception {
        InHandler.Cclass.onUpstreamFinish(this);
    }

    /** Delegates to the default {@code InHandler} behaviour. */
    @Override // akka.stream.stage.InHandler
    public void onUpstreamFailure(Throwable th) throws Exception {
        InHandler.Cclass.onUpstreamFailure(this, th);
    }

    private Map<UniqueAddress, Object> sequenceNumbers() {
        return this.sequenceNumbers;
    }

    private void sequenceNumbers_$eq(Map<UniqueAddress, Object> map) {
        this.sequenceNumbers = map;
    }

    private int nackCount() {
        return this.nackCount;
    }

    private void nackCount_$eq(int i) {
        this.nackCount = i;
    }

    /** Address of the local system as reported by the enclosing stage's inbound context. */
    private UniqueAddress localAddress() {
        return this.$outer.akka$remote$artery$SystemMessageAcker$$inboundContext.localAddress();
    }

    /** Log statements are attributed to the enclosing stage class rather than this anonymous class. */
    @Override // akka.stream.stage.StageLogging
    public Class<?> logSource() {
        return SystemMessageAcker.class;
    }

    /**
     * Handles one inbound envelope: passes non system messages through, and acknowledges,
     * deduplicates or negatively acknowledges system messages depending on their sequence number.
     */
    @Override // akka.stream.stage.InHandler
    public void onPush() {
        InboundEnvelope inboundEnvelope = (InboundEnvelope) grab(this.$outer.in());
        Object message = inboundEnvelope.message();
        if (!(message instanceof SystemMessageDelivery.SystemMessageEnvelope)) {
            // Not a system message: forward untouched.
            push(this.$outer.out(), inboundEnvelope);
            return;
        }

        SystemMessageDelivery.SystemMessageEnvelope systemMessageEnvelope = (SystemMessageDelivery.SystemMessageEnvelope) message;
        long seqNo = systemMessageEnvelope.seqNo();
        UniqueAddress ackReplyTo = systemMessageEnvelope.ackReplyTo();
        long expectedSeqNo = expectedSeqNo(ackReplyTo);

        if (seqNo == expectedSeqNo) {
            // In order: acknowledge, advance the expected number and deliver the payload.
            this.$outer.akka$remote$artery$SystemMessageAcker$$inboundContext.sendControl(ackReplyTo.address(), new SystemMessageDelivery.Ack(seqNo, localAddress()));
            // The value is passed as Object so the result type matches Map<UniqueAddress, Object>.
            sequenceNumbers_$eq(sequenceNumbers().updated(ackReplyTo, (Object) BoxesRunTime.boxToLong(seqNo + 1)));
            push(this.$outer.out(), inboundEnvelope.withMessage(systemMessageEnvelope.message()));
        } else if (seqNo < expectedSeqNo) {
            // Already seen: drop it, re-send the ack for the highest acknowledged number and ask for more.
            if (log().isDebugEnabled()) {
                log().debug("Deduplicate system message [{}] from [{}], expected [{}]", BoxesRunTime.boxToLong(seqNo), fromRemoteAddressStr$1(inboundEnvelope), BoxesRunTime.boxToLong(expectedSeqNo));
            }
            this.$outer.akka$remote$artery$SystemMessageAcker$$inboundContext.sendControl(ackReplyTo.address(), new SystemMessageDelivery.Ack(expectedSeqNo - 1, localAddress()));
            pull(this.$outer.in());
        } else {
            // Gap detected: drop it and report the highest acknowledged number with a Nack.
            int maxNackLogging = SystemMessageAcker$.MODULE$.MaxNegativeAcknowledgementLogging();
            if (nackCount() < maxNackLogging) {
                // Count first so the final logged occurrence carries the "will not be logged more" notice.
                nackCount_$eq(nackCount() + 1);
                String suffix = nackCount() == maxNackLogging
                        ? ". This happened [" + maxNackLogging + "] times and will not be logged more."
                        : "";
                log().warning("Sending negative acknowledgement of system message [{}] from [{}], highest acknowledged [{}]{}", BoxesRunTime.boxToLong(seqNo), fromRemoteAddressStr$1(inboundEnvelope), BoxesRunTime.boxToLong(expectedSeqNo - 1), suffix);
            }
            this.$outer.akka$remote$artery$SystemMessageAcker$$inboundContext.sendControl(ackReplyTo.address(), new SystemMessageDelivery.Nack(expectedSeqNo - 1, localAddress()));
            pull(this.$outer.in());
        }
    }

    /** Downstream demand is translated directly into upstream demand. */
    @Override // akka.stream.stage.OutHandler
    public void onPull() {
        pull(this.$outer.in());
    }

    /**
     * Looks up the sequence number expected next from the given sender.
     *
     * @param ackReplyTo sender of the system message
     * @return the stored number, or 1 when nothing has been recorded for the sender yet
     * @throws MatchError if the lookup result is neither {@code None} nor a {@code Some}
     */
    private long expectedSeqNo(UniqueAddress ackReplyTo) {
        Option<Object> stored = sequenceNumbers().get(ackReplyTo);
        if (None$.MODULE$.equals(stored)) {
            return 1L;
        }
        if (!(stored instanceof Some)) {
            throw new MatchError(stored);
        }
        return BoxesRunTime.unboxToLong(((Some) stored).x());
    }

    /**
     * Renders the remote address of the envelope's association for log output.
     *
     * @param inboundEnvelope envelope whose association is inspected
     * @return the association's remote address as text, or {@code "N/A"} when there is no association
     */
    private final String fromRemoteAddressStr$1(InboundEnvelope inboundEnvelope) {
        OutboundContext association = inboundEnvelope.association();
        OutboundContext extracted = (OutboundContext) OptionVal$Some$.MODULE$.unapply(association);
        if (!OptionVal$.MODULE$.isEmpty$extension(extracted)) {
            return ((OutboundContext) OptionVal$.MODULE$.get$extension(extracted)).remoteAddress().toString();
        }
        // Mirrors the compiled pattern match: an empty extraction is only valid for the null (None) value.
        if (association != null) {
            throw new MatchError(new OptionVal(association));
        }
        // Plain literal placeholder; no unrelated configuration class is needed for it.
        return "N/A";
    }

    /**
     * Creates the logic for the given stage, starting with no recorded sequence numbers and a
     * zero nack counter, and registers this object as handler for both ports.
     *
     * @param systemMessageAcker enclosing stage; a {@code null} value fails with a
     *        {@link NullPointerException} when its shape is read for the super constructor
     */
    public SystemMessageAcker$$anon$2(SystemMessageAcker systemMessageAcker) {
        // NOTE(review): `shape2`/`empty2` look like decompiler-renamed bridge methods — confirm against the real API.
        super(systemMessageAcker.shape2());
        this.$outer = systemMessageAcker;
        // Trait initialisers must run before the handlers are registered.
        InHandler.Cclass.$init$(this);
        OutHandler.Cclass.$init$(this);
        StageLogging.Cclass.$init$(this);
        this.sequenceNumbers = Predef$.MODULE$.Map().empty2();
        this.nackCount = 0;
        setHandlers(systemMessageAcker.in(), systemMessageAcker.out(), this);
    }
}
