package org.apache.kafka.clients.consumer.internals;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidMetadataException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.LogEntry;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.ListOffsetRequest;
import org.apache.kafka.common.requests.ListOffsetResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/clients/consumer/internals/Fetcher.class */
public class Fetcher<K, V> {
    /** Logger shared by every instance of this class. */
    private static final Logger log = LoggerFactory.getLogger(Fetcher.class);

    // Collaborators, assigned once by the constructor.
    private final ConsumerNetworkClient client;
    private final Time time;
    private final Metadata metadata;
    private final SubscriptionState subscriptions;
    private final FetchManagerMetrics sensors;
    private final Deserializer<K> keyDeserializer;
    private final Deserializer<V> valueDeserializer;

    // Fetch configuration, assigned once by the constructor.
    private final int minBytes;
    private final int maxWaitMs;
    private final int fetchSize;
    private final long retryBackoffMs;
    private final boolean checkCrcs;

    /** Parsed records added by handleFetchResponse and drained by fetchedRecords. */
    private final List<PartitionRecords<K, V>> records = new LinkedList<>();

    /** Partition to requested offset, for fetches the broker rejected as out of range. */
    private final Map<TopicPartition, Long> offsetOutOfRangePartitions = new HashMap<>();

    /** Topics for which a fetch returned an authorization failure. */
    private final Set<String> unauthorizedTopics = new HashSet<>();

    /** Partition to requested offset, for fetches that returned bytes but no complete record. */
    private final Map<TopicPartition, Long> recordTooLargePartitions = new HashMap<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/Fetcher$FetchManagerMetrics.class */
    /**
     * Sensors that the enclosing fetcher records into: bytes and records fetched,
     * fetch latency, record lag and fetch throttle time.
     */
    public class FetchManagerMetrics {
        public final Metrics metrics;
        public final String metricGrpName;
        public final Sensor bytesFetched;
        public final Sensor recordsFetched;
        public final Sensor fetchLatency;
        public final Sensor recordsFetchLag;
        public final Sensor fetchThrottleTimeSensor;

        /**
         * Creates the five fetch sensors and registers their metrics.
         *
         * @param metrics registry the sensors are created in
         * @param str     prefix of the metric group; "-fetch-manager-metrics" is appended to it
         * @param map     passed as the last argument of every {@link MetricName} created here
         */
        public FetchManagerMetrics(Metrics metrics, String str, Map<String, String> map) {
            final String group = str + "-fetch-manager-metrics";
            this.metrics = metrics;
            this.metricGrpName = group;

            final Sensor bytes = metrics.sensor("bytes-fetched");
            bytes.add(new MetricName("fetch-size-avg", group, "The average number of bytes fetched per request", map), new Avg());
            bytes.add(new MetricName("fetch-size-max", group, "The maximum number of bytes fetched per request", map), new Max());
            bytes.add(new MetricName("bytes-consumed-rate", group, "The average number of bytes consumed per second", map), new Rate());
            this.bytesFetched = bytes;

            final Sensor recordCount = metrics.sensor("records-fetched");
            recordCount.add(new MetricName("records-per-request-avg", group, "The average number of records in each request", map), new Avg());
            recordCount.add(new MetricName("records-consumed-rate", group, "The average number of records consumed per second", map), new Rate());
            this.recordsFetched = recordCount;

            final Sensor latency = metrics.sensor("fetch-latency");
            latency.add(new MetricName("fetch-latency-avg", group, "The average time taken for a fetch request.", map), new Avg());
            latency.add(new MetricName("fetch-latency-max", group, "The max time taken for any fetch request.", map), new Max());
            latency.add(new MetricName("fetch-rate", group, "The number of fetch requests per second.", map), new Rate(new Count()));
            this.fetchLatency = latency;

            final Sensor lag = metrics.sensor("records-lag");
            lag.add(new MetricName("records-lag-max", group, "The maximum lag in terms of number of records for any partition in this window", map), new Max());
            this.recordsFetchLag = lag;

            final Sensor throttle = metrics.sensor("fetch-throttle-time");
            throttle.add(new MetricName("fetch-throttle-time-avg", group, "The average throttle time in ms", map), new Avg());
            throttle.add(new MetricName("fetch-throttle-time-max", group, "The maximum throttle time in ms", map), new Max());
            this.fetchThrottleTimeSensor = throttle;
        }

