package org.apache.kafka.clients.producer.internals;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClientUtils;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.clients.producer.internals.RecordAccumulator;
import org.apache.kafka.clients.producer.internals.TransactionManager;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.InvalidMetadataException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Meter;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/clients/producer/internals/Sender.class */
public class Sender implements Runnable {
    private final Logger log;
    private final KafkaClient client;
    private final RecordAccumulator accumulator;
    private final ProducerMetadata metadata;
    private final boolean guaranteeMessageOrder;
    private final int maxRequestSize;
    private final short acks;
    private final int retries;
    private final Time time;
    private volatile boolean forceClose;
    private final SenderMetrics sensors;
    private final int requestTimeoutMs;
    private final long retryBackoffMs;
    private final ApiVersions apiVersions;
    private final TransactionManager transactionManager;
    private volatile boolean running = true;
    private final Map<TopicPartition, List<ProducerBatch>> inFlightBatches = new HashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/clients/producer/internals/Sender$SenderMetrics.class */
    public static class SenderMetrics {
        public final Sensor retrySensor;
        public final Sensor errorSensor;
        public final Sensor queueTimeSensor;
        public final Sensor requestTimeSensor;
        public final Sensor recordsPerRequestSensor;
        public final Sensor batchSizeSensor;
        public final Sensor compressionRateSensor;
        public final Sensor maxRecordSizeSensor;
        public final Sensor batchSplitSensor;
        private final SenderMetricsRegistry metrics;
        private final Time time;

        public SenderMetrics(SenderMetricsRegistry senderMetricsRegistry, Metadata metadata, KafkaClient kafkaClient, Time time) {
            this.metrics = senderMetricsRegistry;
            this.time = time;
            this.batchSizeSensor = senderMetricsRegistry.sensor("batch-size");
            this.batchSizeSensor.add(senderMetricsRegistry.batchSizeAvg, new Avg());
            this.batchSizeSensor.add(senderMetricsRegistry.batchSizeMax, new Max());
            this.compressionRateSensor = senderMetricsRegistry.sensor("compression-rate");
            this.compressionRateSensor.add(senderMetricsRegistry.compressionRateAvg, new Avg());
            this.queueTimeSensor = senderMetricsRegistry.sensor("queue-time");
            this.queueTimeSensor.add(senderMetricsRegistry.recordQueueTimeAvg, new Avg());
            this.queueTimeSensor.add(senderMetricsRegistry.recordQueueTimeMax, new Max());
            this.requestTimeSensor = senderMetricsRegistry.sensor("request-time");
            this.requestTimeSensor.add(senderMetricsRegistry.requestLatencyAvg, new Avg());
            this.requestTimeSensor.add(senderMetricsRegistry.requestLatencyMax, new Max());
            this.recordsPerRequestSensor = senderMetricsRegistry.sensor("records-per-request");
            this.recordsPerRequestSensor.add(new Meter(senderMetricsRegistry.recordSendRate, senderMetricsRegistry.recordSendTotal));
            this.recordsPerRequestSensor.add(senderMetricsRegistry.recordsPerRequestAvg, new Avg());
            this.retrySensor = senderMetricsRegistry.sensor("record-retries");
            this.retrySensor.add(new Meter(senderMetricsRegistry.recordRetryRate, senderMetricsRegistry.recordRetryTotal));
            this.errorSensor = senderMetricsRegistry.sensor("errors");
            this.errorSensor.add(new Meter(senderMetricsRegistry.recordErrorRate, senderMetricsRegistry.recordErrorTotal));
            this.maxRecordSizeSensor = senderMetricsRegistry.sensor("record-size");
            this.maxRecordSizeSensor.add(senderMetricsRegistry.recordSizeMax, new Max());
            this.maxRecordSizeSensor.add(senderMetricsRegistry.recordSizeAvg, new Avg());
            this.metrics.addMetric(senderMetricsRegistry.requestsInFlight, (metricConfig, j) -> {
                return kafkaClient.inFlightRequestCount();
            });
            this.metrics.addMetric(senderMetricsRegistry.metadataAge, (metricConfig2, j2) -> {
                return (j2 - metadata.lastSuccessfulUpdate()) / 1000.0d;
            });
            this.batchSplitSensor = senderMetricsRegistry.sensor("batch-split-rate");
            this.batchSplitSensor.add(new Meter(senderMetricsRegistry.batchSplitRate, senderMetricsRegistry.batchSplitTotal));
        }

