package com.azure.messaging.servicebus;

import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.util.AsyncCloseable;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.implementation.LockContainer;
import com.azure.messaging.servicebus.implementation.ServiceBusConstants;
import com.azure.messaging.servicebus.implementation.ServiceBusReceiveLink;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.scheduler.Scheduler;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/azure/messaging/servicebus/ServiceBusSessionReceiver.class */
/**
 * Receives messages from one {@link ServiceBusReceiveLink} and exposes them as a stream of
 * {@link ServiceBusMessageContext}.
 *
 * <p>Besides forwarding messages, an instance:</p>
 * <ul>
 *   <li>tops up link credits on subscribe and on every downstream request;</li>
 *   <li>records the lock token and expiry of each received message in a {@link LockContainer};</li>
 *   <li>optionally completes the stream when no message follows the previous one within the
 *       retry options' try timeout;</li>
 *   <li>captures the session id and the session lock expiry reported by the link, and creates a
 *       {@link LockRenewalOperation} once the lock expiry is known.</li>
 * </ul>
 */
public class ServiceBusSessionReceiver implements AsyncCloseable, AutoCloseable {
    private static final ClientLogger LOGGER = new ClientLogger(ServiceBusSessionReceiver.class);
    private final ServiceBusReceiveLink receiveLink;
    private final Disposable.Composite subscriptions;
    private final Flux<ServiceBusMessageContext> receivedMessages;
    private final AmqpRetryOptions retryOptions;
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    private final AtomicReference<OffsetDateTime> sessionLockedUntil = new AtomicReference<>();
    private final AtomicReference<String> sessionId = new AtomicReference<>();
    private final AtomicReference<LockRenewalOperation> renewalOperation = new AtomicReference<>();
    // Completing this processor ends the receive stream (see the idle-timeout subscription below).
    private final MonoProcessor<ServiceBusMessageContext> cancelReceiveProcessor = MonoProcessor.create();
    // Emits the lock token of every successfully received message; drives the idle timer.
    private final DirectProcessor<String> messageReceivedEmitter = DirectProcessor.create();
    private final FluxSink<String> messageReceivedSink = this.messageReceivedEmitter.sink(FluxSink.OverflowStrategy.BUFFER);
    // NOTE(review): this container is never closed by closeAsync(); confirm whether LockContainer
    // holds resources that need releasing.
    private final LockContainer<OffsetDateTime> lockContainer = new LockContainer<>(ServiceBusConstants.OPERATION_TIMEOUT);

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a receiver over the given link and immediately subscribes to the link's session id
     * and session-lock-expiry signals.
     *
     * @param serviceBusReceiveLink link that messages are received from and credits are added to.
     * @param messageSerializer converts the link's AMQP messages into {@link ServiceBusReceivedMessage}.
     * @param amqpRetryOptions supplies the try timeout used for the idle timer and for {@link #close()}.
     * @param i number of credits to keep on the link (prefetch); {@code 0} means credits are added
     *     only as downstream requests arrive.
     * @param z when {@code true}, the receive stream completes if no message follows the previously
     *     received one within the try timeout. The timer only starts once a first message arrives.
     * @param scheduler scheduler the received messages are published on.
     * @param function handed to {@link LockRenewalOperation}; presumably renews the lock for a
     *     session id and returns the new expiry (confirm against LockRenewalOperation).
     * @param duration handed to {@link LockRenewalOperation}; presumably the maximum time to keep
     *     renewing the session lock (confirm against LockRenewalOperation).
     */
    public ServiceBusSessionReceiver(ServiceBusReceiveLink serviceBusReceiveLink, MessageSerializer messageSerializer, AmqpRetryOptions amqpRetryOptions, int i, boolean z, Scheduler scheduler, Function<String, Mono<OffsetDateTime>> function, Duration duration) {
        // Readable aliases for the decompiler-generated parameter names.
        final int prefetch = i;
        final boolean disposeOnIdle = z;

        this.receiveLink = serviceBusReceiveLink;
        this.retryOptions = amqpRetryOptions;

        // Credits are managed explicitly below, so the link never tops itself up.
        serviceBusReceiveLink.setEmptyCreditListener(() -> 0);

        final Flux<ServiceBusMessageContext> sessionMessages = serviceBusReceiveLink.receive()
            .publishOn(scheduler)
            .doOnSubscribe(subscription -> {
                LOGGER.verbose("Adding prefetch to receive link.");
                if (prefetch > 0) {
                    serviceBusReceiveLink.addCredits(prefetch).subscribe();
                }
            })
            .doOnRequest(requested -> {
                if (prefetch == 0) {
                    // Clamp before narrowing: a plain (int) cast wraps large demands to negative credits.
                    final int credits = (int) Math.min(requested, Integer.MAX_VALUE);
                    serviceBusReceiveLink.addCredits(credits).subscribe();
                } else {
                    // Only add what is missing to get back up to the prefetch level.
                    final int missing = Math.max(0, prefetch - serviceBusReceiveLink.getCredits());
                    serviceBusReceiveLink.addCredits(missing).subscribe();
                }
            })
            .limitRate(1)
            .takeUntilOther(this.cancelReceiveProcessor)
            .map(message -> {
                final ServiceBusReceivedMessage received = (ServiceBusReceivedMessage) messageSerializer.deserialize(message, ServiceBusReceivedMessage.class);
                final boolean hasLock = !CoreUtils.isNullOrEmpty(received.getLockToken()) && received.getLockedUntil() != null;
                if (hasLock) {
                    this.lockContainer.addOrUpdate(received.getLockToken(), received.getLockedUntil(), received.getLockedUntil());
                } else {
                    LOGGER.atInfo()
                        .addKeyValue(ServiceBusConstants.SESSION_ID_KEY, received.getSessionId())
                        .addKeyValue(ServiceBusConstants.MESSAGE_ID_LOGGING_KEY, received.getMessageId())
                        .log("There is no lock token.");
                }
                return new ServiceBusMessageContext(received);
            })
            .onErrorResume(th -> {
                // Surface the failure as a final error context instead of an error signal.
                // The explicit Object[] keeps this bound to the varargs log overload.
                LOGGER.atWarning()
                    .addKeyValue(ServiceBusConstants.SESSION_ID_KEY, this.sessionId.get())
                    .log("Error occurred. Ending session.", new Object[]{th});
                return Mono.just(new ServiceBusMessageContext(getSessionId(), th));
            })
            .doOnNext(context -> {
                if (context.hasError()) {
                    return;
                }
                final ServiceBusReceivedMessage received = context.getMessage();
                final String lockToken = CoreUtils.isNullOrEmpty(received.getLockToken()) ? "" : received.getLockToken();
                LOGGER.atVerbose()
                    .addKeyValue(ServiceBusConstants.SESSION_ID_KEY, context.getSessionId())
                    .addKeyValue(ServiceBusConstants.MESSAGE_ID_LOGGING_KEY, received.getMessageId())
                    .log("Received message.");
                // Restarts the idle timer (when enabled).
                this.messageReceivedSink.next(lockToken);
            });

        this.receivedMessages = Flux.concat(sessionMessages, this.cancelReceiveProcessor);
        this.subscriptions = Disposables.composite();

        if (disposeOnIdle) {
            // Each received message switches to a fresh delay; if a delay elapses first, stop receiving.
            final Disposable idleTimer = Flux.switchOnNext(
                    this.messageReceivedEmitter.map(lockToken -> Mono.delay(this.retryOptions.getTryTimeout())))
                .subscribe(tick -> {
                    LOGGER.atInfo()
                        .addKeyValue("entityPath", serviceBusReceiveLink.getEntityPath())
                        .addKeyValue(ServiceBusConstants.SESSION_ID_KEY, this.sessionId.get())
                        .addKeyValue("timeout", amqpRetryOptions.getTryTimeout())
                        .log("Did not receive a message within timeout.");
                    this.cancelReceiveProcessor.onComplete();
                });
            this.subscriptions.add(idleTimer);
        }

        this.subscriptions.add(serviceBusReceiveLink.getSessionId().subscribe(linkSessionId -> {
            if (!this.sessionId.compareAndSet(null, linkSessionId)) {
                LOGGER.atWarning()
                    .addKeyValue("existingSessionId", this.sessionId.get())
                    .addKeyValue("returnedSessionId", linkSessionId)
                    .log("Another method set sessionId.");
            }
        }));

        this.subscriptions.add(serviceBusReceiveLink.getSessionLockedUntil().subscribe(lockedUntil -> {
            if (this.sessionLockedUntil.compareAndSet(null, lockedUntil)) {
                this.renewalOperation.compareAndSet(null, new LockRenewalOperation(this.sessionId.get(), duration, true, function, lockedUntil));
            } else {
                LOGGER.info("SessionLockedUntil was already set: {}", this.sessionLockedUntil.get());
            }
        }));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Checks whether this receiver is tracking an unexpired lock for the given token.
     *
     * @param str the message lock token to look up.
     * @return the result of {@code LockContainer.containsUnexpired} for the token.
     * @throws NullPointerException if {@code str} is null.
     * @throws IllegalArgumentException if {@code str} is empty.
     */
    public boolean containsLockToken(String str) {
        if (str == null) {
            throw LOGGER.logExceptionAsError(new NullPointerException("'lockToken' cannot be null."));
        }
        if (str.isEmpty()) {
            throw LOGGER.logExceptionAsError(new IllegalArgumentException("'lockToken' cannot be an empty string."));
        }
        return this.lockContainer.containsUnexpired(str);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Gets the name of the underlying receive link.
     *
     * @return the link name reported by the receive link.
     */
    public String getLinkName() {
        return this.receiveLink.getLinkName();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Gets the session id captured from the receive link.
     *
     * @return the session id, or {@code null} if the link has not reported one yet.
     */
    public String getSessionId() {
        return this.sessionId.get();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Gets the stream of messages for this session. The same {@link Flux} instance is returned on
     * every call.
     *
     * @return the received messages; a link error is delivered as a context carrying the error.
     */
    public Flux<ServiceBusMessageContext> receive() {
        return this.receivedMessages;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Overwrites the stored session lock expiry.
     *
     * @param offsetDateTime the new expiry of the session lock.
     */
    public void setSessionLockedUntil(OffsetDateTime offsetDateTime) {
        this.sessionLockedUntil.set(offsetDateTime);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Updates the disposition of a message on the receive link and stops tracking its lock token
     * once that operation terminates, whatever the outcome.
     *
     * @param str lock token of the message to settle.
     * @param deliveryState the delivery state to apply.
     * @return a {@link Mono} that mirrors the link's disposition update.
     */
    public Mono<Void> updateDisposition(String str, DeliveryState deliveryState) {
        return this.receiveLink.updateDisposition(str, deliveryState)
            .doFinally(signalType -> this.lockContainer.remove(str));
    }

    /**
     * Closes the receiver: closes the lock renewal operation (if any), closes the receive link, and
     * disposes the internal subscriptions once the link close terminates. Calls after the first
     * only return the link's close operation.
     *
     * @return a {@link Mono} that mirrors the link's close operation.
     */
    @Override
    public Mono<Void> closeAsync() {
        if (this.isDisposed.getAndSet(true)) {
            return this.receiveLink.closeAsync();
        }
        final LockRenewalOperation operation = this.renewalOperation.getAndSet(null);
        if (operation != null) {
            operation.close();
        }
        return this.receiveLink.closeAsync().doFinally(signalType -> this.subscriptions.dispose());
    }

    /**
     * Blocking variant of {@link #closeAsync()}; waits at most the retry options' try timeout.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        closeAsync().block(this.retryOptions.getTryTimeout());
    }
}