        /**
         * Records per-topic byte and record counts into the sensors named
         * {@code topic.<topic>.bytes-fetched} and {@code topic.<topic>.records-fetched}.
         *
         * @param str topic name
         * @param i   number of bytes fetched for the topic
         * @param i2  number of records fetched for the topic
         */
        public void recordTopicFetchMetrics(String str, int i, int i2) {
            final String prefix = "topic." + str;
            topicSensor(prefix + ".bytes-fetched").record(i);
            topicSensor(prefix + ".records-fetched").record(i2);
        }

        /**
         * Looks up the sensor with the given name, creating it when it does not exist yet.
         * NOTE(review): no metric is added to sensors created here - confirm that is intended.
         */
        private Sensor topicSensor(String sensorName) {
            final Sensor existing = this.metrics.getSensor(sensorName);
            return existing != null ? existing : this.metrics.sensor(sensorName);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/Fetcher$PartitionRecords.class */
    /**
     * A batch of parsed records for one partition, together with the offset
     * the fetch that produced them was issued at.
     */
    public static class PartitionRecords<K, V> {
        /** Offset the originating fetch request asked for. */
        public long fetchOffset;
        /** Partition the records belong to. */
        public TopicPartition partition;
        /** The parsed records. */
        public List<ConsumerRecord<K, V>> records;

        /**
         * @param j              fetch offset of the originating request
         * @param topicPartition partition the records were read from
         * @param list           the parsed records
         */
        public PartitionRecords(long j, TopicPartition topicPartition, List<ConsumerRecord<K, V>> list) {
            this.partition = topicPartition;
            this.fetchOffset = j;
            this.records = list;
        }
    }

    /**
     * Creates a fetcher from its collaborators and fetch configuration.
     *
     * @param consumerNetworkClient client used to send requests and poll for responses
     * @param i                     stored as {@code minBytes}
     * @param i2                    stored as {@code maxWaitMs}
     * @param i3                    stored as {@code fetchSize}
     * @param z                     stored as {@code checkCrcs}; when true each record is validated while parsing
     * @param deserializer          key deserializer
     * @param deserializer2         value deserializer
     * @param metadata              cluster metadata
     * @param subscriptionState     subscription and position state
     * @param metrics               registry handed to {@link FetchManagerMetrics}
     * @param str                   metric group prefix handed to {@link FetchManagerMetrics}
     * @param map                   metric tag map handed to {@link FetchManagerMetrics}
     * @param time                  clock used for timing and sleeping
     * @param j                     stored as {@code retryBackoffMs}
     */
    public Fetcher(ConsumerNetworkClient consumerNetworkClient, int i, int i2, int i3, boolean z, Deserializer<K> deserializer, Deserializer<V> deserializer2, Metadata metadata, SubscriptionState subscriptionState, Metrics metrics, String str, Map<String, String> map, Time time, long j) {
        // Collaborators.
        this.client = consumerNetworkClient;
        this.time = time;
        this.metadata = metadata;
        this.subscriptions = subscriptionState;
        this.keyDeserializer = deserializer;
        this.valueDeserializer = deserializer2;

        // Fetch configuration.
        this.minBytes = i;
        this.maxWaitMs = i2;
        this.fetchSize = i3;
        this.checkCrcs = z;
        this.retryBackoffMs = j;

        this.sensors = new FetchManagerMetrics(metrics, str, map);
    }

    /**
     * Builds one fetch request per node via {@code createFetchRequests} and sends each,
     * attaching a listener that hands successful responses to {@code handleFetchResponse}.
     * Failures are only logged at debug level.
     *
     * @param cluster cluster view used to look up partition leaders
     */
    public void initFetches(Cluster cluster) {
        final Map<Node, FetchRequest> requestsByNode = createFetchRequests(cluster);
        for (Map.Entry<Node, FetchRequest> nodeAndRequest : requestsByNode.entrySet()) {
            final FetchRequest request = nodeAndRequest.getValue();
            final RequestFuture<ClientResponse> pending = this.client.send(nodeAndRequest.getKey(), ApiKeys.FETCH, request);
            pending.addListener(new RequestFutureListener<ClientResponse>() {
                @Override
                public void onSuccess(ClientResponse response) {
                    Fetcher.this.handleFetchResponse(response, request);
                }

                @Override
                public void onFailure(RuntimeException cause) {
                    Fetcher.log.debug("Fetch failed", cause);
                }
            });
        }
    }

    /**
     * Establishes a fetch position for every given partition that is assigned but not
     * yet fetchable: resets the offset when a reset is pending or no committed offset
     * exists, otherwise seeks to the committed offset.
     *
     * @param set partitions to examine
     */
    public void updateFetchPositions(Set<TopicPartition> set) {
        for (TopicPartition tp : set) {
            // Only assigned partitions that cannot be fetched yet need work.
            if (!this.subscriptions.isAssigned(tp) || this.subscriptions.isFetchable(tp)) {
                continue;
            }

            if (this.subscriptions.isOffsetResetNeeded(tp)) {
                resetOffset(tp);
                continue;
            }

            if (this.subscriptions.committed(tp) == null) {
                // No committed offset: flag the partition for reset, then perform it.
                this.subscriptions.needOffsetReset(tp);
                resetOffset(tp);
                continue;
            }

            final long committedOffset = this.subscriptions.committed(tp).offset();
            log.debug("Resetting offset for partition {} to the committed offset {}", tp, committedOffset);
            this.subscriptions.seek(tp, committedOffset);
        }
    }

    /**
     * Fetches topic metadata without restricting the topic list.
     *
     * @param j timeout in milliseconds, forwarded to {@code getTopicMetadata}
     * @return topic name to partition info, as returned by {@code getTopicMetadata}
     */
    public Map<String, List<PartitionInfo>> getAllTopicMetadata(long j) {
        // A null list is forwarded as-is; sendMetadataRequest turns it into an empty topic list.
        final List<String> noTopicFilter = null;
        return getTopicMetadata(noTopicFilter, j);
    }

    /**
     * Requests topic metadata, retrying with backoff until it succeeds or the timeout elapses.
     *
     * @param list topics to request; {@code null} is forwarded to {@code sendMetadataRequest},
     *             and a non-null empty list returns an empty map without sending anything
     * @param j    timeout in milliseconds
     * @return topic name to the available partitions of that topic
     * @throws TopicAuthorizationException if the response lists unauthorized topics
     * @throws InvalidTopicException       if the response reports an invalid topic
     * @throws KafkaException              if the response reports a non-retriable topic error
     * @throws TimeoutException            if no usable response arrives within the timeout
     */
    public Map<String, List<PartitionInfo>> getTopicMetadata(List<String> list, long j) {
        final boolean explicitlyEmpty = list != null && list.isEmpty();
        if (explicitlyEmpty) {
            return Collections.emptyMap();
        }

        final long startMs = this.time.milliseconds();
        long remainingMs = j;
        do {
            final RequestFuture<ClientResponse> future = sendMetadataRequest(list);
            this.client.poll(future, remainingMs);

            if (future.failed() && !future.isRetriable()) {
                throw future.exception();
            }

            if (future.succeeded()) {
                final MetadataResponse response = new MetadataResponse(future.value().responseBody());
                final Cluster cluster = response.cluster();

                final Set<String> denied = cluster.unauthorizedTopics();
                if (!denied.isEmpty()) {
                    throw new TopicAuthorizationException(denied);
                }

                if (!hasRetriableTopicError(response)) {
                    final Map<String, List<PartitionInfo>> partitionsByTopic = new HashMap<>();
                    for (String topic : cluster.topics()) {
                        partitionsByTopic.put(topic, cluster.availablePartitionsForTopic(topic));
                    }
                    return partitionsByTopic;
                }
            }

            // Recompute the budget from the wall clock, then spend part of it backing off.
            remainingMs = j - (this.time.milliseconds() - startMs);
            if (remainingMs > 0) {
                final long backoffMs = Math.min(remainingMs, this.retryBackoffMs);
                this.time.sleep(backoffMs);
                remainingMs -= backoffMs;
            }
        } while (remainingMs > 0);

        throw new TimeoutException("Timeout expired while fetching topic metadata");
    }

    /**
     * Inspects the per-topic errors of a metadata response. Throws for invalid topics and
     * for non-retriable errors, ignores unknown topics or partitions, and reports whether
     * any remaining error is retriable.
     */
    private static boolean hasRetriableTopicError(MetadataResponse response) {
        if (response.errors().isEmpty()) {
            return false;
        }
        log.debug("Topic metadata fetch included errors: {}", response.errors());

        boolean retry = false;
        for (Map.Entry<String, Errors> topicError : response.errors().entrySet()) {
            final String topic = topicError.getKey();
            final Errors error = topicError.getValue();
            if (error == Errors.INVALID_TOPIC_EXCEPTION) {
                throw new InvalidTopicException("Topic '" + topic + "' is invalid");
            }
            if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                continue;
            }
            if (!(error.exception() instanceof RetriableException)) {
                throw new KafkaException("Unexpected error fetching metadata for topic " + topic, error.exception());
            }
            retry = true;
        }
        return retry;
    }