        private void maybeRegisterTopicMetrics(String str) {
            String str2 = "topic." + str + ".records-per-batch";
            if (this.metrics.getSensor(str2) == null) {
                Map<String, String> singletonMap = Collections.singletonMap(ConsumerProtocol.TOPIC_KEY_NAME, str);
                this.metrics.sensor(str2).add(new Meter(this.metrics.topicRecordSendRate(singletonMap), this.metrics.topicRecordSendTotal(singletonMap)));
                this.metrics.sensor("topic." + str + ".bytes").add(new Meter(this.metrics.topicByteRate(singletonMap), this.metrics.topicByteTotal(singletonMap)));
                this.metrics.sensor("topic." + str + ".compression-rate").add(this.metrics.topicCompressionRate(singletonMap), new Avg());
                this.metrics.sensor("topic." + str + ".record-retries").add(new Meter(this.metrics.topicRecordRetryRate(singletonMap), this.metrics.topicRecordRetryTotal(singletonMap)));
                this.metrics.sensor("topic." + str + ".record-errors").add(new Meter(this.metrics.topicRecordErrorRate(singletonMap), this.metrics.topicRecordErrorTotal(singletonMap)));
            }
        }

        public void updateProduceRequestMetrics(Map<Integer, List<ProducerBatch>> map) {
            long milliseconds = this.time.milliseconds();
            Iterator<List<ProducerBatch>> it = map.values().iterator();
            while (it.hasNext()) {
                int i = 0;
                for (ProducerBatch producerBatch : it.next()) {
                    String str = producerBatch.topicPartition.topic();
                    maybeRegisterTopicMetrics(str);
                    ((Sensor) Objects.requireNonNull(this.metrics.getSensor("topic." + str + ".records-per-batch"))).record(producerBatch.recordCount);
                    ((Sensor) Objects.requireNonNull(this.metrics.getSensor("topic." + str + ".bytes"))).record(producerBatch.estimatedSizeInBytes());
                    ((Sensor) Objects.requireNonNull(this.metrics.getSensor("topic." + str + ".compression-rate"))).record(producerBatch.compressionRatio());
                    this.batchSizeSensor.record(producerBatch.estimatedSizeInBytes(), milliseconds);
                    this.queueTimeSensor.record(producerBatch.queueTimeMs(), milliseconds);
                    this.compressionRateSensor.record(producerBatch.compressionRatio());
                    this.maxRecordSizeSensor.record(producerBatch.maxRecordSize, milliseconds);
                    i += producerBatch.recordCount;
                }
                this.recordsPerRequestSensor.record(i, milliseconds);
            }
        }

        public void recordRetries(String str, int i) {
            long milliseconds = this.time.milliseconds();
            this.retrySensor.record(i, milliseconds);
            Sensor sensor = this.metrics.getSensor("topic." + str + ".record-retries");
            if (sensor != null) {
                sensor.record(i, milliseconds);
            }
        }

        public void recordErrors(String str, int i) {
            long milliseconds = this.time.milliseconds();
            this.errorSensor.record(i, milliseconds);
            Sensor sensor = this.metrics.getSensor("topic." + str + ".record-errors");
            if (sensor != null) {
                sensor.record(i, milliseconds);
            }
        }

        public void recordLatency(String str, long j) {
            long milliseconds = this.time.milliseconds();
            this.requestTimeSensor.record(j, milliseconds);
            if (str.isEmpty()) {
                return;
            }
            Sensor sensor = this.metrics.getSensor("node-" + str + ".latency");
            if (sensor != null) {
                sensor.record(j, milliseconds);
            }
        }

        void recordBatchSplit() {
            this.batchSplitSensor.record();
        }
    }

