package org.apache.kafka.clients.producer.internals;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.producer.internals.RecordAccumulator;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidMetadataException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.requests.RequestSend;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/clients/producer/internals/Sender.class */
public class Sender implements Runnable {
    // Logger shared by the sender and its metrics helper.
    private static final Logger log = LoggerFactory.getLogger((Class<?>) Sender.class);
    // Network client used to check node readiness, poll for responses, wake up and close.
    private final KafkaClient client;
    // Source of record batches: queried for ready nodes, drained, and given back batches to re-enqueue or deallocate.
    private final RecordAccumulator accumulator;
    // Cluster metadata; fetched on every iteration and asked to refresh on unknown leaders or invalid-metadata errors.
    private final Metadata metadata;
    // Size limit handed to RecordAccumulator.drain when collecting batches.
    private final int maxRequestSize;
    // Acknowledgement setting written into each ProduceRequest; a value of 0 makes the ClientRequest flag (acks != 0) false.
    private final short acks;
    // Timeout value written into each ProduceRequest.
    private final int requestTimeout;
    // Upper bound compared against RecordBatch.attempts when deciding whether a failed batch may be retried.
    private final int retries;
    // Clock used for loop timestamps and metric recording.
    private final Time time;
    // Cleared by initiateClose(); read by run() to leave the main loop. Volatile so the change is visible across threads.
    private volatile boolean running = true;
    // Metrics helper; created last in the constructor because it reads clientId.
    private final SenderMetrics sensors;
    // Value of the "client-id" tag attached to every metric registered by SenderMetrics.
    private String clientId;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/clients/producer/internals/Sender$SenderMetrics.class */
    public class SenderMetrics {
        // Registry in which all sensors and gauges below are created and looked up.
        private final Metrics metrics;
        // Records the number of records in each retried batch (see recordRetries).
        public final Sensor retrySensor;
        // Records the number of records in each failed batch (see recordErrors).
        public final Sensor errorSensor;
        // Records drainedMs - createdMs for every batch sent.
        public final Sensor queueTimeSensor;
        // Records the request latency reported by the client response (see recordLatency).
        public final Sensor requestTimeSensor;
        // Records the total record count of each produce request that carries batches.
        public final Sensor recordsPerRequestSensor;
        // Records the size in bytes of every batch sent.
        public final Sensor batchSizeSensor;
        // Records the compression rate of every batch sent.
        public final Sensor compressionRateSensor;
        // Records the maxRecordSize value of every batch sent.
        public final Sensor maxRecordSizeSensor;