    /**
     * Sends a metadata request for the given topics to the least loaded node.
     *
     * @param list topics to request; {@code null} is replaced by an empty list
     * @return the pending request, or a "no brokers available" future when the client
     *         has no node to offer
     */
    private RequestFuture<ClientResponse> sendMetadataRequest(List<String> list) {
        final List<String> topics = (list != null) ? list : Collections.<String>emptyList();
        final Node target = this.client.leastLoadedNode();
        if (target == null) {
            return RequestFuture.noBrokersAvailable();
        }
        return this.client.send(target, ApiKeys.METADATA, new MetadataRequest(topics));
    }

    /**
     * Resets the position of a partition according to its configured reset strategy:
     * looks up the earliest or latest offset via {@code listOffset} and seeks to it if
     * the partition is still assigned once the lookup returns.
     *
     * @param topicPartition partition whose position is reset
     * @throws NoOffsetForPartitionException if the strategy is neither EARLIEST nor LATEST
     */
    private void resetOffset(TopicPartition topicPartition) {
        final OffsetResetStrategy strategy = this.subscriptions.resetStrategy(topicPartition);

        // Sentinel handed to the list-offset request: -2 selects the earliest offset, -1 the latest.
        final long timestamp;
        if (strategy == OffsetResetStrategy.EARLIEST) {
            timestamp = -2L;
        } else if (strategy == OffsetResetStrategy.LATEST) {
            timestamp = -1L;
        } else {
            throw new NoOffsetForPartitionException(topicPartition);
        }

        // Locale.ROOT keeps the logged name stable; the default-locale overload would
        // turn "EARLIEST" into "earlıest" under a Turkish default locale.
        log.debug("Resetting offset for partition {} to {} offset.", topicPartition,
                strategy.name().toLowerCase(java.util.Locale.ROOT));

        final long offset = listOffset(topicPartition, timestamp);

        // The lookup blocks, so the assignment may have changed in the meantime.
        if (this.subscriptions.isAssigned(topicPartition)) {
            this.subscriptions.seek(topicPartition, offset);
        }
    }

