package org.apache.camel.component.aws2.sqs;

import java.io.Closeable;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.ExchangePropertyKey;
import org.apache.camel.Processor;
import org.apache.camel.clock.Clock;
import org.apache.camel.spi.ExecutorServiceManager;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.spi.ScheduledPollConsumerScheduler;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.spi.ThreadPoolProfile;
import org.apache.camel.support.DefaultScheduledPollConsumerScheduler;
import org.apache.camel.support.ScheduledBatchPollingConsumer;
import org.apache.camel.util.CastUtils;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.URISupport;
import org.apache.commons.io.function.IOConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequest;
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.MessageNotInflightException;
import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName;
import software.amazon.awssdk.services.sqs.model.QueueDeletedRecentlyException;
import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException;
import software.amazon.awssdk.services.sqs.model.ReceiptHandleIsInvalidException;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.SqsException;

/* loaded from: input_file:org/apache/camel/component/aws2/sqs/Sqs2Consumer.class */
public class Sqs2Consumer extends ScheduledBatchPollingConsumer {
    /** Logger shared by the consumer and its nested helper classes. */
    private static final Logger LOG = LoggerFactory.getLogger(Sqs2Consumer.class);
    /** Periodic visibility-timeout extender; assigned in doStart() only when a positive visibility timeout is configured, cleared in doShutdown(). */
    private TimeoutExtender timeoutExtender;
    /** Handle of the scheduled timeout-extender run; cancelled in doShutdown(). */
    private ScheduledFuture<?> scheduledFuture;
    /** Executor that runs the timeout extender; created in doStart() when message visibility extension is enabled. */
    private ScheduledExecutorService scheduledExecutor;
    /** Task issuing the receive requests; created in doStart() and closed in doShutdown(). */
    private PollingTask pollingTask;
    /** Value returned by toString(), built once in the constructor from the sanitized endpoint URI. */
    private final String sqsConsumerToString;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/camel/component/aws2/sqs/Sqs2Consumer$PollingContext.class */
    /**
     * Per-poll bookkeeping shared by all receive requests that belong to the same poll.
     *
     * <p>It records which request (identified by a {@link UUID}) first detected that the queue is
     * missing, and collects every error raised by the individual requests. Both components are
     * thread-safe containers supplied by the caller; the record itself only holds the references.
     *
     * @param missingQueueHandlerRequestId holder of the id of the request that handles a missing queue,
     *                                     {@code null} content while the queue is not known to be missing
     * @param errors                       errors collected while polling
     */
    public record PollingContext(AtomicReference<UUID> missingQueueHandlerRequestId, Queue<Exception> errors) {

        /**
         * Canonical constructor; rejects {@code null} components.
         *
         * @throws NullPointerException if either component is {@code null}
         */
        public PollingContext {
            Objects.requireNonNull(missingQueueHandlerRequestId);
            Objects.requireNonNull(errors);
        }

        /** Creates an empty context backed by a fresh reference and a concurrent queue. */
        private PollingContext() {
            this(new AtomicReference<>(), new ConcurrentLinkedQueue<>());
        }

        /** Marks the queue as missing on behalf of the given request; only the first caller wins. */
        private void fireQueueMissing(UUID uuid) {
            missingQueueHandlerRequestId.compareAndSet(null, uuid);
        }

        /** Records an error raised by one of the requests. */
        private void firePollingError(Exception exc) {
            errors.offer(exc);
        }

        /** @return {@code true} once any request has flagged the queue as missing */
        private boolean isQueueMissing() {
            return missingQueueHandlerRequestId.get() != null;
        }

        /** @return {@code true} when the queue was flagged as missing by a request other than {@code uuid} */
        private boolean isMissingQueueHandledInAnotherRequest(UUID uuid) {
            final UUID handler = missingQueueHandlerRequestId.get();
            return handler != null && !uuid.equals(handler);
        }

        /** @return {@code true} when at least one error has been recorded */
        private boolean hasErrors() {
            return !errors.isEmpty();
        }

        /** @return the number of recorded errors */
        private int errorCount() {
            return errors.size();
        }

        /** @return the first recorded error, or {@code null} when there is none */
        private Exception firstError() {
            return errors.peek();
        }

        /** Rethrows the first recorded error unchanged when it is a {@link RuntimeException}; otherwise does nothing. */
        private void rethrowIfFirstErrorIsRuntimeException() {
            if (firstError() instanceof RuntimeException runtimeException) {
                throw runtimeException;
            }
        }
    }

