/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kudu.shaded.io.micrometer.core.instrument.binder.kafka;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.function.ToDoubleFunction;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kudu.shaded.io.micrometer.core.annotation.Incubating;
import org.apache.kudu.shaded.io.micrometer.core.instrument.FunctionCounter;
import org.apache.kudu.shaded.io.micrometer.core.instrument.Gauge;
import org.apache.kudu.shaded.io.micrometer.core.instrument.Meter;
import org.apache.kudu.shaded.io.micrometer.core.instrument.MeterRegistry;
import org.apache.kudu.shaded.io.micrometer.core.instrument.Tag;
import org.apache.kudu.shaded.io.micrometer.core.instrument.binder.MeterBinder;
import org.apache.kudu.shaded.io.micrometer.core.instrument.util.NamedThreadFactory;
import org.apache.kudu.shaded.io.micrometer.core.lang.NonNullApi;
import org.apache.kudu.shaded.io.micrometer.core.lang.NonNullFields;
import org.apache.kudu.shaded.io.micrometer.core.lang.Nullable;
import org.apache.kudu.shaded.io.micrometer.core.util.internal.logging.InternalLogger;
import org.apache.kudu.shaded.io.micrometer.core.util.internal.logging.InternalLoggerFactory;
import org.apache.kudu.shaded.io.micrometer.core.util.internal.logging.WarnThenDebugLogger;