    /**
     * Looks up a single offset for a partition, blocking and retrying until the request
     * succeeds or fails with a non-retriable error.
     *
     * @param topicPartition partition to query
     * @param j              timestamp argument of the list-offset request
     * @return the offset reported by the broker
     */
    private long listOffset(TopicPartition topicPartition, long j) {
        for (;;) {
            final RequestFuture<Long> future = sendListOffsetRequest(topicPartition, j);
            this.client.poll(future);

            if (future.succeeded()) {
                return future.value().longValue();
            }
            if (!future.isRetriable()) {
                throw future.exception();
            }

            // Retriable failure: wait for fresh metadata if that was the problem, else back off.
            final boolean metadataProblem = future.exception() instanceof InvalidMetadataException;
            if (metadataProblem) {
                this.client.awaitMetadataUpdate();
            } else {
                this.time.sleep(this.retryBackoffMs);
            }
        }
    }

    /**
     * Reports recorded out-of-range fetches that are still relevant, i.e. whose partition
     * is fetchable and whose consumed position still equals the rejected fetch offset.
     * The recorded set is cleared whether or not anything is thrown.
     *
     * @throws OffsetOutOfRangeException carrying the still-relevant partitions and offsets
     */
    private void throwIfOffsetOutOfRange() throws OffsetOutOfRangeException {
        final Map<TopicPartition, Long> stillOutOfRange = new HashMap<>();
        for (Map.Entry<TopicPartition, Long> recorded : this.offsetOutOfRangePartitions.entrySet()) {
            final TopicPartition tp = recorded.getKey();
            if (!this.subscriptions.isFetchable(tp)) {
                log.debug("Ignoring fetched records for {} since it is no longer fetchable", tp);
                continue;
            }
            final Long position = this.subscriptions.consumed(tp);
            final Long rejectedOffset = recorded.getValue();
            if (position != null && rejectedOffset.equals(position)) {
                stillOutOfRange.put(tp, rejectedOffset);
            }
        }
        this.offsetOutOfRangePartitions.clear();

        if (stillOutOfRange.isEmpty()) {
            return;
        }
        throw new OffsetOutOfRangeException(stillOutOfRange);
    }