    /* loaded from: input_file:org/apache/camel/component/aws2/sqs/Sqs2Consumer$PollingTask.class */
    private static class PollingTask implements Callable<List<Message>>, Closeable {
        /** Upper bound of messages requested by a single receive request (used to size the number of requests per poll). */
        private static final int MAX_NUMBER_OF_MESSAGES_PER_REQUEST = 10;
        /** Delay, in milliseconds, added to the clock reading when queue auto-creation is re-scheduled. */
        private static final long RECENTLY_DELETED_QUEUE_BACKOFF_TIME_MS = 30000;
        /** Pre-compiled separator used to split comma separated configuration values. */
        private static final Pattern COMMA_SEPARATED_PATTERN = Pattern.compile(",");
        /** Clock reading at which queue auto-creation may be retried; 0 means nothing is scheduled. */
        private final AtomicLong queueAutoCreationScheduleTime = new AtomicLong(0);
        /** Serializes queue creation attempts. */
        private final Lock lock = new ReentrantLock();
        /** Set to true by close(); checked before polling and before creating the queue. */
        private final AtomicBoolean closed = new AtomicBoolean();
        /** Time source taken from the endpoint. */
        private final Clock clock;
        /** SQS client taken from the endpoint. */
        private final SqsClient sqsClient;
        /** Thread pool on which the receive requests of a multi-request poll are executed. */
        private final ExecutorService requestExecutor;
        /** Manager used to create and shut down {@link #requestExecutor}. */
        private final ExecutorServiceManager executorServiceManager;
        /** Operation that creates the queue, bound to the endpoint's createQueue method. */
        private final IOConsumer<SqsClient> createQueueOperation;
        /** Configured queue name, used in log/error messages and in the thread pool name. */
        private final String queueName;
        /** URL of the queue the receive requests are sent to. */
        private final String queueUrl;
        /** Total number of messages requested per poll (at least 1). */
        private final int maxMessagesPerPoll;
        /** Visibility timeout passed to every receive request (may be null). */
        private final Integer visibilityTimeout;
        /** Wait time passed to every receive request (may be null). */
        private final Integer waitTimeSeconds;
        /** System attribute names requested with every receive request. */
        private final Collection<MessageSystemAttributeName> attributeNames;
        /** Message attribute names requested with every receive request. */
        private final Collection<String> messageAttributeNames;
        /** Number of receive requests issued per poll, derived from {@link #maxMessagesPerPoll}. */
        private final int numberOfRequestsPerPoll;
        /** Whether a missing queue is created automatically. */
        private final boolean queueAutoCreationEnabled;
        /** System attribute used to sort merged results, or null when no sorting is configured. */
        private final MessageSystemAttributeName sortAttributeName;

        private PollingTask(Sqs2Endpoint sqs2Endpoint) {
            this.clock = sqs2Endpoint.getClock();
            this.sqsClient = sqs2Endpoint.getClient();
            this.executorServiceManager = sqs2Endpoint.getCamelContext().getExecutorServiceManager();
            Objects.requireNonNull(sqs2Endpoint);
            this.createQueueOperation = sqs2Endpoint::createQueue;
            this.queueName = sqs2Endpoint.getConfiguration().getQueueName();
            this.queueUrl = sqs2Endpoint.getQueueUrl();
            this.visibilityTimeout = sqs2Endpoint.getConfiguration().getVisibilityTimeout();
            this.waitTimeSeconds = sqs2Endpoint.getConfiguration().getWaitTimeSeconds();
            this.messageAttributeNames = splitCommaSeparatedValues(sqs2Endpoint.getConfiguration().getMessageAttributeNames());
            this.sortAttributeName = getSortAttributeName(sqs2Endpoint.getConfiguration());
            this.attributeNames = getAttributeNames(sqs2Endpoint.getConfiguration(), this.sortAttributeName);
            this.queueAutoCreationEnabled = sqs2Endpoint.getConfiguration().isAutoCreateQueue();
            this.maxMessagesPerPoll = Math.max(1, sqs2Endpoint.getMaxMessagesPerPoll());
            this.numberOfRequestsPerPoll = computeNumberOfRequestPerPoll(this.maxMessagesPerPoll);
            this.requestExecutor = this.executorServiceManager.newFixedThreadPool(this, "%s[%s]".formatted(getClass().getSimpleName(), this.queueName), Math.min(this.numberOfRequestsPerPoll, Math.max(1, sqs2Endpoint.getConfiguration().getConcurrentRequestLimit())));
        }

        /** Flags the task as closed, then shuts the request thread pool down immediately. */
        @Override
        public void close() {
            closed.set(true);
            executorServiceManager.shutdownNow(requestExecutor);
        }

        /* JADX WARN: Can't rename method to resolve collision */
        /**
         * Runs one poll.
         *
         * @return the received messages; an empty list when the task is closed or when this call was
         *         consumed by scheduled queue auto-creation
         * @throws IOException when every request of the poll ended in an error; a single failed
         *                     request is rethrown as-is if it was a {@link RuntimeException}
         */
        @Override
        public List<Message> call() throws IOException {
            if (isClosed() || processScheduledQueueAutoCreation()) {
                return Collections.emptyList();
            }

            final PollingContext context = new PollingContext();
            final List<Message> received = poll(context);

            final int failures = context.errorCount();
            if (failures != numberOfRequestsPerPoll) {
                // at least one request succeeded (or nothing failed): hand back what was received
                return received;
            }
            if (failures == 1) {
                context.rethrowIfFirstErrorIsRuntimeException();
                throw new IOException("Error while polling", context.firstError());
            }
            throw new IOException(
                    "Error while polling - all %s requests resulted in an error, please check the logs for more details"
                            .formatted(numberOfRequestsPerPoll));
        }

