/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.kafka.consumer.support.resume;

import java.nio.ByteBuffer;
import org.apache.camel.resume.Cacheable;
import org.apache.camel.resume.Deserializable;
import org.apache.camel.resume.Offset;
import org.apache.camel.resume.OffsetKey;
import org.apache.camel.resume.ResumeAdapter;
import org.apache.camel.resume.cache.ResumeCache;
import org.apache.camel.spi.annotations.JdkService;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Resume adapter that repositions a Kafka {@link Consumer} using offsets held in a
 * {@link ResumeCache} keyed by {@link TopicPartition}.
 *
 * <p>Serialized keys are expected to be strings of the form {@code topic/partition} and
 * serialized values are expected to be {@link Long} offsets. Entries that do not match this
 * shape are logged at WARN level and discarded.
 *
 * <p>Both the consumer and the cache must be injected (via {@link #setConsumer(Consumer)} and
 * {@link #setCache(ResumeCache)}) before the adapter is used; this class performs no null checks
 * on them.
 */
@JdkService(value="kafka-adapter-factory")
public class KafkaResumeAdapter
implements ResumeAdapter,
Deserializable,
Cacheable {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaResumeAdapter.class);

    /** Separator between the topic name and the partition number in a serialized key. */
    private static final String KEY_SEPARATOR = "/";

    /** Warning emitted when a serialized key is not of the form {@code topic/partition}. */
    private static final String INVALID_KEY_FORMAT_MESSAGE
            = "Unable to deserialize key '{}' because it has in invalid format and it will be discarded";

    private Consumer<?, ?> consumer;
    private ResumeCache<TopicPartition> resumeCache;

    /**
     * Seeks the consumer to the given offset for a single topic partition.
     *
     * @param topicPartition the partition to reposition
     * @param value the offset to seek to; it is cast to {@link Long}, so any other type results
     *              in a {@link ClassCastException}
     * @return always {@code true}
     */
    private boolean resume(TopicPartition topicPartition, Object value) {
        this.consumer.seek(topicPartition, (Long)value);
        return true;
    }

    /**
     * Seeks the consumer to every offset currently stored in the resume cache.
     */
    @Override
    public void resume() {
        this.resumeCache.forEach(this::resume);
    }

    /**
     * Deserializes one key/value pair and, when it is well-formed, stores it in the resume cache.
     *
     * <p>An entry is cached only when the key deserializes to a string of the form
     * {@code topic/partition} (exactly two parts, with an integer partition) and the value
     * deserializes to a {@link Long}. Anything else is logged and discarded.
     *
     * @param keyBuffer buffer holding the serialized key
     * @param valueBuffer buffer holding the serialized value
     * @return always {@code false}
     */
    @Override
    public boolean deserialize(ByteBuffer keyBuffer, ByteBuffer valueBuffer) {
        Object keyObj = this.deserializeKey(keyBuffer);
        Object valueObj = this.deserializeValue(valueBuffer);

        // NOTE(review): every path returns false, exactly as the original did. How callers
        // interpret the result is not visible from this file - confirm before changing it.
        if (!(keyObj instanceof String)) {
            LOG.warn("Unable to deserialize key '{}' because its type is invalid", keyObj);
            return false;
        }

        String key = (String)keyObj;
        // String.split never returns null, so only the number of parts needs checking.
        String[] keyParts = key.split(KEY_SEPARATOR);
        if (keyParts.length != 2) {
            LOG.warn(INVALID_KEY_FORMAT_MESSAGE, key);
            return false;
        }

        String topic = keyParts[0];
        int partition;
        try {
            partition = Integer.parseInt(keyParts[1]);
        } catch (NumberFormatException e) {
            // A non-numeric partition is just another malformed key: discard it instead of
            // letting the exception escape to the caller.
            LOG.warn(INVALID_KEY_FORMAT_MESSAGE, key);
            return false;
        }

        if (valueObj instanceof Long) {
            this.resumeCache.add(new TopicPartition(topic, partition), (Long)valueObj);
        } else {
            LOG.warn("The type for the key '{}' is invalid: {}", key, valueObj);
        }
        return false;
    }

    /**
     * Adds an offset to the resume cache when the key wraps a {@link TopicPartition}.
     *
     * <p>Keys wrapping any other type are silently ignored.
     *
     * @param key the offset key; only keys whose value is a {@link TopicPartition} are stored
     * @param offset the offset, read as a {@link Long}
     * @return always {@code true}
     */
    @Override
    public boolean add(OffsetKey<?> key, Offset<?> offset) {
        Object keyObj = key.getValue();
        Long valueObject = offset.getValue(Long.class);
        if (keyObj instanceof TopicPartition) {
            this.resumeCache.add((TopicPartition)keyObj, valueObject);
        }
        return true;
    }

    /**
     * Sets the cache used to hold the offsets to resume from.
     *
     * @param cache the cache to use; it is treated as being keyed by {@link TopicPartition}
     */
    @Override
    @SuppressWarnings("unchecked")
    public void setCache(ResumeCache<?> cache) {
        // The interface only exposes a wildcard type; this adapter exclusively stores
        // TopicPartition keys, so the narrowing cast is needed for the assignment to compile.
        this.resumeCache = (ResumeCache<TopicPartition>)cache;
    }

    /**
     * Returns the cache holding the offsets to resume from.
     *
     * @return the cache previously supplied through {@link #setCache(ResumeCache)}, or
     *         {@code null} if none has been set
     */
    @Override
    public ResumeCache<?> getCache() {
        return this.resumeCache;
    }

    /**
     * Sets the Kafka consumer that will be repositioned on resume.
     *
     * @param consumer the consumer on which {@code seek} is invoked
     */
    public void setConsumer(Consumer<?, ?> consumer) {
        this.consumer = consumer;
    }
}