    /**
     * Throws for any topics recorded as unauthorized, clearing the recorded set first.
     *
     * @throws TopicAuthorizationException carrying a copy of the recorded topics
     */
    private void throwIfUnauthorizedTopics() throws TopicAuthorizationException {
        if (!this.unauthorizedTopics.isEmpty()) {
            // Copy before clearing so the exception keeps the topic names.
            final Set<String> denied = new HashSet<>(this.unauthorizedTopics);
            this.unauthorizedTopics.clear();
            throw new TopicAuthorizationException(denied);
        }
    }

    /**
     * Throws for any partitions recorded as holding a record larger than the fetch size.
     * The recorded map is always cleared.
     *
     * @throws RecordTooLargeException carrying a copy of the recorded partitions and offsets
     */
    private void throwIfRecordTooLarge() throws RecordTooLargeException {
        final Map<TopicPartition, Long> oversized = new HashMap<>(this.recordTooLargePartitions);
        this.recordTooLargePartitions.clear();
        if (oversized.isEmpty()) {
            return;
        }
        final String message = "There are some messages at [Partition=Offset]: " + oversized + " whose size is larger than the fetch size " + this.fetchSize + " and hence cannot be ever returned. Increase the fetch size, or decrease the maximum message size the broker will allow.";
        throw new RecordTooLargeException(message, oversized);
    }

    /**
     * Drains the records buffered by previous fetch responses and advances the consumed
     * position of every partition whose records are returned.
     *
     * <p>Pending fetch errors (offset out of range, unauthorized topics, record too large)
     * are raised first, before any buffered records are examined. Buffered batches are
     * dropped when their partition is no longer assigned, is no longer fetchable, or when
     * the batch's fetch offset no longer matches the consumed position.
     *
     * @return partition to records, in buffer order; empty when a partition assignment is needed
     * @throws OffsetOutOfRangeException   see {@code throwIfOffsetOutOfRange}
     * @throws TopicAuthorizationException see {@code throwIfUnauthorizedTopics}
     * @throws RecordTooLargeException     see {@code throwIfRecordTooLarge}
     */
    public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
        if (this.subscriptions.partitionAssignmentNeeded()) {
            return Collections.emptyMap();
        }