        /**
         * Receives up to {@code maxMessagesPerPoll} messages.
         *
         * <p>A poll that fits into one request is executed on the calling thread. Otherwise the poll
         * is split into requests of at most {@link #MAX_NUMBER_OF_MESSAGES_PER_REQUEST} messages that
         * run on the request executor; their results are merged in submission order and then sorted
         * if a sort attribute is configured.
         *
         * @param pollingContext context collecting errors of the individual requests
         * @return the merged messages; an empty list when the calling thread is interrupted
         * @throws IOException when waiting for the merged result fails with an execution error
         */
        private List<Message> poll(PollingContext pollingContext) throws IOException {
            if (numberOfRequestsPerPoll == 1) {
                return poll(maxMessagesPerPoll, pollingContext);
            }
            try {
                CompletableFuture<List<Message>> merged = CompletableFuture.completedFuture(Collections.emptyList());
                for (int remaining = maxMessagesPerPoll; remaining > 0; remaining -= MAX_NUMBER_OF_MESSAGES_PER_REQUEST) {
                    final int batchSize = Math.min(remaining, MAX_NUMBER_OF_MESSAGES_PER_REQUEST);
                    merged = mergeResults(merged,
                            CompletableFuture.supplyAsync(() -> poll(batchSize, pollingContext), requestExecutor));
                }
                return merged.thenApply(this::sortIfNeeded).get();
            } catch (InterruptedException e) {
                // restore the interrupt flag so callers can observe the interruption
                Thread.currentThread().interrupt();
                LOG.debug("Polling interrupted", e);
                return Collections.emptyList();
            } catch (ExecutionException e) {
                throw new IOException("Error while polling", e.getCause());
            }
        }

        /**
         * Issues a single receive request unless the queue has already been flagged as missing.
         *
         * <p>A missing queue is delegated to {@code handleMissingQueueError}; any other failure is
         * logged and recorded in the context.
         *
         * @param maxNumberOfMessages maximum number of messages to request
         * @param pollingContext      context shared by the requests of the current poll
         * @return the received messages, or an empty list when nothing was requested or the request failed
         */
        private List<Message> poll(int maxNumberOfMessages, PollingContext pollingContext) {
            if (!pollingContext.isQueueMissing()) {
                try {
                    final ReceiveMessageRequest request = createReceiveRequest(maxNumberOfMessages);
                    return sqsClient.receiveMessage(request).messages();
                } catch (QueueDoesNotExistException missingQueue) {
                    return handleMissingQueueError(pollingContext, missingQueue);
                } catch (Exception failure) {
                    LOG.error("Error while polling", failure);
                    pollingContext.firePollingError(failure);
                }
            }
            return Collections.emptyList();
        }

        /**
         * Reacts to a "queue does not exist" failure of a receive request.
         *
         * <p>Does nothing when the queue was already flagged as missing. Otherwise flags it with a
         * fresh request id and either attempts to create the queue (auto-creation enabled) or logs
         * and records the error.
         *
         * @return always an empty list
         */
        private List<Message> handleMissingQueueError(PollingContext pollingContext, QueueDoesNotExistException missingQueue) {
            if (pollingContext.isQueueMissing()) {
                return Collections.emptyList();
            }

            final UUID requestId = UUID.randomUUID();
            pollingContext.fireQueueMissing(requestId);

            if (!queueAutoCreationEnabled) {
                LOG.error("Error while polling {} queue does not exists", queueName, missingQueue);
                pollingContext.firePollingError(missingQueue);
            } else {
                createQueue(requestId, pollingContext);
            }
            return Collections.emptyList();
        }

        /**
         * Builds the receive request for the configured queue.
         *
         * <p>System and message attribute names are only set when they are not empty.
         *
         * @param maxNumberOfMessages maximum number of messages to request
         * @return the request to send
         */
        private ReceiveMessageRequest createReceiveRequest(int maxNumberOfMessages) {
            final ReceiveMessageRequest.Builder request = ReceiveMessageRequest.builder()
                    .queueUrl(queueUrl)
                    .maxNumberOfMessages(maxNumberOfMessages)
                    .visibilityTimeout(visibilityTimeout)
                    .waitTimeSeconds(waitTimeSeconds);
            if (!attributeNames.isEmpty()) {
                request.messageSystemAttributeNames(attributeNames);
            }
            if (!messageAttributeNames.isEmpty()) {
                request.messageAttributeNames(messageAttributeNames);
            }
            LOG.trace("Receiving messages with request [{}]...", request);
            return request.build();
        }