        /**
         * Registers every producer-level sensor and gauge in the given registry.
         *
         * <p>All metrics belong to the {@code producer-metrics} group and carry a single
         * {@code client-id} tag taken from the enclosing sender. Two gauges are added on top
         * of the sensors: the client's in-flight request count and the age, in seconds, of
         * the metadata since its last update.
         *
         * @param metrics registry that receives the sensors and gauges
         */
        public SenderMetrics(Metrics metrics) {
            this.metrics = metrics;

            final String group = "producer-metrics";
            final Map<String, String> tags = new LinkedHashMap<String, String>();
            tags.put("client-id", Sender.this.clientId);

            // Per-batch size.
            this.batchSizeSensor = metrics.sensor("batch-size");
            this.batchSizeSensor.add(
                    new MetricName("batch-size-avg", group, "The average number of bytes sent per partition per-request.", tags),
                    new Avg());
            this.batchSizeSensor.add(
                    new MetricName("batch-size-max", group, "The max number of bytes sent per partition per-request.", tags),
                    new Max());

            // Per-batch compression rate.
            this.compressionRateSensor = metrics.sensor("compression-rate");
            this.compressionRateSensor.add(
                    new MetricName("compression-rate-avg", group, "The average compression rate of record batches.", tags),
                    new Avg());

            // Time batches waited in the accumulator.
            this.queueTimeSensor = metrics.sensor("queue-time");
            this.queueTimeSensor.add(
                    new MetricName("record-queue-time-avg", group, "The average time in ms record batches spent in the record accumulator.", tags),
                    new Avg());
            this.queueTimeSensor.add(
                    new MetricName("record-queue-time-max", group, "The maximum time in ms record batches spent in the record accumulator.", tags),
                    new Max());

            // Request latency.
            this.requestTimeSensor = metrics.sensor("request-time");
            this.requestTimeSensor.add(
                    new MetricName("request-latency-avg", group, "The average request latency in ms", tags),
                    new Avg());
            this.requestTimeSensor.add(
                    new MetricName("request-latency-max", group, "The maximum request latency in ms", tags),
                    new Max());

            // Records per request.
            this.recordsPerRequestSensor = metrics.sensor("records-per-request");
            this.recordsPerRequestSensor.add(
                    new MetricName("record-send-rate", group, "The average number of records sent per second.", tags),
                    new Rate());
            this.recordsPerRequestSensor.add(
                    new MetricName("records-per-request-avg", group, "The average number of records per request.", tags),
                    new Avg());

            // Retries and errors.
            this.retrySensor = metrics.sensor("record-retries");
            this.retrySensor.add(
                    new MetricName("record-retry-rate", group, "The average per-second number of retried record sends", tags),
                    new Rate());
            this.errorSensor = metrics.sensor("errors");
            this.errorSensor.add(
                    new MetricName("record-error-rate", group, "The average per-second number of record sends that resulted in errors", tags),
                    new Rate());

            // Record size.
            this.maxRecordSizeSensor = metrics.sensor("record-size-max");
            this.maxRecordSizeSensor.add(
                    new MetricName("record-size-max", group, "The maximum record size", tags),
                    new Max());
            this.maxRecordSizeSensor.add(
                    new MetricName("record-size-avg", group, "The average record size", tags),
                    new Avg());

            // Gauge: evaluated on demand, so it always reflects the client's current count.
            metrics.addMetric(
                    new MetricName("requests-in-flight", group, "The current number of in-flight requests awaiting a response.", tags),
                    new Measurable() {
                        @Override
                        public double measure(MetricConfig metricConfig, long j) {
                            return Sender.this.client.inFlightRequestCount();
                        }
                    });

            // Gauge: milliseconds since the last metadata update, converted to seconds.
            metrics.addMetric(
                    new MetricName("metadata-age", group, "The age in seconds of the current producer metadata being used.", tags),
                    new Measurable() {
                        @Override
                        public double measure(MetricConfig metricConfig, long j) {
                            return (j - Sender.this.metadata.lastUpdate()) / 1000.0d;
                        }
                    });
        }

        /**
         * Registers the per-topic sensors for a topic unless they already exist.
         *
         * <p>The presence of the {@code topic.<name>.records-per-batch} sensor is used as the
         * marker that the topic has been registered. All per-topic metrics belong to the
         * {@code producer-topic-metrics} group and are tagged with {@code client-id} and
         * {@code topic}.
         *
         * @param str topic name
         */
        public void maybeRegisterTopicMetrics(String str) {
            final String prefix = "topic." + str;
            final String recordsPerBatchName = prefix + ".records-per-batch";
            if (this.metrics.getSensor(recordsPerBatchName) != null) {
                // Already registered for this topic.
                return;
            }

            final String group = "producer-topic-metrics";
            final Map<String, String> tags = new LinkedHashMap<String, String>();
            tags.put("client-id", Sender.this.clientId);
            tags.put("topic", str);

            this.metrics.sensor(recordsPerBatchName)
                    .add(new MetricName("record-send-rate", group, tags), new Rate());
            this.metrics.sensor(prefix + ".bytes")
                    .add(new MetricName("byte-rate", group, tags), new Rate());
            this.metrics.sensor(prefix + ".compression-rate")
                    .add(new MetricName("compression-rate", group, tags), new Avg());
            this.metrics.sensor(prefix + ".record-retries")
                    .add(new MetricName("record-retry-rate", group, tags), new Rate());
            this.metrics.sensor(prefix + ".record-errors")
                    .add(new MetricName("record-error-rate", group, tags), new Rate());
        }