        final Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
        throwIfOffsetOutOfRange();
        throwIfUnauthorizedTopics();
        throwIfRecordTooLarge();

        for (PartitionRecords<K, V> part : this.records) {
            final TopicPartition tp = part.partition;
            if (!this.subscriptions.isAssigned(tp)) {
                log.debug("Not returning fetched records for partition {} since it is no longer assigned", tp);
                continue;
            }

            // consumed() returns a boxed Long that other methods in this class treat as
            // nullable, so it is not unboxed unconditionally here.
            final Long consumed = this.subscriptions.consumed(tp);

            if (!this.subscriptions.isFetchable(tp)) {
                log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", tp);
                // Rewind the fetch position so the dropped data is fetched again later.
                if (consumed != null) {
                    this.subscriptions.fetched(tp, consumed.longValue());
                }
            } else if (consumed != null && part.fetchOffset == consumed.longValue()) {
                final long nextOffset = part.records.get(part.records.size() - 1).offset() + 1;
                log.trace("Returning fetched records for assigned partition {} and update consumed position to {}", tp, nextOffset);

                final List<ConsumerRecord<K, V>> existing = drained.get(tp);
                if (existing == null) {
                    drained.put(tp, part.records);
                } else {
                    existing.addAll(part.records);
                }
                this.subscriptions.consumed(tp, nextOffset);
            } else {
                // The position moved since the fetch was issued, so the batch is stale.
                log.debug("Ignoring fetched records for {} at offset {}", tp, part.fetchOffset);
            }
        }
        this.records.clear();
        return drained;
    }