        /**
         * Creates the queue while holding the task lock.
         *
         * <p>Nothing happens when the task is closed or when another request of the same poll is
         * already handling the missing queue. A "queue deleted recently" failure schedules a later
         * retry instead of being reported; every other failure is logged and recorded in the context.
         *
         * <p>The lock is released exactly once, in the {@code finally} block. Releasing it a second
         * time on the normal path would make {@code ReentrantLock.unlock()} throw
         * {@link IllegalMonitorStateException}.
         *
         * @param uuid           id of the request that wants to create the queue
         * @param pollingContext context of the current poll
         */
        private void createQueue(UUID uuid, PollingContext pollingContext) {
            lock.lock();
            try {
                if (isClosed() || pollingContext.isMissingQueueHandledInAnotherRequest(uuid)) {
                    return;
                }
                createQueueOperation.accept(sqsClient);
            } catch (QueueDeletedRecentlyException e) {
                LOG.debug("Queue recently deleted, will retry after at least 30 seconds on next polling request.", e);
                scheduleQueueAutoCreation();
            } catch (Exception e) {
                LOG.error("Error while creating queue.", e);
                pollingContext.firePollingError(e);
            } finally {
                lock.unlock();
            }
        }

        /**
         * Handles a pending (backed-off) queue auto-creation, if any.
         *
         * <p>When the scheduled time has not been reached the poll is skipped. Once it is reached the
         * queue creation is attempted; on success the schedule is cleared. If the attempt itself
         * re-schedules the creation (queue still reported as recently deleted), the new schedule is
         * kept so that the back-off is honoured instead of being wiped immediately.
         *
         * @return {@code true} when this call was consumed by the scheduled auto-creation and no
         *         messages should be requested, {@code false} when nothing is scheduled
         * @throws IOException when the queue creation failed with a checked error; runtime errors
         *                     are rethrown unchanged
         */
        private boolean processScheduledQueueAutoCreation() throws IOException {
            final long scheduledAt = queueAutoCreationScheduleTime.get();
            if (scheduledAt == 0) {
                return false;
            }
            final long now = clock.elapsed();
            if (scheduledAt > now) {
                LOG.debug("{}ms remaining until queue auto-creation is triggered", scheduledAt - now);
                return true;
            }
            final PollingContext context = new PollingContext();
            createQueue(UUID.randomUUID(), context);
            if (context.hasErrors()) {
                context.rethrowIfFirstErrorIsRuntimeException();
                throw new IOException("Error while creating %s queue".formatted(queueName), context.firstError());
            }
            cancelScheduledQueueAutoCreation(scheduledAt);
            return true;
        }

        /** Schedules the next queue auto-creation attempt after the back-off time. */
        private void scheduleQueueAutoCreation() {
            queueAutoCreationScheduleTime.set(clock.elapsed() + RECENTLY_DELETED_QUEUE_BACKOFF_TIME_MS);
        }

        /**
         * Clears the schedule, but only if it still holds the value that was just processed; a
         * schedule written in the meantime is left untouched.
         */
        private void cancelScheduledQueueAutoCreation(long expectedScheduleTime) {
            queueAutoCreationScheduleTime.compareAndSet(expectedScheduleTime, 0L);
        }

        /** @return {@code true} once {@code close()} has been called */
        private boolean isClosed() {
            final boolean alreadyClosed = closed.get();
            return alreadyClosed;
        }

        /**
         * Splits a comma separated string into its trimmed, non-empty values.
         *
         * @param str the raw value, may be {@code null}
         * @return the values in their original order; an empty list for {@code null} or empty input
         */
        private static List<String> splitCommaSeparatedValues(String str) {
            if (str == null || str.isEmpty()) {
                return Collections.emptyList();
            }
            return COMMA_SEPARATED_PATTERN.splitAsStream(str)
                    .map(String::trim)
                    .filter(value -> !value.isEmpty())
                    .toList();
        }

        /**
         * Resolves a system attribute name.
         *
         * @param str the attribute name, may be {@code null}
         * @return the matching attribute; empty for {@code null}/empty input or for a name that
         *         resolves to {@code UNKNOWN_TO_SDK_VERSION} (a warning is logged in that case)
         */
        private static Optional<MessageSystemAttributeName> parseMessageSystemAttributeName(String str) {
            if (str == null || str.isEmpty()) {
                return Optional.empty();
            }
            final MessageSystemAttributeName resolved = MessageSystemAttributeName.fromValue(str);
            if (resolved == MessageSystemAttributeName.UNKNOWN_TO_SDK_VERSION) {
                LOG.warn("Unsupported attribute name '{}' use one of {}", str, MessageSystemAttributeName.knownValues());
                return Optional.empty();
            }
            return Optional.of(resolved);
        }

