/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.kafka;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.camel.Consumer;
import org.apache.camel.Route;
import org.apache.camel.component.kafka.KafkaConsumer;
import org.apache.camel.component.kafka.KafkaFetchRecords;
import org.apache.camel.component.kafka.TaskHealthState;
import org.apache.camel.component.kafka.consumer.devconsole.DevConsoleMetricsCollector;
import org.apache.camel.spi.annotations.DevConsole;
import org.apache.camel.support.console.AbstractDevConsole;
import org.apache.camel.util.StopWatch;
import org.apache.camel.util.TimeUtils;
import org.apache.camel.util.json.JsonArray;
import org.apache.camel.util.json.JsonObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@DevConsole(name="kafka", displayName="Kafka", description="Apache Kafka")
public class KafkaDevConsole
extends AbstractDevConsole {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaDevConsole.class);
    private static final long COMMITTED_TIMEOUT = 10000L;
    public static final String COMMITTED = "committed";

    public KafkaDevConsole() {
        super("camel", "kafka", "Kafka", "Apache Kafka");
    }

    @Override
    protected String doCallText(Map<String, Object> options) {
        boolean committed = "true".equals(options.getOrDefault(COMMITTED, "false"));
        StringBuilder sb = new StringBuilder();
        for (Route route : this.getCamelContext().getRoutes()) {
            Consumer consumer = route.getConsumer();
            if (!(consumer instanceof KafkaConsumer)) continue;
            KafkaConsumer kc = (KafkaConsumer)consumer;
            sb.append(String.format("\n    Route Id: %s", route.getRouteId()));
            sb.append(String.format("\n    From: %s", route.getEndpoint().getEndpointUri()));
            for (KafkaFetchRecords t : kc.tasks()) {
                List<DevConsoleMetricsCollector.KafkaTopicPosition> l;
                DevConsoleMetricsCollector.GroupMetadata meta;
                DevConsoleMetricsCollector metricsCollector = t.getMetricsCollector();
                sb.append(String.format("\n        Worked Thread: %s", metricsCollector.getThreadId()));
                sb.append(String.format("\n        Worker State: %s", t.getState()));
                TaskHealthState hs = t.healthState();
                if (!hs.isReady()) {
                    sb.append(String.format("\n        Worker Last Error: %s", hs.buildStateMessage()));
                }
                if ((meta = metricsCollector.getGroupMetadata()) != null) {
                    sb.append(String.format("\n        Group Id: %s", meta.groupId()));
                    sb.append(String.format("\n        Group Instance Id: %s", meta.groupInstanceId()));
                    sb.append(String.format("\n        Member Id: %s", meta.memberId()));
                    sb.append(String.format("\n        Generation Id: %d", meta.generationId()));
                }
                if (metricsCollector.getLastRecord() != null) {
                    sb.append(String.format("\n        Last Topic: %s", metricsCollector.getLastRecord().topic()));
                    sb.append(String.format("\n        Last Partition: %d", metricsCollector.getLastRecord().partition()));
                    sb.append(String.format("\n        Last Offset: %d", metricsCollector.getLastRecord().offset()));
                }
                if (!committed || (l = KafkaDevConsole.fetchCommitOffsets(kc, metricsCollector)) == null) continue;
                for (DevConsoleMetricsCollector.KafkaTopicPosition r : l) {
                    sb.append(String.format("\n        Commit Topic: %s", r.topic()));
                    sb.append(String.format("\n        Commit Partition: %s", r.partition()));
                    sb.append(String.format("\n        Commit Offset: %s", r.offset()));
                    if (r.epoch() <= 0) continue;
                    long delta = System.currentTimeMillis() - (long)r.epoch();
                    sb.append(String.format("\n        Commit Offset Since: %s", TimeUtils.printDuration(delta, true)));
                }
            }
            sb.append("\n");
        }
        return sb.toString();
    }

    private static List<DevConsoleMetricsCollector.KafkaTopicPosition> fetchCommitOffsets(KafkaConsumer kc, DevConsoleMetricsCollector collector) {
        StopWatch watch = new StopWatch();
        CountDownLatch latch = collector.fetchCommitRecords();
        long timeout = Math.min(kc.getEndpoint().getConfiguration().getPollTimeoutMs(), 10000L);
        try {
            latch.await(timeout, TimeUnit.MILLISECONDS);
            List<DevConsoleMetricsCollector.KafkaTopicPosition> answer = collector.getCommitRecords();
            LOG.debug("Fetching commit offsets took: {} ms", (Object)watch.taken());
            return answer;
        }
        catch (Exception exception) {
            return null;
        }
    }

    @Override
    protected Map<String, Object> doCallJson(Map<String, Object> options) {
        boolean committed = "true".equals(options.getOrDefault(COMMITTED, "false"));
        JsonObject root = new JsonObject();
        ArrayList<JsonObject> list = new ArrayList<JsonObject>();
        root.put("kafkaConsumers", list);
        for (Route route : this.getCamelContext().getRoutes()) {
            Consumer consumer = route.getConsumer();
            if (!(consumer instanceof KafkaConsumer)) continue;
            KafkaConsumer kc = (KafkaConsumer)consumer;
            JsonObject jo = new JsonObject();
            jo.put("routeId", route.getRouteId());
            jo.put("uri", route.getEndpoint().getEndpointUri());
            JsonArray arr = new JsonArray();
            jo.put("workers", arr);
            for (KafkaFetchRecords t : kc.tasks()) {
                List<DevConsoleMetricsCollector.KafkaTopicPosition> l;
                DevConsoleMetricsCollector.GroupMetadata meta;
                DevConsoleMetricsCollector metricsCollector = t.getMetricsCollector();
                JsonObject wo = new JsonObject();
                arr.add(wo);
                wo.put("threadId", metricsCollector.getThreadId());
                wo.put("state", t.getState());
                TaskHealthState hs = t.healthState();
                if (!hs.isReady()) {
                    wo.put("lastError", hs.buildStateMessage());
                }
                if ((meta = metricsCollector.getGroupMetadata()) != null) {
                    wo.put("groupId", meta.groupId());
                    wo.put("groupInstanceId", meta.groupInstanceId());
                    wo.put("memberId", meta.memberId());
                    wo.put("generationId", meta.generationId());
                }
                if (metricsCollector.getLastRecord() != null) {
                    wo.put("lastTopic", metricsCollector.getLastRecord().topic());
                    wo.put("lastPartition", metricsCollector.getLastRecord().partition());
                    wo.put("lastOffset", metricsCollector.getLastRecord().offset());
                }
                if (!committed || (l = KafkaDevConsole.fetchCommitOffsets(kc, metricsCollector)) == null) continue;
                JsonArray ca = new JsonArray();
                for (DevConsoleMetricsCollector.KafkaTopicPosition r : l) {
                    JsonObject cr = new JsonObject();
                    cr.put("topic", r.topic());
                    cr.put("partition", r.partition());
                    cr.put("offset", r.offset());
                    cr.put("epoch", r.epoch());
                    ca.add(cr);
                }
                if (ca.isEmpty()) continue;
                wo.put(COMMITTED, ca);
            }
            list.add(jo);
        }
        return root;
    }
}