    /**
     * Sends a list-offset request for one partition to that partition's leader.
     *
     * @param topicPartition partition to query
     * @param j              timestamp argument of the request
     * @return a future for the offset; a stale-metadata future when the partition is
     *         unknown, or a leader-not-available future when it has no leader
     */
    private RequestFuture<Long> sendListOffsetRequest(final TopicPartition topicPartition, long j) {
        final Map<TopicPartition, ListOffsetRequest.PartitionData> query = new HashMap<>(1);
        query.put(topicPartition, new ListOffsetRequest.PartitionData(j, 1));

        final PartitionInfo info = this.metadata.fetch().partition(topicPartition);
        if (info == null) {
            // Register the topic with the metadata so a later refresh can include it.
            this.metadata.add(topicPartition.topic());
            log.debug("Partition {} is unknown for fetching offset, wait for metadata refresh", topicPartition);
            return RequestFuture.staleMetadata();
        }
        if (info.leader() == null) {
            log.debug("Leader for partition {} unavailable for fetching offset, wait for metadata refresh", topicPartition);
            return RequestFuture.leaderNotAvailable();
        }

        final RequestFutureAdapter<ClientResponse, Long> toOffset = new RequestFutureAdapter<ClientResponse, Long>() {
            @Override
            public void onSuccess(ClientResponse response, RequestFuture<Long> offsetFuture) {
                Fetcher.this.handleListOffsetResponse(topicPartition, response, offsetFuture);
            }
        };
        return this.client.send(info.leader(), ApiKeys.LIST_OFFSETS, new ListOffsetRequest(-1, query)).compose(toOffset);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Completes or fails the given future from a list-offset response.
     *
     * @param topicPartition partition the request was issued for
     * @param clientResponse raw response from the broker
     * @param requestFuture  future completed with the offset, or failed with the mapped error
     * @throws IllegalStateException if a successful response does not carry exactly one offset
     */
    public void handleListOffsetResponse(TopicPartition topicPartition, ClientResponse clientResponse, RequestFuture<Long> requestFuture) {
        final ListOffsetResponse response = new ListOffsetResponse(clientResponse.responseBody());
        final short errorCode = response.responseData().get(topicPartition).errorCode;

        if (errorCode != Errors.NONE.code()) {
            final boolean obsoleteLeadership = errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
                    || errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code();
            if (obsoleteLeadership) {
                log.warn("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.", topicPartition);
                requestFuture.raise(Errors.forCode(errorCode));
            } else {
                log.error("Attempt to fetch offsets for partition {} failed due to: {}", topicPartition, Errors.forCode(errorCode).exception().getMessage());
                requestFuture.raise(new StaleMetadataException());
            }
            return;
        }

        final List<Long> offsets = response.responseData().get(topicPartition).offsets;
        if (offsets.size() != 1) {
            throw new IllegalStateException("This should not happen.");
        }
        final long offset = offsets.get(0).longValue();
        log.debug("Fetched offset {} for partition {}", offset, topicPartition);
        requestFuture.complete(offset);
    }

    /**
     * Builds one fetch request per leader node covering the currently fetchable partitions.
     *
     * <p>A partition without a known leader triggers a metadata update request and is
     * skipped. Nodes that already have pending requests are skipped. A partition is only
     * included when its fetched position equals its consumed position.
     *
     * <p>NOTE(review): a node whose partitions are all excluded by the position check
     * still gets a request with empty fetch data - confirm that is intended.
     *
     * @param cluster cluster view used to look up partition leaders
     * @return node to the fetch request to send to it
     */
    private Map<Node, FetchRequest> createFetchRequests(Cluster cluster) {
        final Map<Node, Map<TopicPartition, FetchRequest.PartitionData>> fetchable = new HashMap<>();
        for (TopicPartition partition : this.subscriptions.fetchablePartitions()) {
            final Node leader = cluster.leaderFor(partition);
            if (leader == null) {
                this.metadata.requestUpdate();
            } else if (this.client.pendingRequestCount(leader) == 0) {
                Map<TopicPartition, FetchRequest.PartitionData> fetch = fetchable.get(leader);
                if (fetch == null) {
                    fetch = new HashMap<>();
                    fetchable.put(leader, fetch);
                }

                final long fetched = this.subscriptions.fetched(partition).longValue();
                final long consumed = this.subscriptions.consumed(partition).longValue();
                // Skip partitions whose fetched position differs from the consumed position.
                if (consumed == fetched) {
                    fetch.put(partition, new FetchRequest.PartitionData(fetched, this.fetchSize));
                }
            }
        }

        final Map<Node, FetchRequest> requests = new HashMap<>();
        for (Map.Entry<Node, Map<TopicPartition, FetchRequest.PartitionData>> entry : fetchable.entrySet()) {
            requests.put(entry.getKey(), new FetchRequest(this.maxWaitMs, this.minBytes, entry.getValue()));
        }
        return requests;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Processes a fetch response: parses and buffers records for partitions that are
     * still fetchable at the expected position, records per-partition errors for later
     * reporting, and updates the fetch sensors.
     *
     * @param clientResponse raw response from the broker
     * @param fetchRequest   the request this response answers; supplies the fetch offsets
     * @throws IllegalStateException if a partition carries an error code not handled here
     */
    public void handleFetchResponse(ClientResponse clientResponse, FetchRequest fetchRequest) {
        int totalBytes = 0;
        int totalCount = 0;
        final FetchResponse response = new FetchResponse(clientResponse.responseBody());

        for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
            final TopicPartition tp = entry.getKey();
            final FetchResponse.PartitionData partition = entry.getValue();

            if (!this.subscriptions.isFetchable(tp)) {
                log.debug("Ignoring fetched records for partition {} since it is no longer fetchable", tp);
            } else if (partition.errorCode == Errors.NONE.code()) {
                final long fetchOffset = fetchRequest.fetchData().get(tp).offset;
                final Long consumed = this.subscriptions.consumed(tp);
                if (consumed == null) {
                    continue;
                }
                if (consumed.longValue() != fetchOffset) {
                    // The position moved while the fetch was in flight: drop the data
                    // and realign the fetch position with the consumed position.
                    this.subscriptions.fetched(tp, consumed.longValue());
                    continue;
                }

                int bytes = 0;
                final ByteBuffer buffer = partition.recordSet;
                final MemoryRecords memoryRecords = MemoryRecords.readableRecords(buffer);
                final List<ConsumerRecord<K, V>> parsed = new ArrayList<>();
                final Iterator<LogEntry> entries = memoryRecords.iterator();
                while (entries.hasNext()) {
                    final LogEntry logEntry = entries.next();
                    parsed.add(parseRecord(tp, logEntry));
                    bytes += logEntry.size();
                }

                if (!parsed.isEmpty()) {
                    final ConsumerRecord<K, V> last = parsed.get(parsed.size() - 1);
                    this.subscriptions.fetched(tp, last.offset() + 1);
                    this.records.add(new PartitionRecords<>(fetchOffset, tp, parsed));
                    this.sensors.recordsFetchLag.record(partition.highWatermark - last.offset());
                } else if (buffer.limit() > 0) {
                    // Bytes were returned but no complete record could be read from them.
                    this.recordTooLargePartitions.put(tp, fetchOffset);
                }

                this.sensors.recordTopicFetchMetrics(tp.topic(), bytes, parsed.size());
                totalBytes += bytes;
                totalCount += parsed.size();
            } else if (partition.errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
                    || partition.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
                this.metadata.requestUpdate();
            } else if (partition.errorCode == Errors.OFFSET_OUT_OF_RANGE.code()) {
                final long fetchOffset = fetchRequest.fetchData().get(tp).offset;
                if (this.subscriptions.hasDefaultOffsetResetPolicy()) {
                    this.subscriptions.needOffsetReset(tp);
                } else {
                    this.offsetOutOfRangePartitions.put(tp, fetchOffset);
                }
                // Log the offset that was requested; the tracked fetch position may have
                // been changed by needOffsetReset just above.
                log.info("Fetch offset {} is out of range, resetting offset", fetchOffset);
            } else if (partition.errorCode == Errors.TOPIC_AUTHORIZATION_FAILED.code()) {
                log.warn("Not authorized to read from topic {}.", tp.topic());
                this.unauthorizedTopics.add(tp.topic());
            } else if (partition.errorCode == Errors.UNKNOWN.code()) {
                log.warn("Unknown error fetching data for topic-partition {}", tp);
            } else {
                throw new IllegalStateException("Unexpected error code " + ((int) partition.errorCode) + " while fetching data");
            }
        }

        this.sensors.bytesFetched.record(totalBytes);
        this.sensors.recordsFetched.record(totalCount);
        this.sensors.fetchThrottleTimeSensor.record(response.getThrottleTime());
        this.sensors.fetchLatency.record(clientResponse.requestLatencyMs());
    }

    /**
     * Converts one log entry into a consumer record, deserializing its key and value.
     * A missing key or value is passed through as {@code null}.
     *
     * @param topicPartition partition the entry was fetched from
     * @param logEntry       entry to convert
     * @return the deserialized record
     * @throws KafkaException if validation or deserialization fails; runtime exceptions
     *                        other than KafkaException are wrapped, with the partition
     *                        and offset in the message
     */
    private ConsumerRecord<K, V> parseRecord(TopicPartition topicPartition, LogEntry logEntry) {
        try {
            if (this.checkCrcs) {
                logEntry.record().ensureValid();
            }
            final long offset = logEntry.offset();

            K parsedKey = null;
            final ByteBuffer keyBytes = logEntry.record().key();
            if (keyBytes != null) {
                parsedKey = this.keyDeserializer.deserialize(topicPartition.topic(), Utils.toArray(keyBytes));
            }

            V parsedValue = null;
            final ByteBuffer valueBytes = logEntry.record().value();
            if (valueBytes != null) {
                parsedValue = this.valueDeserializer.deserialize(topicPartition.topic(), Utils.toArray(valueBytes));
            }

            return new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(), offset, parsedKey, parsedValue);
        } catch (KafkaException alreadyKafka) {
            // Already the right exception type: propagate unchanged.
            throw alreadyKafka;
        } catch (RuntimeException cause) {
            throw new KafkaException("Error deserializing key/value for partition " + topicPartition + " at offset " + logEntry.offset(), cause);
        }
    }
}
