package org.apache.camel.component.pulsar;

import org.apache.camel.Exchange;
import org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders;
import org.apache.camel.component.pulsar.utils.message.PulsarMessageUtils;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClientException;

/* loaded from: input_file:org/apache/camel/component/pulsar/PulsarMessageListener.class */
public class PulsarMessageListener implements MessageListener<byte[]> {
    private final PulsarEndpoint endpoint;
    private final PulsarConsumer pulsarConsumer;

    public PulsarMessageListener(PulsarEndpoint pulsarEndpoint, PulsarConsumer pulsarConsumer) {
        this.endpoint = pulsarEndpoint;
        this.pulsarConsumer = pulsarConsumer;
    }

    public void received(Consumer<byte[]> consumer, Message<byte[]> message) {
        Exchange updateExchange = PulsarMessageUtils.updateExchange(message, this.pulsarConsumer.createExchange(false));
        if (this.endpoint.getPulsarConfiguration().isAllowManualAcknowledgement()) {
            updateExchange.getIn().setHeader(PulsarMessageHeaders.MESSAGE_RECEIPT, this.endpoint.m1getComponent().getPulsarMessageReceiptFactory().newInstance(updateExchange, message, consumer));
        }
        processAsync(updateExchange, consumer, message);
    }

    private void processAsync(Exchange exchange, Consumer<byte[]> consumer, Message<byte[]> message) {
        this.pulsarConsumer.getAsyncProcessor().process(exchange, z -> {
            try {
                if (exchange.getException() != null) {
                    this.pulsarConsumer.getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());
                } else {
                    try {
                        acknowledge(consumer, message);
                    } catch (Exception e) {
                        this.pulsarConsumer.getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());
                    }
                }
            } finally {
                this.pulsarConsumer.releaseExchange(exchange, false);
            }
        });
    }

    private void acknowledge(Consumer<byte[]> consumer, Message<byte[]> message) throws PulsarClientException {
        if (this.endpoint.getPulsarConfiguration().isAllowManualAcknowledgement()) {
            return;
        }
        consumer.acknowledge(message.getMessageId());
    }
}