        /**
         * Resolves the configured sort attribute.
         *
         * @return the attribute to sort by, or {@code null} when none is configured, the name is not
         *         supported, or it is {@code ALL} (which is rejected with a warning)
         */
        private static MessageSystemAttributeName getSortAttributeName(Sqs2Configuration sqs2Configuration) {
            final MessageSystemAttributeName candidate
                    = parseMessageSystemAttributeName(sqs2Configuration.getSortAttributeName()).orElse(null);
            if (candidate == MessageSystemAttributeName.ALL) {
                LOG.warn("The {} attribute cannot be used for sorting the received messages", MessageSystemAttributeName.ALL);
                return null;
            }
            return candidate;
        }

        /**
         * Collects the system attribute names to request with every receive call.
         *
         * <p>The configured comma separated names are parsed in order, skipping unsupported names
         * and duplicates. The sort attribute is appended when it is set and not already covered,
         * either explicitly or through {@code ALL}.
         *
         * @param sqs2Configuration configuration holding the raw attribute names
         * @param sortAttribute     attribute used for sorting, may be {@code null}
         * @return an unmodifiable list of distinct attribute names
         */
        private static List<MessageSystemAttributeName> getAttributeNames(
                Sqs2Configuration sqs2Configuration, MessageSystemAttributeName sortAttribute) {
            final List<MessageSystemAttributeName> names = new ArrayList<>();
            for (String rawName : splitCommaSeparatedValues(sqs2Configuration.getAttributeNames())) {
                parseMessageSystemAttributeName(rawName)
                        .filter(name -> !names.contains(name))
                        .ifPresent(names::add);
            }
            if (sortAttribute != null
                    && !names.contains(MessageSystemAttributeName.ALL)
                    && !names.contains(sortAttribute)) {
                names.add(sortAttribute);
            }
            return Collections.unmodifiableList(names);
        }

        /**
         * Computes how many receive requests are needed for the given number of messages, rounding up.
         *
         * @param maxMessages requested number of messages; values below 1 are treated as 1
         * @return the number of requests, at least 1
         */
        private static int computeNumberOfRequestPerPoll(int maxMessages) {
            final int requested = Math.max(1, maxMessages);
            final double requests = Math.ceil(requested / (double) MAX_NUMBER_OF_MESSAGES_PER_REQUEST);
            return (int) requests;
        }

        /**
         * Combines two pending lists into one pending list holding the elements of the first
         * followed by the elements of the second.
         *
         * @return a future completing with a new list once both inputs have completed
         */
        private static <T> CompletableFuture<List<T>> mergeResults(
                CompletableFuture<List<T>> completableFuture, CompletableFuture<List<T>> completableFuture2) {
            return completableFuture.thenCombine(completableFuture2, (head, tail) -> {
                final List<T> merged = new ArrayList<>(head);
                merged.addAll(tail);
                return merged;
            });
        }

        /**
         * Sorts the merged messages by the configured sort attribute.
         *
         * <p>Messages lacking the attribute are compared using an empty string. Without a sort
         * attribute the list is returned unchanged.
         */
        private List<Message> sortIfNeeded(List<Message> messages) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Received {} messages in {} requests", messages.size(), numberOfRequestsPerPoll);
            }
            if (sortAttributeName == null) {
                return messages;
            }
            final Comparator<Message> bySortAttribute
                    = Comparator.comparing(message -> message.attributes().getOrDefault(sortAttributeName, ""));
            return messages.stream().sorted(bySortAttribute).toList();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/camel/component/aws2/sqs/Sqs2Consumer$TimeoutExtender.class */
    /**
     * Periodic task that extends the visibility timeout of messages whose exchanges are still
     * being processed.
     *
     * <p>Exchanges are registered through {@link #add(Exchange)} and removed automatically when
     * they complete or fail. Each run sends the registered entries in batches of at most
     * {@link #MAX_REQUESTS}. {@link #run()} never lets an exception escape, because a scheduled
     * executor stops re-running a periodic task once one execution throws.
     */
    public class TimeoutExtender implements Runnable {
        private static final int MAX_REQUESTS = 10;
        /** Service error text reported for messages that are gone; such failures are not worth logging. */
        private static final String MESSAGE_UNAVAILABLE_ERROR
                = "Message does not exist or is not available for visibility timeout change";

        private final int repeatSeconds;
        private final AtomicBoolean run = new AtomicBoolean(true);
        private final Map<String, ChangeMessageVisibilityBatchRequestEntry> entries = new ConcurrentHashMap<>();

        /**
         * @param i visibility timeout, in seconds, applied on every extension
         */
        TimeoutExtender(int i) {
            this.repeatSeconds = i;
        }