        /**
         * Records per-topic and producer-level metrics for the batches carried by the given requests.
         *
         * <p>Requests without an attachment are skipped. For every batch the per-topic sensors
         * are registered if needed and then updated, followed by the producer-level batch size,
         * queue time, compression rate and record size sensors. After all batches of a request
         * are processed, the request's total record count is recorded.
         *
         * @param list requests about to be sent; each attachment is expected to be the
         *             partition-to-batch map built by {@code produceRequest}
         */
        public void updateProduceRequestMetrics(List<ClientRequest> list) {
            final long now = Sender.this.time.milliseconds();
            for (ClientRequest request : list) {
                final Object attachment = request.attachment();
                if (attachment == null) {
                    continue;
                }
                // The attachment is declared as Object; the generic type is restored here so the
                // loop below is well-typed (iterating a raw Map's values yields Object, not RecordBatch).
                @SuppressWarnings("unchecked")
                final Map<TopicPartition, RecordBatch> batches = (Map<TopicPartition, RecordBatch>) attachment;

                int recordsInRequest = 0;
                for (RecordBatch batch : batches.values()) {
                    final String topic = batch.topicPartition.topic();
                    maybeRegisterTopicMetrics(topic);

                    // Per-topic sensors; they must exist after the registration call above.
                    Sensor topicRecordCount = Utils.notNull(this.metrics.getSensor("topic." + topic + ".records-per-batch"));
                    topicRecordCount.record(batch.recordCount);
                    Sensor topicByteRate = Utils.notNull(this.metrics.getSensor("topic." + topic + ".bytes"));
                    topicByteRate.record(batch.records.sizeInBytes());
                    Sensor topicCompressionRate = Utils.notNull(this.metrics.getSensor("topic." + topic + ".compression-rate"));
                    topicCompressionRate.record(batch.records.compressionRate());

                    // Producer-level sensors.
                    this.batchSizeSensor.record(batch.records.sizeInBytes(), now);
                    this.queueTimeSensor.record(batch.drainedMs - batch.createdMs, now);
                    this.compressionRateSensor.record(batch.records.compressionRate());
                    this.maxRecordSizeSensor.record(batch.maxRecordSize, now);
                    recordsInRequest += batch.recordCount;
                }
                this.recordsPerRequestSensor.record(recordsInRequest, now);
            }
        }

        /**
         * Records retried records on the producer-level retry sensor and, if it has been
         * registered, on the topic's own retry sensor.
         *
         * @param str topic name
         * @param i   number of records being retried
         */
        public void recordRetries(String str, int i) {
            final long now = Sender.this.time.milliseconds();
            this.retrySensor.record(i, now);

            final String topicSensorName = "topic." + str + ".record-retries";
            final Sensor topicRetrySensor = this.metrics.getSensor(topicSensorName);
            if (topicRetrySensor == null) {
                return;
            }
            topicRetrySensor.record(i, now);
        }

        /**
         * Records failed records on the producer-level error sensor and, if it has been
         * registered, on the topic's own error sensor.
         *
         * @param str topic name
         * @param i   number of records that failed
         */
        public void recordErrors(String str, int i) {
            final long now = Sender.this.time.milliseconds();
            this.errorSensor.record(i, now);

            final String topicSensorName = "topic." + str + ".record-errors";
            final Sensor topicErrorSensor = this.metrics.getSensor(topicSensorName);
            if (topicErrorSensor == null) {
                return;
            }
            topicErrorSensor.record(i, now);
        }

        /**
         * Records a request latency on the producer-level sensor and, for non-negative node
         * ids, on the {@code node-<id>.latency} sensor when one exists in the registry.
         *
         * @param i node id the request was sent to; negative ids skip the per-node sensor
         * @param j latency value to record
         */
        public void recordLatency(int i, long j) {
            final long now = Sender.this.time.milliseconds();
            this.requestTimeSensor.record(j, now);
            if (i < 0) {
                return;
            }
            final Sensor nodeLatencySensor = this.metrics.getSensor("node-" + i + ".latency");
            if (nodeLatencySensor == null) {
                return;
            }
            nodeLatencySensor.record(j, now);
        }
    }