@NonNullApi
@NonNullFields
@Incubating(since="1.4.0")
class KafkaMetrics
implements MeterBinder,
AutoCloseable {
    private static final InternalLogger log = InternalLoggerFactory.getInstance(KafkaMetrics.class);
    private static final WarnThenDebugLogger warnThenDebugLogger = new WarnThenDebugLogger(KafkaMetrics.class);
    static final String METRIC_NAME_PREFIX = "kafka.";
    static final String METRIC_GROUP_APP_INFO = "app-info";
    static final String METRIC_GROUP_METRICS_COUNT = "kafka-metrics-count";
    static final String VERSION_METRIC_NAME = "version";
    static final String START_TIME_METRIC_NAME = "start-time-ms";
    static final Duration DEFAULT_REFRESH_INTERVAL = Duration.ofSeconds(60L);
    static final String KAFKA_VERSION_TAG_NAME = "kafka.version";
    static final String DEFAULT_VALUE = "unknown";
    private final Supplier<Map<MetricName, ? extends Metric>> metricsSupplier;
    private final Iterable<Tag> extraTags;
    private final Duration refreshInterval;
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("micrometer-kafka-metrics"));
    @Nullable
    private Iterable<Tag> commonTags;
    private volatile Set<MetricName> currentMeters = new HashSet<MetricName>();
    private String kafkaVersion = "unknown";

    KafkaMetrics(Supplier<Map<MetricName, ? extends Metric>> metricsSupplier) {
        this(metricsSupplier, Collections.emptyList());
    }

    KafkaMetrics(Supplier<Map<MetricName, ? extends Metric>> metricsSupplier, Iterable<Tag> extraTags) {
        this(metricsSupplier, extraTags, DEFAULT_REFRESH_INTERVAL);
    }

    KafkaMetrics(Supplier<Map<MetricName, ? extends Metric>> metricsSupplier, Iterable<Tag> extraTags, Duration refreshInterval) {
        this.metricsSupplier = metricsSupplier;
        this.extraTags = extraTags;
        this.refreshInterval = refreshInterval;
    }

    @Override
    public void bindTo(MeterRegistry registry) {
        this.commonTags = this.getCommonTags(registry);
        this.prepareToBindMetrics(registry);
        this.checkAndBindMetrics(registry);
        this.scheduler.scheduleAtFixedRate(() -> this.checkAndBindMetrics(registry), this.getRefreshIntervalInMillis(), this.getRefreshIntervalInMillis(), TimeUnit.MILLISECONDS);
    }

    private Iterable<Tag> getCommonTags(MeterRegistry registry) {
        Meter.Id dummyId = Meter.builder("delete.this", Meter.Type.OTHER, Collections.emptyList()).register(registry).getId();
        registry.remove(dummyId);
        return dummyId.getTags();
    }

    void prepareToBindMetrics(MeterRegistry registry) {
        Map<MetricName, ? extends Metric> metrics = this.metricsSupplier.get();
        Metric startTime = null;
        for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
            MetricName name = entry.getKey();
            if (!METRIC_GROUP_APP_INFO.equals(name.group())) continue;
            if (VERSION_METRIC_NAME.equals(name.name())) {
                this.kafkaVersion = (String)entry.getValue().metricValue();
                continue;
            }
            if (!START_TIME_METRIC_NAME.equals(name.name())) continue;
            startTime = entry.getValue();
        }
        if (startTime != null) {
            this.bindMeter(registry, startTime, this.meterName(startTime), this.meterTags(startTime));
        }
    }

    private long getRefreshIntervalInMillis() {
        return this.refreshInterval.toMillis();
    }

    void checkAndBindMetrics(MeterRegistry registry) {
        Map<MetricName, ? extends Metric> metrics = this.metricsSupplier.get();
        if (!this.currentMeters.equals(metrics.keySet())) {
            this.currentMeters = new HashSet<MetricName>(metrics.keySet());
            metrics.forEach((name, metric) -> {
                if (!(metric.metricValue() instanceof Number) || METRIC_GROUP_APP_INFO.equals(name.group()) || METRIC_GROUP_METRICS_COUNT.equals(name.group())) {
                    return;
                }
                String meterName = this.meterName((Metric)metric);
                boolean hasLessTags = false;
                for (Meter other : registry.find(meterName).meters()) {
                    List<Tag> tags = other.getId().getTags();
                    List<Tag> meterTagsWithCommonTags = this.meterTags((Metric)metric, true);
                    if (tags.size() < meterTagsWithCommonTags.size()) {
                        registry.remove(other);
                        continue;
                    }
                    if (tags.size() == meterTagsWithCommonTags.size()) {
                        if (!tags.containsAll(meterTagsWithCommonTags)) break;
                        return;
                    }
                    hasLessTags = true;
                }
                if (hasLessTags) {
                    return;
                }
                List<Tag> tags = this.meterTags((Metric)metric);
                try {
                    this.bindMeter(registry, (Metric)metric, meterName, (Iterable<Tag>)tags);
                }
                catch (Exception ex) {
                    String message = ex.getMessage();
                    if (message != null && message.contains("Prometheus requires")) {
                        warnThenDebugLogger.log("Failed to bind meter: " + meterName + " " + tags + ". However, this could happen and might be restored in the next refresh.");
                    }
                    log.warn("Failed to bind meter: " + meterName + " " + tags + ".", ex);
                }
            });
        }
    }

    private void bindMeter(MeterRegistry registry, Metric metric, String name, Iterable<Tag> tags) {
        if (name.endsWith("total") || name.endsWith("count")) {
            this.registerCounter(registry, metric, name, tags);
        } else {
            this.registerGauge(registry, metric, name, tags);
        }
    }

    private void registerGauge(MeterRegistry registry, Metric metric, String name, Iterable<Tag> tags) {
        Gauge.builder(name, metric, KafkaMetrics.toMetricValue()).tags(tags).description(metric.metricName().description()).register(registry);
    }

    private void registerCounter(MeterRegistry registry, Metric metric, String name, Iterable<Tag> tags) {
        FunctionCounter.builder(name, metric, KafkaMetrics.toMetricValue()).tags(tags).description(metric.metricName().description()).register(registry);
    }

    private static ToDoubleFunction<Metric> toMetricValue() {
        return metric -> ((Number)metric.metricValue()).doubleValue();
    }

    private List<Tag> meterTags(Metric metric, boolean includeCommonTags) {
        ArrayList<Tag> tags = new ArrayList<Tag>();
        metric.metricName().tags().forEach((key, value) -> tags.add(Tag.of(key.replaceAll("-", "."), value)));
        tags.add(Tag.of(KAFKA_VERSION_TAG_NAME, this.kafkaVersion));
        this.extraTags.forEach(tags::add);
        if (includeCommonTags) {
            this.commonTags.forEach(tags::add);
        }
        return tags;
    }

    private List<Tag> meterTags(Metric metric) {
        return this.meterTags(metric, false);
    }

    private String meterName(Metric metric) {
        String name = METRIC_NAME_PREFIX + metric.metricName().group() + "." + metric.metricName().name();
        return name.replaceAll("-metrics", "").replaceAll("-", ".");
    }

    @Override
    public void close() {
        this.scheduler.shutdownNow();
    }
}