        /**
         * Registers the exchange for visibility extension until it completes or fails.
         *
         * @param exchange exchange carrying the receipt handle header of the received message
         */
        public void add(Exchange exchange) {
            exchange.getExchangeExtension().addOnCompletion(new Synchronization() {
                @Override
                public void onComplete(Exchange completed) {
                    remove(completed);
                }

                @Override
                public void onFailure(Exchange failed) {
                    remove(failed);
                }

                private void remove(Exchange done) {
                    Sqs2Consumer.LOG.trace("Removing exchangeId {} from the TimeoutExtender, processing done",
                            done.getExchangeId());
                    TimeoutExtender.this.entries.remove(done.getExchangeId());
                }
            });

            final String exchangeId = exchange.getExchangeId();
            final String receiptHandle = exchange.getIn().getHeader(Sqs2Constants.RECEIPT_HANDLE, String.class);
            entries.put(exchangeId, ChangeMessageVisibilityBatchRequestEntry.builder()
                    .id(exchangeId)
                    .visibilityTimeout(repeatSeconds)
                    .receiptHandle(receiptHandle)
                    .build());
        }

        /** Turns subsequent runs into no-ops. */
        public void cancel() {
            run.set(false);
        }

        /** Extends the visibility of every currently registered entry, batch by batch. */
        @Override
        public void run() {
            if (!run.get()) {
                return;
            }
            // work on a snapshot: exchanges may complete (and unregister) while requests are in flight
            final List<ChangeMessageVisibilityBatchRequestEntry> pending = new ArrayList<>(entries.values());
            for (int from = 0; from < pending.size(); from += MAX_REQUESTS) {
                final int to = Math.min(from + MAX_REQUESTS, pending.size());
                extend(new ArrayList<>(pending.subList(from, to)));
            }
        }

        /** Sends one batch request; failures are swallowed or logged, never propagated. */
        private void extend(List<ChangeMessageVisibilityBatchRequestEntry> batch) {
            final ChangeMessageVisibilityBatchRequest request = ChangeMessageVisibilityBatchRequest.builder()
                    .queueUrl(Sqs2Consumer.this.getQueueUrl())
                    .entries(batch)
                    .build();
            try {
                Sqs2Consumer.LOG.trace("Extending visibility window by {} seconds for request entries {}",
                        repeatSeconds, batch);
                Sqs2Consumer.this.getEndpoint().getClient().changeMessageVisibilityBatch(request);
                Sqs2Consumer.LOG.debug("Extended visibility window for request entries {}", batch);
            } catch (MessageNotInflightException | ReceiptHandleIsInvalidException ignored) {
                // deliberately ignored: these failures are not reported
            } catch (SqsException e) {
                // a null message must not raise an NPE here, it would end the periodic schedule
                final String message = e.getMessage();
                if (message == null || !message.contains(MESSAGE_UNAVAILABLE_ERROR)) {
                    logException(e, batch);
                }
            } catch (Exception e) {
                logException(e, batch);
            }
        }

        private void logException(Exception exc, List<ChangeMessageVisibilityBatchRequestEntry> list) {
            Sqs2Consumer.LOG.warn("Extending visibility window failed for entries {}. Will not attempt to extend visibility further. This exception will be ignored.", list, exc);
        }
    }

    public Sqs2Consumer(Sqs2Endpoint sqs2Endpoint, Processor processor) {
        super(sqs2Endpoint, processor);
        this.sqsConsumerToString = "SqsConsumer[%s]".formatted(URISupport.sanitizeUri(sqs2Endpoint.getEndpointUri()));
    }

    /**
     * Polls the queue once and processes the received messages as a batch.
     *
     * @return the value returned by {@code processBatch} for the polled messages
     */
    @Override
    protected int poll() throws Exception {
        // reset the per-poll state before requesting messages
        shutdownRunningTask = null;
        pendingExchanges = 0;

        final List<Message> messages = pollingTask.call();
        forceConsumerAsReady();

        final Queue<Exchange> exchanges = createExchanges(messages);
        return processBatch(CastUtils.cast((Queue<?>) exchanges));
    }

    /**
     * Converts the received messages into exchanges, preserving their order.
     *
     * @param list messages received in the current poll
     * @return a queue holding one exchange per message
     */
    protected Queue<Exchange> createExchanges(List<Message> list) {
        if (LOG.isTraceEnabled()) {
            LOG.trace("Received {} messages in this poll", list.size());
        }
        final Queue<Exchange> exchanges = new LinkedList<>();
        for (Message message : list) {
            exchanges.add(createExchange(message));
        }
        return exchanges;
    }

