package org.apache.qpid.jms.provider.amqp;

import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
import org.apache.qpid.jms.meta.JmsProducerInfo;
import org.apache.qpid.jms.provider.AsyncResult;
import org.apache.qpid.jms.provider.ProviderException;
import org.apache.qpid.jms.provider.amqp.message.AmqpReadableBuffer;
import org.apache.qpid.jms.provider.exceptions.ProviderDeliveryModifiedException;
import org.apache.qpid.jms.provider.exceptions.ProviderDeliveryReleasedException;
import org.apache.qpid.jms.provider.exceptions.ProviderExceptionSupport;
import org.apache.qpid.jms.provider.exceptions.ProviderIllegalStateException;
import org.apache.qpid.jms.provider.exceptions.ProviderSendTimedOutException;
import org.apache.qpid.jms.provider.exceptions.ProviderUnsupportedOperationException;
import org.apache.qpid.jms.tracing.JmsTracer;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Sender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.class */
/**
 * AMQP producer that sends messages over a single proton {@link Sender} link.
 *
 * <p>Every send is wrapped in an {@link InFlightSend} and tracked in one of two maps keyed by
 * the message id of the outbound envelope:
 * <ul>
 *   <li>{@code blocked} - sends held back because the link had no credit when they were issued;</li>
 *   <li>{@code sent} - non-presettled sends that were written to the link and now await a remote
 *       disposition.</li>
 * </ul>
 * A close request received while either map is non-empty is stored and only executed once the
 * last tracked send completes.
 *
 * <p>NOTE(review): no synchronization is used in this class; it presumably relies on being called
 * from a single provider thread - confirm against the provider implementation.
 */
public class AmqpFixedProducer extends AmqpProducer {
    private static final Logger LOG = LoggerFactory.getLogger(AmqpFixedProducer.class);