    /**
     * Creates a sender bound to the given client, metadata and accumulator.
     *
     * @param kafkaClient       network client used for all I/O
     * @param metadata          cluster metadata
     * @param recordAccumulator accumulator the batches are drained from
     * @param i                 maximum request size, passed to the accumulator when draining
     * @param s                 acks value placed in every produce request
     * @param i2                retry limit for failed batches
     * @param i3                request timeout placed in every produce request
     * @param metrics           registry in which the sender's metrics are registered
     * @param time              clock
     * @param str               client id used as the {@code client-id} metric tag
     */
    public Sender(KafkaClient kafkaClient, Metadata metadata, RecordAccumulator recordAccumulator, int i, short s, int i2, int i3, Metrics metrics, Time time, String str) {
        // Collaborators.
        this.client = kafkaClient;
        this.metadata = metadata;
        this.accumulator = recordAccumulator;
        this.time = time;

        // Request settings.
        this.maxRequestSize = i;
        this.acks = s;
        this.retries = i2;
        this.requestTimeout = i3;

        // The metrics helper reads clientId while it is constructed, so assign the id first.
        this.clientId = str;
        this.sensors = new SenderMetrics(metrics);
    }

    /**
     * Main loop of the I/O thread.
     *
     * <p>Runs one iteration after another while the sender is marked as running. After
     * {@link #initiateClose()} clears the flag, iterations continue until the accumulator
     * has nothing unsent and the client has no in-flight requests; then the client is
     * closed. Exceptions thrown by an iteration are logged and the loop carries on.
     */
    @Override
    public void run() {
        log.debug("Starting Kafka producer I/O thread.");

        // Normal operation.
        while (this.running) {
            try {
                run(this.time.milliseconds());
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }

        log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");

        // Shutdown: keep going while anything is still queued or awaiting a response.
        while (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0) {
            try {
                run(this.time.milliseconds());
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }

        this.client.close();
        log.debug("Shutdown of Kafka producer I/O thread has completed.");
    }

    /**
     * Runs a single iteration of sending.
     *
     * <p>Steps, in order: fetch the cluster and ask the accumulator which nodes have data
     * ready; request a metadata update if some leaders are unknown; drop nodes the client
     * is not ready to send to; drain batches for the remaining nodes and turn them into
     * produce requests; update metrics; poll the client and dispatch every response to the
     * disconnect or response handler.
     *
     * @param j current time, passed through to the accumulator, client and handlers
     */
    public void run(long j) {
        final Cluster cluster = this.metadata.fetch();
        final RecordAccumulator.ReadyCheckResult readiness = this.accumulator.ready(cluster, j);
        if (readiness.unknownLeadersExist) {
            this.metadata.requestUpdate();
        }

        // Remove nodes the client cannot send to yet, tracking the shortest connection delay among them.
        long notReadyDelayMs = Long.MAX_VALUE;
        for (Iterator<Node> nodes = readiness.readyNodes.iterator(); nodes.hasNext(); ) {
            final Node node = nodes.next();
            if (this.client.ready(node, j)) {
                continue;
            }
            nodes.remove();
            notReadyDelayMs = Math.min(notReadyDelayMs, this.client.connectionDelay(node, j));
        }

        final Map<Integer, List<RecordBatch>> drained =
                this.accumulator.drain(cluster, readiness.readyNodes, this.maxRequestSize, j);
        final List<ClientRequest> requests = createProduceRequests(drained, j);
        this.sensors.updateProduceRequestMetrics(requests);

        // Wait no longer than the next ready check or the earliest reconnect;
        // do not wait at all when there is something to send right now.
        long pollTimeoutMs = Math.min(readiness.nextReadyCheckDelayMs, notReadyDelayMs);
        if (!readiness.readyNodes.isEmpty()) {
            log.trace("Nodes with data ready to send: {}", readiness.readyNodes);
            log.trace("Created {} produce requests: {}", Integer.valueOf(requests.size()), requests);
            pollTimeoutMs = 0;
        }

        final List<ClientResponse> responses = this.client.poll(requests, pollTimeoutMs, j);
        for (ClientResponse response : responses) {
            if (response.wasDisconnected()) {
                handleDisconnect(response, j);
            } else {
                handleResponse(response, j);
            }
        }
    }

    /**
     * Starts shutting the sender down: clears the running flag, closes the accumulator and
     * wakes the client so the I/O loop notices the change.
     */
    public void initiateClose() {
        running = false;
        accumulator.close();
        wakeup();
    }

    /**
     * Completes every batch of a request whose connection was lost with
     * {@link Errors#NETWORK_EXCEPTION} and an offset of -1.
     *
     * @param clientResponse response object describing the cancelled request
     * @param j              current time, forwarded to {@code completeBatch}
     */
    private void handleDisconnect(ClientResponse clientResponse, long j) {
        log.trace("Cancelled request {} due to node {} being disconnected", clientResponse, Integer.valueOf(clientResponse.request().request().destination()));
        final int correlationId = clientResponse.request().request().header().correlationId();

        // The attachment is the partition-to-batch map stored when the request was built.
        @SuppressWarnings("unchecked")
        final Map<TopicPartition, RecordBatch> batches =
                (Map<TopicPartition, RecordBatch>) clientResponse.request().attachment();
        for (RecordBatch batch : batches.values()) {
            completeBatch(batch, Errors.NETWORK_EXCEPTION, -1L, correlationId, j);
        }
    }

    /**
     * Handles a produce response that did not end in a disconnect.
     *
     * <p>When the client response carries no body, every batch of the request is completed
     * with {@link Errors#NONE} and an offset of -1. Otherwise each partition entry of the
     * response completes the matching batch with the reported error and base offset, and
     * the request latency is recorded.
     *
     * <p>A partition in the response that has no batch in the request is skipped with a
     * warning. Without that check the null batch would raise a NullPointerException inside
     * {@code completeBatch}, aborting the loop and leaving the remaining batches of this
     * response uncompleted.
     *
     * @param clientResponse response received from the client
     * @param j              current time, forwarded to {@code completeBatch}
     */
    private void handleResponse(ClientResponse clientResponse, long j) {
        final int correlationId = clientResponse.request().request().header().correlationId();
        log.trace("Received produce response from node {} with correlation id {}", Integer.valueOf(clientResponse.request().request().destination()), Integer.valueOf(correlationId));

        // The attachment is the partition-to-batch map stored when the request was built.
        @SuppressWarnings("unchecked")
        final Map<TopicPartition, RecordBatch> batches =
                (Map<TopicPartition, RecordBatch>) clientResponse.request().attachment();

        if (!clientResponse.hasResponse()) {
            // No response body: complete everything as successful with an unknown offset.
            for (RecordBatch batch : batches.values()) {
                completeBatch(batch, Errors.NONE, -1L, correlationId, j);
            }
            return;
        }

        final ProduceResponse produceResponse = new ProduceResponse(clientResponse.responseBody());
        for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) {
            final TopicPartition partition = entry.getKey();
            final ProduceResponse.PartitionResponse partitionResponse = entry.getValue();
            final RecordBatch batch = batches.get(partition);
            if (batch == null) {
                log.warn("Ignoring produce response for topic-partition {} with correlation id {}: no batch was sent for it", partition, Integer.valueOf(correlationId));
                continue;
            }
            completeBatch(batch, Errors.forCode(partitionResponse.errorCode), partitionResponse.baseOffset, correlationId, j);
        }
        this.sensors.recordLatency(clientResponse.request().request().destination(), clientResponse.requestLatencyMs());
    }

    /**
     * Finishes or retries a batch depending on the outcome reported for it.
     *
     * <p>A failed batch that may still be retried is logged and re-enqueued, and the retry
     * is recorded in the metrics. In every other case the batch is completed with the
     * given offset and the error's exception, its memory is handed back to the
     * accumulator, and for a real error the failure is recorded in the metrics. Whatever
     * the branch, an error whose exception is an {@link InvalidMetadataException} triggers
     * a metadata update request.
     *
     * @param recordBatch batch the outcome refers to
     * @param errors      outcome; {@link Errors#NONE} on success
     * @param j           base offset passed to the batch on completion
     * @param j2          correlation id of the request, used for logging only
     * @param j3          current time, passed to the accumulator when re-enqueueing
     */
    private void completeBatch(RecordBatch recordBatch, Errors errors, long j, long j2, long j3) {
        final boolean failed = errors != Errors.NONE;
        final boolean retry = failed && canRetry(recordBatch, errors);

        if (retry) {
            log.warn("Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}", Long.valueOf(j2), recordBatch.topicPartition, Integer.valueOf((this.retries - recordBatch.attempts) - 1), errors);
            this.accumulator.reenqueue(recordBatch, j3);
            this.sensors.recordRetries(recordBatch.topicPartition.topic(), recordBatch.recordCount);
        } else {
            recordBatch.done(j, errors.exception());
            this.accumulator.deallocate(recordBatch);
            if (failed) {
                this.sensors.recordErrors(recordBatch.topicPartition.topic(), recordBatch.recordCount);
            }
        }

        if (errors.exception() instanceof InvalidMetadataException) {
            this.metadata.requestUpdate();
        }
    }

    /**
     * Tells whether a failed batch may be sent again.
     *
     * @param recordBatch batch that failed
     * @param errors      error reported for the batch
     * @return {@code true} when the batch has used fewer attempts than the retry limit and
     *         the error's exception is a {@link RetriableException}
     */
    private boolean canRetry(RecordBatch recordBatch, Errors errors) {
        if (recordBatch.attempts >= this.retries) {
            return false;
        }
        return errors.exception() instanceof RetriableException;
    }

    /**
     * Builds one produce request per destination node.
     *
     * @param map batches grouped by the id of the node they are sent to
     * @param j   current time, used as the creation time of each request
     * @return one request per map entry, in the map's iteration order
     */
    private List<ClientRequest> createProduceRequests(Map<Integer, List<RecordBatch>> map, long j) {
        final List<ClientRequest> requests = new ArrayList<ClientRequest>(map.size());
        for (Map.Entry<Integer, List<RecordBatch>> nodeBatches : map.entrySet()) {
            final int nodeId = nodeBatches.getKey().intValue();
            final List<RecordBatch> batches = nodeBatches.getValue();
            requests.add(produceRequest(j, nodeId, this.acks, this.requestTimeout, batches));
        }
        return requests;
    }

    /**
     * Builds a produce request for one node from a list of batches.
     *
     * <p>Each batch contributes its flipped record buffer to the request payload, keyed by
     * topic-partition. The batches themselves are stored under the same keys in a second
     * map that becomes the request's attachment, so they can be found again when the
     * response arrives.
     *
     * @param j    creation time of the request
     * @param i    id of the destination node
     * @param s    acks value; the boolean flag of the client request is {@code s != 0}
     * @param i2   timeout placed in the produce request
     * @param list batches to send to the node
     * @return the client request wrapping the produce request
     */
    private ClientRequest produceRequest(long j, int i, short s, int i2, List<RecordBatch> list) {
        final Map<TopicPartition, ByteBuffer> recordsByPartition = new HashMap<TopicPartition, ByteBuffer>(list.size());
        final Map<TopicPartition, RecordBatch> batchesByPartition = new HashMap<TopicPartition, RecordBatch>(list.size());
        for (RecordBatch batch : list) {
            final TopicPartition partition = batch.topicPartition;
            final ByteBuffer payload = batch.records.buffer();
            payload.flip();
            recordsByPartition.put(partition, payload);
            batchesByPartition.put(partition, batch);
        }

        final RequestSend send = new RequestSend(
                i,
                this.client.nextRequestHeader(ApiKeys.PRODUCE),
                new ProduceRequest(s, i2, recordsByPartition).toStruct());
        final boolean flag = s != 0;
        return new ClientRequest(j, flag, send, batchesByPartition);
    }

    /**
     * Wakes up the underlying client by delegating to its {@code wakeup()} method.
     */
    public void wakeup() {
        client.wakeup();
    }
}