    /**
     * Processes the queued exchanges one by one for as long as batch processing is allowed.
     *
     * <p>Each exchange gets its batch properties, is registered with the timeout extender when one
     * is active, and receives a completion hook that commits on success and rolls back on failure.
     *
     * @param queue exchanges to process
     * @return the size of the queue at entry, regardless of how many exchanges were processed
     */
    @Override
    public int processBatch(Queue<Object> queue) throws Exception {
        final int total = queue.size();
        for (int index = 0; index < total && isBatchAllowed(); index++) {
            final Exchange exchange = (Exchange) ObjectHelper.cast(Exchange.class, queue.poll());
            final boolean last = index == total - 1;

            exchange.setProperty(ExchangePropertyKey.BATCH_INDEX, index);
            exchange.setProperty(ExchangePropertyKey.BATCH_SIZE, total);
            exchange.setProperty(ExchangePropertyKey.BATCH_COMPLETE, last);
            pendingExchanges = total - index - 1;

            if (timeoutExtender != null) {
                timeoutExtender.add(exchange);
            }

            exchange.getExchangeExtension().addOnCompletion(new Synchronization() {
                @Override
                public void onComplete(Exchange completed) {
                    processCommit(completed);
                }

                @Override
                public void onFailure(Exchange failed) {
                    processRollback(failed);
                }

                @Override
                public String toString() {
                    return "SqsConsumerOnCompletion";
                }
            });

            getAsyncProcessor().process(exchange, defaultConsumerCallback(exchange, true));
        }
        return total;
    }

    /**
     * Deletes the message behind a successfully processed exchange when deletion applies.
     *
     * <p>SDK failures are handed to the exception handler and otherwise ignored.
     *
     * @param exchange the completed exchange carrying the receipt handle header
     */
    protected void processCommit(Exchange exchange) {
        try {
            if (shouldDelete(exchange)) {
                final String receiptHandle = exchange.getIn().getHeader(Sqs2Constants.RECEIPT_HANDLE, String.class);
                final DeleteMessageRequest request = DeleteMessageRequest.builder()
                        .queueUrl(getQueueUrl())
                        .receiptHandle(receiptHandle)
                        .build();
                LOG.trace("Deleting message with receipt handle {}...", receiptHandle);
                getClient().deleteMessage(request);
                LOG.trace("Deleted message with receipt handle {}...", receiptHandle);
            }
        } catch (SdkException e) {
            getExceptionHandler().handleException("Error occurred during deleting message. This exception is ignored.", exchange, e);
        }
    }

    /**
     * Decides whether the message of the exchange is deleted on commit: always when delete-after-read
     * is enabled, otherwise only when the filtered marker property is present, delete-if-filtered
     * is enabled and the marker evaluates to {@code true}.
     */
    private boolean shouldDelete(Exchange exchange) {
        if (getConfiguration().isDeleteAfterRead()) {
            return true;
        }
        if (exchange.getProperty(Sqs2Constants.SQS_DELETE_FILTERED) == null) {
            return false;
        }
        return getConfiguration().isDeleteIfFiltered() && passedThroughFilter(exchange);
    }

    /** Reads the filtered marker property as a boolean, defaulting to {@code false} when absent. */
    private static boolean passedThroughFilter(Exchange exchange) {
        final Boolean marker = exchange.getProperty(Sqs2Constants.SQS_DELETE_FILTERED, Boolean.FALSE, Boolean.class);
        return marker.booleanValue();
    }

    /**
     * Reports the exception of a failed exchange, if it carries one, to the exception handler.
     *
     * @param exchange the failed exchange
     */
    protected void processRollback(Exchange exchange) {
        final Exception cause = exchange.getException();
        if (cause == null) {
            return;
        }
        getExceptionHandler().handleException("Error during processing exchange. Will attempt to process the message on next poll.", exchange, cause);
    }

    /** @return the configuration of the endpoint this consumer belongs to */
    protected Sqs2Configuration getConfiguration() {
        final Sqs2Endpoint endpoint = getEndpoint();
        return endpoint.getConfiguration();
    }

    /** @return the SQS client of the endpoint this consumer belongs to */
    protected SqsClient getClient() {
        final Sqs2Endpoint endpoint = getEndpoint();
        return endpoint.getClient();
    }

    /** @return the queue URL of the endpoint this consumer belongs to */
    protected String getQueueUrl() {
        final Sqs2Endpoint endpoint = getEndpoint();
        return endpoint.getQueueUrl();
    }

    /** @return the endpoint of this consumer, narrowed to {@link Sqs2Endpoint} */
    @Override
    public Sqs2Endpoint getEndpoint() {
        final Object endpoint = super.getEndpoint();
        return (Sqs2Endpoint) endpoint;
    }

    /**
     * Creates an exchange for the message using the exchange pattern of the endpoint.
     *
     * @param message the received message
     * @return the populated exchange
     */
    public Exchange createExchange(Message message) {
        final ExchangePattern pattern = getEndpoint().getExchangePattern();
        return createExchange(pattern, message);
    }