    /** Delivery tag used for presettled deliveries, which do not consume a generated tag. */
    private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];

    /** Source of delivery tags for non-presettled sends; tags are handed back on successful completion. */
    private final AmqpTransferTagGenerator tagGenerator;

    /** Sends written to the link that still await a disposition, in insertion order. */
    private final Map<Object, InFlightSend> sent;

    /** Sends held until the link is granted credit, in insertion order. */
    private final Map<Object, InFlightSend> blocked;

    private final AmqpConnection connection;
    private final JmsTracer tracer;

    /**
     * Tracks one outstanding send from the moment it is requested until it completes.
     *
     * <p>It is handed to the provider as the {@link AsyncResult} for transport writes and request
     * timeouts, and as the {@link AmqpExceptionBuilder} that produces the timeout error.
     */
    public final class InFlightSend implements AsyncResult, AmqpExceptionBuilder {
        private final JmsOutboundMessageDispatch envelope;
        private final AsyncResult request;

        /** Proton delivery for this send; stays {@code null} while the send is blocked on credit. */
        private Delivery delivery;

        /** Pending send-timeout task, or {@code null} when no timeout is armed. */
        private ScheduledFuture<?> requestTimeout;

        /**
         * @param jmsOutboundMessageDispatch the outbound envelope being sent
         * @param asyncResult the caller's request, completed when the send outcome is known
         */
        public InFlightSend(JmsOutboundMessageDispatch jmsOutboundMessageDispatch, AsyncResult asyncResult) {
            this.envelope = jmsOutboundMessageDispatch;
            this.request = asyncResult;
        }

        /**
         * Completes the send as failed.
         *
         * <p>If the original request is still open it receives the failure. Otherwise the failure is
         * reported to the provider listener when the envelope requires completion notification, or
         * fired as a non-fatal provider exception when it does not.
         *
         * @param providerException the cause of the failure
         */
        @Override // org.apache.qpid.jms.provider.AsyncResult
        public void onFailure(ProviderException providerException) {
            handleSendCompletion(false);
            if (!this.request.isComplete()) {
                this.request.onFailure(providerException);
            } else if (this.envelope.isCompletionRequired()) {
                AmqpFixedProducer.this.getParent().getProvider().getProviderListener().onFailedMessageSend(this.envelope, ProviderExceptionSupport.createNonFatalOrPassthrough(providerException));
            } else {
                AmqpFixedProducer.this.getParent().getProvider().fireNonFatalProviderException(ProviderExceptionSupport.createNonFatalOrPassthrough(providerException));
            }
        }

        /**
         * Completes the send as successful: finishes the original request if it is still open and
         * notifies the provider listener when the envelope requires completion notification.
         */
        @Override // org.apache.qpid.jms.provider.AsyncResult
        public void onSuccess() {
            handleSendCompletion(true);
            if (!this.request.isComplete()) {
                this.request.onSuccess();
            }
            if (this.envelope.isCompletionRequired()) {
                AmqpFixedProducer.this.getParent().getProvider().getProviderListener().onCompletedMessageSend(this.envelope);
            }
        }

        /**
         * Replaces the pending timeout task, cancelling any previously registered one.
         *
         * @param scheduledFuture the new timeout task, or {@code null} to just cancel the current one
         */
        public void setRequestTimeout(ScheduledFuture<?> scheduledFuture) {
            if (this.requestTimeout != null) {
                this.requestTimeout.cancel(false);
            }
            this.requestTimeout = scheduledFuture;
        }

        /** @return the outbound envelope this send carries */
        public JmsOutboundMessageDispatch getEnvelope() {
            return this.envelope;
        }

        /** @return the request supplied by the caller of {@code send} */
        public AsyncResult getOriginalRequest() {
            return this.request;
        }

        /** @param delivery the proton delivery created for this send */
        public void setDelivery(Delivery delivery) {
            this.delivery = delivery;
        }

        /** @return the proton delivery, or {@code null} if the message has not been written to the link */
        public Delivery getDelivery() {
            return this.delivery;
        }

        /** @return whether the original request has already been completed */
        @Override // org.apache.qpid.jms.provider.AsyncResult
        public boolean isComplete() {
            return this.request.isComplete();
        }

        /**
         * Common bookkeeping for both outcomes: cancels the timeout, stops tracking the send,
         * settles the delivery if one exists, reports to the tracer and, when a close was deferred
         * and this was the last tracked send, performs that close.
         *
         * @param successful whether the send succeeded; the delivery tag is only returned to the
         *                   generator on success
         */
        private void handleSendCompletion(boolean successful) {
            setRequestTimeout(null);
            if (getDelivery() != null) {
                AmqpFixedProducer.this.sent.remove(this.envelope.getMessageId());
                this.delivery.settle();
                if (successful) {
                    AmqpFixedProducer.this.tagGenerator.returnTag(this.delivery.getTag());
                }
                DeliveryState remoteState = this.delivery.getRemoteState();
                AmqpFixedProducer.this.tracer.completeSend(this.envelope.getMessage().getFacade(), remoteState == null ? null : remoteState.getType().name());
            } else {
                // No delivery was ever created, so this send can only be tracked as blocked.
                AmqpFixedProducer.this.blocked.remove(this.envelope.getMessageId());
                AmqpFixedProducer.this.tracer.completeSend(this.envelope.getMessage().getFacade(), null);
            }
            this.envelope.getMessage().onSendComplete();
            // A close requested while sends were outstanding is executed once nothing is tracked anymore.
            if (AmqpFixedProducer.this.isAwaitingClose() && !AmqpFixedProducer.this.isClosed() && AmqpFixedProducer.this.blocked.isEmpty() && AmqpFixedProducer.this.sent.isEmpty()) {
                AmqpFixedProducer.super.close(AmqpFixedProducer.this.closeRequest);
            }
        }

        /**
         * Builds the timeout error for this send; the message text depends on whether the send was
         * still waiting for credit or already waiting for its disposition.
         */
        @Override // org.apache.qpid.jms.provider.amqp.AmqpExceptionBuilder
        public ProviderException createException() {
            if (this.delivery == null) {
                return new ProviderSendTimedOutException("Timed out waiting for credit to send Message", this.envelope.getMessage());
            }
            return new ProviderSendTimedOutException("Timed out waiting for disposition of sent Message", this.envelope.getMessage());
        }
    }

    /**
     * @param amqpSession the session that owns this producer
     * @param jmsProducerInfo the producer's resource information
     * @param sender the proton sender link used to transmit messages
     */
    public AmqpFixedProducer(AmqpSession amqpSession, JmsProducerInfo jmsProducerInfo, Sender sender) {
        super(amqpSession, jmsProducerInfo, sender);
        this.tagGenerator = new AmqpTransferTagGenerator(true);
        this.sent = new LinkedHashMap<>();
        this.blocked = new LinkedHashMap<>();
        this.connection = amqpSession.getConnection();
        this.tracer = this.connection.getResourceInfo().getTracer();
        this.delayedDeliverySupported = this.connection.getProperties().isDelayedDeliverySupported();
    }

    /**
     * Closes the producer immediately when no sends are outstanding; otherwise remembers the
     * request so that the completion of the last tracked send can perform the close.
     *
     * @param asyncResult the close request to complete
     */
    @Override // org.apache.qpid.jms.provider.amqp.AmqpAbstractResource
    public void close(AsyncResult asyncResult) {
        if (this.blocked.isEmpty() && this.sent.isEmpty()) {
            super.close(asyncResult);
        } else {
            this.closeRequest = asyncResult;
        }
    }

    /**
     * Sends the given envelope, or holds it until the link has credit.
     *
     * <p>The request fails immediately when the producer is closed or when the message carries a
     * delivery time that the remote cannot honor. While the session's transaction is in doubt the
     * send is reported as successful without being transmitted.
     *
     * @param jmsOutboundMessageDispatch the envelope to send
     * @param asyncResult the request to complete with the send outcome
     * @throws ProviderException if writing the message to the transport fails
     */
    @Override // org.apache.qpid.jms.provider.amqp.AmqpProducer
    public void send(JmsOutboundMessageDispatch jmsOutboundMessageDispatch, AsyncResult asyncResult) throws ProviderException {
        if (isClosed()) {
            asyncResult.onFailure(new ProviderIllegalStateException("The MessageProducer is closed"));
            // The request has been failed; do not go on to write to, or queue on, a closed link.
            return;
        }
        InFlightSend inFlightSend = new InFlightSend(jmsOutboundMessageDispatch, asyncResult);
        if (!this.delayedDeliverySupported && jmsOutboundMessageDispatch.getMessage().getFacade().isDeliveryTimeTransmitted()) {
            inFlightSend.onFailure(new ProviderUnsupportedOperationException("Remote does not support delayed message delivery"));
            return;
        }
        if (this.session.isTransactionInDoubt()) {
            inFlightSend.onSuccess();
            return;
        }
        if (getEndpoint().getCredit() > 0) {
            doSend(jmsOutboundMessageDispatch, inFlightSend);
            return;
        }
        LOG.trace("Holding Message send until credit is available.");
        if (getSendTimeout() > -1) {
            inFlightSend.requestTimeout = getParent().getProvider().scheduleRequestTimeout(inFlightSend, getSendTimeout(), inFlightSend);
        }
        this.blocked.put(jmsOutboundMessageDispatch.getMessageId(), inFlightSend);
        getParent().getProvider().pumpToProtonTransport(asyncResult);
    }

    /**
     * Writes the envelope to the link as a new delivery and pushes it to the transport.
     *
     * @param jmsOutboundMessageDispatch the envelope to transmit
     * @param inFlightSend the tracker for this send
     * @throws ProviderException if flushing the transport fails
     */
    private void doSend(JmsOutboundMessageDispatch jmsOutboundMessageDispatch, InFlightSend inFlightSend) throws ProviderException {
        Delivery delivery;
        LOG.trace("Producer sending message: {}", jmsOutboundMessageDispatch);
        boolean presettle = jmsOutboundMessageDispatch.isPresettle() || isPresettle();
        if (presettle) {
            // Presettled deliveries use the shared empty tag instead of consuming a generated one.
            delivery = getEndpoint().delivery(EMPTY_BYTE_ARRAY, 0, 0);
        } else {
            byte[] nextTag = this.tagGenerator.getNextTag();
            delivery = getEndpoint().delivery(nextTag, 0, nextTag.length);
        }
        if (this.session.isTransacted()) {
            AmqpTransactionContext transactionContext = this.session.getTransactionContext();
            delivery.disposition(transactionContext.getTxnEnrolledState());
            transactionContext.registerTxProducer(this);
        }
        // duplicate() gives the link its own view of the payload buffer.
        getEndpoint().sendNoCopy(new AmqpReadableBuffer(((ByteBuf) jmsOutboundMessageDispatch.getPayload()).duplicate()));
        AmqpProvider provider = getParent().getProvider();
        // Arm a timeout only for sends that wait on a disposition and do not already carry one
        // from the time they were blocked on credit.
        if (!presettle && getSendTimeout() != -1 && inFlightSend.requestTimeout == null) {
            inFlightSend.requestTimeout = provider.scheduleRequestTimeout(inFlightSend, getSendTimeout(), inFlightSend);
        }
        if (presettle) {
            delivery.settle();
        } else {
            this.sent.put(jmsOutboundMessageDispatch.getMessageId(), inFlightSend);
            getEndpoint().advance();
        }
        inFlightSend.setDelivery(delivery);
        delivery.setContext(inFlightSend);
        if (provider.pumpToProtonTransport(inFlightSend, false)) {
            if (presettle) {
                // Nothing further to wait for: the send is complete once written.
                inFlightSend.onSuccess();
            } else if (jmsOutboundMessageDispatch.isSendAsync()) {
                // Async senders are released now; the eventual outcome is reported by the tracker.
                inFlightSend.getOriginalRequest().onSuccess();
            }
            try {
                provider.getTransport().flush();
            } catch (Throwable th) {
                throw ProviderExceptionSupport.createOrPassthroughFatal(th);
            }
        }
    }

    /**
     * Dispatches previously blocked sends, in insertion order, for as long as the link has credit,
     * then answers a pending drain request.
     *
     * @param amqpProvider the provider delivering the flow event
     * @throws ProviderException if a held send fails to be written
     */
    @Override // org.apache.qpid.jms.provider.amqp.AmqpAbstractResource, org.apache.qpid.jms.provider.amqp.AmqpEventSink
    public void processFlowUpdates(AmqpProvider amqpProvider) throws ProviderException {
        if (!this.blocked.isEmpty() && getEndpoint().getCredit() > 0) {
            Iterator<InFlightSend> heldSends = this.blocked.values().iterator();
            while (getEndpoint().getCredit() > 0 && heldSends.hasNext()) {
                LOG.trace("Dispatching previously held send");
                InFlightSend held = heldSends.next();
                boolean detached = false;
                try {
                    if (this.session.isTransacted() && this.session.isTransactionInDoubt()) {
                        // Completing a send that has no delivery removes it from the blocked map
                        // by key, which would invalidate this iterator and make a later
                        // Iterator.remove() throw ConcurrentModificationException. Detach the
                        // entry through the iterator first; the by-key removal is then a no-op.
                        heldSends.remove();
                        detached = true;
                        held.onSuccess();
                        return;
                    }
                    doSend(held.getEnvelope(), held);
                } finally {
                    // The entry leaves the blocked map whether or not the send attempt succeeded.
                    if (!detached) {
                        heldSends.remove();
                    }
                }
            }
        }
        if (getEndpoint().getDrain()) {
            getEndpoint().drained();
        }
        super.processFlowUpdates(amqpProvider);
    }

    /**
     * Applies a remote disposition to the send attached to the delivery as its context.
     *
     * @param amqpProvider the provider delivering the event
     * @param delivery the delivery whose remote state changed
     * @throws ProviderException if the superclass processing fails
     */
    @Override // org.apache.qpid.jms.provider.amqp.AmqpAbstractResource, org.apache.qpid.jms.provider.amqp.AmqpEventSink
    public void processDeliveryUpdates(AmqpProvider amqpProvider, Delivery delivery) throws ProviderException {
        DeliveryState remoteState = delivery.getRemoteState();
        if (remoteState != null) {
            InFlightSend inFlightSend = (InFlightSend) delivery.getContext();
            if (remoteState.getType() == DeliveryState.DeliveryStateType.Accepted) {
                LOG.trace("Outcome of delivery was accepted: {}", delivery);
                inFlightSend.onSuccess();
            } else {
                applyDeliveryStateUpdate(inFlightSend, delivery, remoteState);
            }
        }
        super.processDeliveryUpdates(amqpProvider, delivery);
    }

    /**
     * Translates a delivery state into a success or failure of the tracked send. A transactional
     * state is unwrapped to its outcome; a {@code null} state or an unsupported type completes
     * nothing.
     *
     * @param inFlightSend the tracker to complete
     * @param delivery the delivery being updated (used for logging)
     * @param deliveryState the state to apply, may be {@code null}
     */
    private void applyDeliveryStateUpdate(InFlightSend inFlightSend, Delivery delivery, DeliveryState deliveryState) {
        ProviderException providerException = null;
        if (deliveryState == null) {
            return;
        }
        switch (deliveryState.getType()) {
            case Transactional:
                LOG.trace("State of delivery is Transactional, retrieving outcome: {}", deliveryState);
                applyDeliveryStateUpdate(inFlightSend, delivery, (DeliveryState) ((TransactionalState) deliveryState).getOutcome());
                break;
            case Accepted:
                LOG.trace("Outcome of delivery was accepted: {}", delivery);
                inFlightSend.onSuccess();
                break;
            case Rejected:
                LOG.trace("Outcome of delivery was rejected: {}", delivery);
                ErrorCondition error = ((Rejected) deliveryState).getError();
                if (error == null) {
                    // Fall back to the link's remote condition when the rejection carries no error.
                    error = getEndpoint().getRemoteCondition();
                }
                providerException = AmqpSupport.convertToNonFatalException(getParent().getProvider(), getEndpoint(), error);
                break;
            case Released:
                LOG.trace("Outcome of delivery was released: {}", delivery);
                providerException = new ProviderDeliveryReleasedException("Delivery failed: released by receiver");
                break;
            case Modified:
                LOG.trace("Outcome of delivery was modified: {}", delivery);
                providerException = new ProviderDeliveryModifiedException("Delivery failed: failure at remote", (Modified) deliveryState);
                break;
            default:
                LOG.warn("Message send updated with unsupported state: {}", deliveryState);
                break;
        }
        if (providerException != null) {
            inFlightSend.onFailure(providerException);
        }
    }

    /** @return the session that owns this producer */
    public AmqpSession getSession() {
        return this.session;
    }

    /** @return {@code true} when the producer's resource info has no destination */
    @Override // org.apache.qpid.jms.provider.amqp.AmqpProducer
    public boolean isAnonymous() {
        return getResourceInfo().getDestination() == null;
    }

    /** @return {@code true} when the link's sender settle mode is {@code SETTLED} */
    @Override // org.apache.qpid.jms.provider.amqp.AmqpProducer
    public boolean isPresettle() {
        return getEndpoint().getSenderSettleMode() == SenderSettleMode.SETTLED;
    }

    /** @return the send timeout configured on the provider; {@code -1} is treated as "no timeout" by this class */
    public long getSendTimeout() {
        return getParent().getProvider().getSendTimeout();
    }

    @Override
    public String toString() {
        return "AmqpFixedProducer { " + getProducerId() + " }";
    }

    /**
     * Fails every outstanding send, first those awaiting a disposition and then those blocked on
     * credit, when the producer's resource is closed.
     *
     * @param amqpProvider the provider reporting the closure
     * @param providerException the closure cause; when {@code null} one is derived from the link's
     *                          remote condition, or a generic exception is used
     */
    @Override // org.apache.qpid.jms.provider.amqp.AmqpAbstractResource
    public void handleResourceClosure(AmqpProvider amqpProvider, ProviderException providerException) {
        if (providerException == null) {
            providerException = getEndpoint().getRemoteCondition() != null ? AmqpSupport.convertToNonFatalException(amqpProvider, getEndpoint(), getEndpoint().getRemoteCondition()) : new ProviderException("Producer closed remotely before message transfer result was notified");
        }
        // Iterate over snapshots: failing a send removes it from the live map.
        for (InFlightSend pendingSend : new ArrayList<>(this.sent.values())) {
            try {
                pendingSend.onFailure(providerException);
            } catch (Exception e) {
                LOG.debug("Caught exception when failing pending send during remote producer closure: {}", pendingSend, e);
            }
        }
        for (InFlightSend blockedSend : new ArrayList<>(this.blocked.values())) {
            try {
                blockedSend.onFailure(providerException);
            } catch (Exception e) {
                LOG.debug("Caught exception when failing blocked send during remote producer closure: {}", blockedSend, e);
            }
        }
    }
}