    public Sender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata producerMetadata, RecordAccumulator recordAccumulator, boolean z, int i, short s, int i2, SenderMetricsRegistry senderMetricsRegistry, Time time, int i3, long j, TransactionManager transactionManager, ApiVersions apiVersions) {
        this.log = logContext.logger(Sender.class);
        this.client = kafkaClient;
        this.accumulator = recordAccumulator;
        this.metadata = producerMetadata;
        this.guaranteeMessageOrder = z;
        this.maxRequestSize = i;
        this.acks = s;
        this.retries = i2;
        this.time = time;
        this.sensors = new SenderMetrics(senderMetricsRegistry, producerMetadata, kafkaClient, time);
        this.requestTimeoutMs = i3;
        this.retryBackoffMs = j;
        this.apiVersions = apiVersions;
        this.transactionManager = transactionManager;
    }

    public List<ProducerBatch> inFlightBatches(TopicPartition topicPartition) {
        return this.inFlightBatches.containsKey(topicPartition) ? this.inFlightBatches.get(topicPartition) : new ArrayList();
    }

    private void maybeRemoveFromInflightBatches(ProducerBatch producerBatch) {
        List<ProducerBatch> list = this.inFlightBatches.get(producerBatch.topicPartition);
        if (list != null) {
            list.remove(producerBatch);
            if (list.isEmpty()) {
                this.inFlightBatches.remove(producerBatch.topicPartition);
            }
        }
    }

    private void maybeRemoveAndDeallocateBatch(ProducerBatch producerBatch) {
        maybeRemoveFromInflightBatches(producerBatch);
        this.accumulator.deallocate(producerBatch);
    }

    private List<ProducerBatch> getExpiredInflightBatches(long j) {
        ArrayList arrayList = new ArrayList();
        Iterator<Map.Entry<TopicPartition, List<ProducerBatch>>> it = this.inFlightBatches.entrySet().iterator();
        while (it.hasNext()) {
            List<ProducerBatch> value = it.next().getValue();
            if (value != null) {
                Iterator<ProducerBatch> it2 = value.iterator();
                while (true) {
                    if (!it2.hasNext()) {
                        break;
                    }
                    ProducerBatch next = it2.next();
                    if (!next.hasReachedDeliveryTimeout(this.accumulator.getDeliveryTimeoutMs(), j)) {
                        this.accumulator.maybeUpdateNextBatchExpiryTime(next);
                        break;
                    }
                    it2.remove();
                    if (next.isDone()) {
                        throw new IllegalStateException(next.topicPartition + " batch created at " + next.createdMs + " gets unexpected final state " + next.finalState());
                    }
                    arrayList.add(next);
                }
                if (value.isEmpty()) {
                    it.remove();
                }
            }
        }
        return arrayList;
    }

    private void addToInflightBatches(List<ProducerBatch> list) {
        for (ProducerBatch producerBatch : list) {
            List<ProducerBatch> list2 = this.inFlightBatches.get(producerBatch.topicPartition);
            if (list2 == null) {
                list2 = new ArrayList();
                this.inFlightBatches.put(producerBatch.topicPartition, list2);
            }
            list2.add(producerBatch);
        }
    }

    public void addToInflightBatches(Map<Integer, List<ProducerBatch>> map) {
        Iterator<List<ProducerBatch>> it = map.values().iterator();
        while (it.hasNext()) {
            addToInflightBatches(it.next());
        }
    }

    private boolean hasPendingTransactionalRequests() {
        return this.transactionManager != null && this.transactionManager.hasPendingRequests() && this.transactionManager.hasOngoingTransaction();
    }

    @Override // java.lang.Runnable
    public void run() {
        this.log.debug("Starting Kafka producer I/O thread.");
        while (this.running) {
            try {
                runOnce();
            } catch (Exception e) {
                this.log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }
        this.log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
        while (!this.forceClose && (this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0 || hasPendingTransactionalRequests())) {
            try {
                runOnce();
            } catch (Exception e2) {
                this.log.error("Uncaught error in kafka producer I/O thread: ", e2);
            }
        }
        while (!this.forceClose && this.transactionManager != null && this.transactionManager.hasOngoingTransaction()) {
            if (!this.transactionManager.isCompleting()) {
                this.log.info("Aborting incomplete transaction due to shutdown");
                this.transactionManager.beginAbort();
            }
            try {
                runOnce();
            } catch (Exception e3) {
                this.log.error("Uncaught error in kafka producer I/O thread: ", e3);
            }
        }
        if (this.forceClose) {
            if (this.transactionManager != null) {
                this.log.debug("Aborting incomplete transactional requests due to forced shutdown");
                this.transactionManager.close();
            }
            this.log.debug("Aborting incomplete batches due to forced shutdown");
            this.accumulator.abortIncompleteBatches();
        }
        try {
            this.client.close();
        } catch (Exception e4) {
            this.log.error("Failed to close network client", e4);
        }
        this.log.debug("Shutdown of Kafka producer I/O thread has completed.");
    }

    void runOnce() {
        if (this.transactionManager != null) {
            try {
                this.transactionManager.maybeResolveSequences();
                if (this.transactionManager.hasFatalError()) {
                    RuntimeException lastError = this.transactionManager.lastError();
                    if (lastError != null) {
                        maybeAbortBatches(lastError);
                    }
                    this.client.poll(this.retryBackoffMs, this.time.milliseconds());
                    return;
                }
                this.transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();
                if (maybeSendAndPollTransactionalRequest()) {
                    return;
                }
            } catch (AuthenticationException e) {
                this.log.trace("Authentication exception while processing transactional request", e);
                this.transactionManager.authenticationFailed(e);
            }
        }
        long milliseconds = this.time.milliseconds();
        this.client.poll(sendProducerData(milliseconds), milliseconds);
    }

    private long sendProducerData(long j) {
        Cluster fetch = this.metadata.fetch();
        RecordAccumulator.ReadyCheckResult ready = this.accumulator.ready(fetch, j);
        if (!ready.unknownLeaderTopics.isEmpty()) {
            Iterator<String> it = ready.unknownLeaderTopics.iterator();
            while (it.hasNext()) {
                this.metadata.add(it.next(), j);
            }
            this.log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}", ready.unknownLeaderTopics);
            this.metadata.requestUpdate();
        }
        Iterator<Node> it2 = ready.readyNodes.iterator();
        long j2 = Long.MAX_VALUE;
        while (it2.hasNext()) {
            Node next = it2.next();
            if (!this.client.ready(next, j)) {
                it2.remove();
                j2 = Math.min(j2, this.client.pollDelayMs(next, j));
            }
        }
        Map<Integer, List<ProducerBatch>> drain = this.accumulator.drain(fetch, ready.readyNodes, this.maxRequestSize, j);
        addToInflightBatches(drain);
        if (this.guaranteeMessageOrder) {
            Iterator<List<ProducerBatch>> it3 = drain.values().iterator();
            while (it3.hasNext()) {
                Iterator<ProducerBatch> it4 = it3.next().iterator();
                while (it4.hasNext()) {
                    this.accumulator.mutePartition(it4.next().topicPartition);
                }
            }
        }
        this.accumulator.resetNextBatchExpiryTime();
        List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(j);
        List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(j);
        expiredBatches.addAll(expiredInflightBatches);
        if (!expiredBatches.isEmpty()) {
            this.log.trace("Expired {} batches in accumulator", Integer.valueOf(expiredBatches.size()));
        }
        for (ProducerBatch producerBatch : expiredBatches) {
            failBatch(producerBatch, -1L, -1L, new TimeoutException("Expiring " + producerBatch.recordCount + " record(s) for " + producerBatch.topicPartition + ":" + (j - producerBatch.createdMs) + " ms has passed since batch creation"), false);
            if (this.transactionManager != null && producerBatch.inRetry()) {
                this.transactionManager.markSequenceUnresolved(producerBatch);
            }
        }
        this.sensors.updateProduceRequestMetrics(drain);
        long max = Math.max(Math.min(Math.min(ready.nextReadyCheckDelayMs, j2), this.accumulator.nextExpiryTimeMs().longValue() - j), 0L);
        if (!ready.readyNodes.isEmpty()) {
            this.log.trace("Nodes with data ready to send: {}", ready.readyNodes);
            max = 0;
        }
        sendProduceRequests(drain, j);
        return max;
    }

    private boolean maybeSendAndPollTransactionalRequest() {
        if (this.transactionManager.hasInFlightRequest()) {
            this.client.poll(this.retryBackoffMs, this.time.milliseconds());
            return true;
        }
        if ((this.transactionManager.hasAbortableError() || this.transactionManager.isAborting()) && this.accumulator.hasIncomplete()) {
            RuntimeException lastError = this.transactionManager.lastError();
            if (lastError == null) {
                lastError = new KafkaException("Failing batch since transaction was aborted");
            }
            this.accumulator.abortUndrainedBatches(lastError);
        }
        if (this.transactionManager.isCompleting() && !this.accumulator.flushInProgress()) {
            this.accumulator.beginFlush();
        }
        TransactionManager.TxnRequestHandler nextRequest = this.transactionManager.nextRequest(this.accumulator.hasIncomplete());
        if (nextRequest == null) {
            return false;
        }
        AbstractRequest.Builder<?> requestBuilder = nextRequest.requestBuilder();
        try {
            Node awaitNodeReady = awaitNodeReady(nextRequest.coordinatorType());
            if (awaitNodeReady == null) {
                maybeFindCoordinatorAndRetry(nextRequest);
                return true;
            }
            if (nextRequest.isRetry()) {
                this.time.sleep(nextRequest.retryBackoffMs());
            }
            long milliseconds = this.time.milliseconds();
            ClientRequest newClientRequest = this.client.newClientRequest(awaitNodeReady.idString(), requestBuilder, milliseconds, true, this.requestTimeoutMs, nextRequest);
            this.log.debug("Sending transactional request {} to node {} with correlation ID {}", new Object[]{requestBuilder, awaitNodeReady, Integer.valueOf(newClientRequest.correlationId())});
            this.client.send(newClientRequest, milliseconds);
            this.transactionManager.setInFlightCorrelationId(newClientRequest.correlationId());
            this.client.poll(this.retryBackoffMs, this.time.milliseconds());
            return true;
        } catch (IOException e) {
            this.log.debug("Disconnect from {} while trying to send request {}. Going to back off and retry.", new Object[]{null, requestBuilder, e});
            maybeFindCoordinatorAndRetry(nextRequest);
            return true;
        }
    }

    private void maybeFindCoordinatorAndRetry(TransactionManager.TxnRequestHandler txnRequestHandler) {
        if (txnRequestHandler.needsCoordinator()) {
            this.transactionManager.lookupCoordinator(txnRequestHandler);
        } else {
            this.time.sleep(this.retryBackoffMs);
            this.metadata.requestUpdate();
        }
        this.transactionManager.retry(txnRequestHandler);
    }

    private void maybeAbortBatches(RuntimeException runtimeException) {
        if (this.accumulator.hasIncomplete()) {
            this.log.error("Aborting producer batches due to fatal error", runtimeException);
            this.accumulator.abortBatches(runtimeException);
        }
    }

    public void initiateClose() {
        this.accumulator.close();
        this.running = false;
        wakeup();
    }

    public void forceClose() {
        this.forceClose = true;
        initiateClose();
    }

    public boolean isRunning() {
        return this.running;
    }

    private Node awaitNodeReady(FindCoordinatorRequest.CoordinatorType coordinatorType) throws IOException {
        Node coordinator = coordinatorType != null ? this.transactionManager.coordinator(coordinatorType) : this.client.leastLoadedNode(this.time.milliseconds());
        if (coordinator == null || !NetworkClientUtils.awaitReady(this.client, coordinator, this.time, this.requestTimeoutMs)) {
            return null;
        }
        if (coordinatorType == FindCoordinatorRequest.CoordinatorType.TRANSACTION) {
            this.transactionManager.handleCoordinatorReady();
        }
        return coordinator;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleProduceResponse(ClientResponse clientResponse, Map<TopicPartition, ProducerBatch> map, long j) {
        RequestHeader requestHeader = clientResponse.requestHeader();
        long receivedTimeMs = clientResponse.receivedTimeMs();
        int correlationId = requestHeader.correlationId();
        if (clientResponse.wasDisconnected()) {
            this.log.trace("Cancelled request with header {} due to node {} being disconnected", requestHeader, clientResponse.destination());
            Iterator<ProducerBatch> it = map.values().iterator();
            while (it.hasNext()) {
                completeBatch(it.next(), new ProduceResponse.PartitionResponse(Errors.NETWORK_EXCEPTION), correlationId, j, 0L);
            }
            return;
        }
        if (clientResponse.versionMismatch() != null) {
            this.log.warn("Cancelled request {} due to a version mismatch with node {}", new Object[]{clientResponse, clientResponse.destination(), clientResponse.versionMismatch()});
            Iterator<ProducerBatch> it2 = map.values().iterator();
            while (it2.hasNext()) {
                completeBatch(it2.next(), new ProduceResponse.PartitionResponse(Errors.UNSUPPORTED_VERSION), correlationId, j, 0L);
            }
            return;
        }
        this.log.trace("Received produce response from node {} with correlation id {}", clientResponse.destination(), Integer.valueOf(correlationId));
        if (!clientResponse.hasResponse()) {
            Iterator<ProducerBatch> it3 = map.values().iterator();
            while (it3.hasNext()) {
                completeBatch(it3.next(), new ProduceResponse.PartitionResponse(Errors.NONE), correlationId, j, 0L);
            }
            return;
        }
        for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : ((ProduceResponse) clientResponse.responseBody()).responses().entrySet()) {
            completeBatch(map.get(entry.getKey()), entry.getValue(), correlationId, j, receivedTimeMs + r0.throttleTimeMs());
        }
        this.sensors.recordLatency(clientResponse.destination(), clientResponse.requestLatencyMs());
    }

    private void completeBatch(ProducerBatch producerBatch, ProduceResponse.PartitionResponse partitionResponse, long j, long j2, long j3) {
        Errors errors = partitionResponse.error;
        if (errors == Errors.MESSAGE_TOO_LARGE && producerBatch.recordCount > 1 && !producerBatch.isDone() && (producerBatch.magic() >= 2 || producerBatch.isCompressed())) {
            this.log.warn("Got error produce response in correlation id {} on topic-partition {}, splitting and retrying ({} attempts left). Error: {}", new Object[]{Long.valueOf(j), producerBatch.topicPartition, Integer.valueOf(this.retries - producerBatch.attempts()), errors});
            if (this.transactionManager != null) {
                this.transactionManager.removeInFlightBatch(producerBatch);
            }
            this.accumulator.splitAndReenqueue(producerBatch);
            maybeRemoveAndDeallocateBatch(producerBatch);
            this.sensors.recordBatchSplit();
        } else if (errors != Errors.NONE) {
            if (canRetry(producerBatch, partitionResponse, j2)) {
                this.log.warn("Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}", new Object[]{Long.valueOf(j), producerBatch.topicPartition, Integer.valueOf((this.retries - producerBatch.attempts()) - 1), errors});
                reenqueueBatch(producerBatch, j2);
            } else if (errors == Errors.DUPLICATE_SEQUENCE_NUMBER) {
                completeBatch(producerBatch, partitionResponse);
            } else {
                failBatch(producerBatch, partitionResponse, errors == Errors.TOPIC_AUTHORIZATION_FAILED ? new TopicAuthorizationException((Set<String>) Collections.singleton(producerBatch.topicPartition.topic())) : errors == Errors.CLUSTER_AUTHORIZATION_FAILED ? new ClusterAuthorizationException("The producer is not authorized to do idempotent sends") : errors.exception(), producerBatch.attempts() < this.retries);
            }
            if (errors.exception() instanceof InvalidMetadataException) {
                if (errors.exception() instanceof UnknownTopicOrPartitionException) {
                    this.log.warn("Received unknown topic or partition error in produce request on partition {}. The topic-partition may not exist or the user may not have Describe access to it", producerBatch.topicPartition);
                } else {
                    this.log.warn("Received invalid metadata error in produce request on partition {} due to {}. Going to request metadata update now", producerBatch.topicPartition, errors.exception().toString());
                }
                this.metadata.requestUpdate();
            }
        } else {
            completeBatch(producerBatch, partitionResponse);
        }
        if (this.guaranteeMessageOrder) {
            this.accumulator.unmutePartition(producerBatch.topicPartition, j3);
        }
    }

    private void reenqueueBatch(ProducerBatch producerBatch, long j) {
        this.accumulator.reenqueue(producerBatch, j);
        maybeRemoveFromInflightBatches(producerBatch);
        this.sensors.recordRetries(producerBatch.topicPartition.topic(), producerBatch.recordCount);
    }

    private void completeBatch(ProducerBatch producerBatch, ProduceResponse.PartitionResponse partitionResponse) {
        if (this.transactionManager != null) {
            this.transactionManager.handleCompletedBatch(producerBatch, partitionResponse);
        }
        if (producerBatch.done(partitionResponse.baseOffset, partitionResponse.logAppendTime, null)) {
            maybeRemoveAndDeallocateBatch(producerBatch);
        }
    }

    private void failBatch(ProducerBatch producerBatch, ProduceResponse.PartitionResponse partitionResponse, RuntimeException runtimeException, boolean z) {
        failBatch(producerBatch, partitionResponse.baseOffset, partitionResponse.logAppendTime, runtimeException, z);
    }

    private void failBatch(ProducerBatch producerBatch, long j, long j2, RuntimeException runtimeException, boolean z) {
        if (this.transactionManager != null) {
            this.transactionManager.handleFailedBatch(producerBatch, runtimeException, z);
        }
        this.sensors.recordErrors(producerBatch.topicPartition.topic(), producerBatch.recordCount);
        if (producerBatch.done(j, j2, runtimeException)) {
            maybeRemoveAndDeallocateBatch(producerBatch);
        }
    }

    private boolean canRetry(ProducerBatch producerBatch, ProduceResponse.PartitionResponse partitionResponse, long j) {
        return !producerBatch.hasReachedDeliveryTimeout(this.accumulator.getDeliveryTimeoutMs(), j) && producerBatch.attempts() < this.retries && !producerBatch.isDone() && (this.transactionManager != null ? this.transactionManager.canRetry(partitionResponse, producerBatch) : (partitionResponse.error.exception() instanceof RetriableException));
    }

    private void sendProduceRequests(Map<Integer, List<ProducerBatch>> map, long j) {
        for (Map.Entry<Integer, List<ProducerBatch>> entry : map.entrySet()) {
            sendProduceRequest(j, entry.getKey().intValue(), this.acks, this.requestTimeoutMs, entry.getValue());
        }
    }

    private void sendProduceRequest(long j, int i, short s, int i2, List<ProducerBatch> list) {
        if (list.isEmpty()) {
            return;
        }
        HashMap hashMap = new HashMap(list.size());
        final HashMap hashMap2 = new HashMap(list.size());
        byte maxUsableProduceMagic = this.apiVersions.maxUsableProduceMagic();
        for (ProducerBatch producerBatch : list) {
            if (producerBatch.magic() < maxUsableProduceMagic) {
                maxUsableProduceMagic = producerBatch.magic();
            }
        }
        for (ProducerBatch producerBatch2 : list) {
            TopicPartition topicPartition = producerBatch2.topicPartition;
            MemoryRecords records = producerBatch2.records();
            if (!records.hasMatchingMagic(maxUsableProduceMagic)) {
                records = producerBatch2.records().downConvert(maxUsableProduceMagic, 0L, this.time).records();
            }
            hashMap.put(topicPartition, records);
            hashMap2.put(topicPartition, producerBatch2);
        }
        String str = null;
        if (this.transactionManager != null && this.transactionManager.isTransactional()) {
            str = this.transactionManager.transactionalId();
        }
        ProduceRequest.Builder forMagic = ProduceRequest.Builder.forMagic(maxUsableProduceMagic, s, i2, hashMap, str);
        RequestCompletionHandler requestCompletionHandler = new RequestCompletionHandler() { // from class: org.apache.kafka.clients.producer.internals.Sender.1
            @Override // org.apache.kafka.clients.RequestCompletionHandler
            public void onComplete(ClientResponse clientResponse) {
                Sender.this.handleProduceResponse(clientResponse, hashMap2, Sender.this.time.milliseconds());
            }
        };
        String num = Integer.toString(i);
        this.client.send(this.client.newClientRequest(num, forMagic, j, s != 0, this.requestTimeoutMs, requestCompletionHandler), j);
        this.log.trace("Sent produce request to {}: {}", num, forMagic);
    }

    public void wakeup() {
        this.client.wakeup();
    }

    public static Sensor throttleTimeSensor(SenderMetricsRegistry senderMetricsRegistry) {
        Sensor sensor = senderMetricsRegistry.sensor("produce-throttle-time");
        sensor.add(senderMetricsRegistry.produceThrottleTimeAvg, new Avg());
        sensor.add(senderMetricsRegistry.produceThrottleTimeMax, new Max());
        return sensor;
    }
}