    /**
     * Creates an exchange and fills its in-message from the received message.
     *
     * <p>The body is the message body. The headers start as a copy of the message's string
     * attributes, followed by the message id, MD5 of the body, receipt handle, attributes and
     * message attributes. Finally every message attribute that the header filter strategy does
     * not filter out is added as an individual header.
     */
    private Exchange createExchange(ExchangePattern exchangePattern, Message message) {
        final Exchange exchange = createExchange(true);
        exchange.setPattern(exchangePattern);

        final org.apache.camel.Message in = exchange.getIn();
        in.setBody(message.body());
        in.setHeaders(new HashMap<>(message.attributesAsStrings()));
        in.setHeader(Sqs2Constants.MESSAGE_ID, message.messageId());
        in.setHeader(Sqs2Constants.MD5_OF_BODY, message.md5OfBody());
        in.setHeader(Sqs2Constants.RECEIPT_HANDLE, message.receiptHandle());
        in.setHeader(Sqs2Constants.ATTRIBUTES, message.attributes());
        in.setHeader(Sqs2Constants.MESSAGE_ATTRIBUTES, message.messageAttributes());

        final HeaderFilterStrategy filter = getEndpoint().getHeaderFilterStrategy();
        message.messageAttributes().forEach((name, attribute) -> {
            final Object value = Sqs2MessageHelper.fromMessageAttributeValue(attribute);
            final boolean filteredOut = filter.applyFilterToExternalHeaders(name, value, exchange);
            if (!filteredOut) {
                in.setHeader(name, value);
            }
        });
        return exchange;
    }

    /** @return the description computed at construction time */
    @Override
    public String toString() {
        final String description = sqsConsumerToString;
        return description;
    }

    /**
     * Applies the configured number of concurrent consumers to the scheduler.
     *
     * <p>Only acts when {@code z} is {@code true} and the scheduler is a
     * {@link DefaultScheduledPollConsumerScheduler}; its pool size is raised to at least the
     * number of concurrent consumers.
     */
    @Override
    protected void afterConfigureScheduler(ScheduledPollConsumerScheduler scheduledPollConsumerScheduler, boolean z) {
        if (!z) {
            return;
        }
        if (scheduledPollConsumerScheduler instanceof DefaultScheduledPollConsumerScheduler scheduler) {
            scheduler.setConcurrentConsumers(getConfiguration().getConcurrentConsumers());
            final int requiredPoolSize = Math.max(scheduler.getPoolSize(), getConfiguration().getConcurrentConsumers());
            scheduler.setPoolSize(requiredPoolSize);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates the polling task and, when message visibility extension is enabled, the scheduled
     * executor plus the periodic timeout extender, then starts the consumer.
     *
     * <p>The extender is only scheduled for a positive visibility timeout. It first runs after
     * one visibility timeout, repeats with that period, and extends the visibility to 1.5 times
     * the timeout on each run.
     *
     * <p>A polling task left over from a previous start is closed first, so that a restart
     * without shutdown does not leak its request thread pool.
     */
    @Override
    public void doStart() throws Exception {
        if (pollingTask != null) {
            pollingTask.close();
        }
        pollingTask = new PollingTask(getEndpoint());

        if (getConfiguration().isExtendMessageVisibility() && scheduledExecutor == null) {
            final ThreadPoolProfile profile = new ThreadPoolProfile("SqsTimeoutExtender");
            profile.setPoolSize(1);
            profile.setAllowCoreThreadTimeOut(false);
            profile.setMaxQueueSize(-1);
            scheduledExecutor = getEndpoint().getCamelContext().getExecutorServiceManager()
                    .newScheduledThreadPool(this, "SqsTimeoutExtender", profile);

            final Integer visibilityTimeout = getConfiguration().getVisibilityTimeout();
            if (visibilityTimeout != null && visibilityTimeout > 0) {
                final int periodSeconds = visibilityTimeout;
                final int repeatSeconds = (int) (visibilityTimeout.doubleValue() * 1.5d);
                timeoutExtender = new TimeoutExtender(repeatSeconds);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Scheduled TimeoutExtender task to start after {} delay, and run with {}/{} period/repeat (seconds)",
                            periodSeconds, periodSeconds, repeatSeconds);
                }
                scheduledFuture = scheduledExecutor.scheduleAtFixedRate(
                        timeoutExtender, periodSeconds, periodSeconds, TimeUnit.SECONDS);
            }
        }
        super.doStart();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Releases everything created in {@code doStart()}: cancels the timeout extender and its
     * scheduled run, shuts the scheduled executor down, closes the polling task, and finally
     * delegates to the parent shutdown.
     */
    @Override
    public void doShutdown() throws Exception {
        final TimeoutExtender extender = timeoutExtender;
        if (extender != null) {
            extender.cancel();
            timeoutExtender = null;
        }

        final ScheduledFuture<?> future = scheduledFuture;
        if (future != null) {
            future.cancel(true);
            scheduledFuture = null;
        }

        final ScheduledExecutorService executor = scheduledExecutor;
        if (executor != null) {
            getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executor);
            scheduledExecutor = null;
        }

        final PollingTask task = pollingTask;
        if (task != null) {
            task.close();
            pollingTask = null;
        }

        super.doShutdown();
    }
}
