/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.pulsar.table;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter;
import org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
import org.apache.flink.connector.pulsar.table.PulsarTableOptions;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.SerializationFormatFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;

public class PulsarTableOptionUtils {
    public static final String TOPIC_LIST_DELIMITER = ";";

    private PulsarTableOptionUtils() {
    }

    @Nullable
    public static DecodingFormat<DeserializationSchema<RowData>> getKeyDecodingFormat(FactoryUtil.TableFactoryHelper helper) {
        return helper.discoverOptionalDecodingFormat(DeserializationFormatFactory.class, PulsarTableOptions.KEY_FORMAT).orElse(null);
    }

    @Nullable
    public static EncodingFormat<SerializationSchema<RowData>> getKeyEncodingFormat(FactoryUtil.TableFactoryHelper helper) {
        return helper.discoverOptionalEncodingFormat(SerializationFormatFactory.class, PulsarTableOptions.KEY_FORMAT).orElse(null);
    }

    public static DecodingFormat<DeserializationSchema<RowData>> getValueDecodingFormat(FactoryUtil.TableFactoryHelper helper) {
        return helper.discoverOptionalDecodingFormat(DeserializationFormatFactory.class, FactoryUtil.FORMAT).orElseGet(() -> helper.discoverDecodingFormat(DeserializationFormatFactory.class, PulsarTableOptions.VALUE_FORMAT));
    }

    public static EncodingFormat<SerializationSchema<RowData>> getValueEncodingFormat(FactoryUtil.TableFactoryHelper helper) {
        return helper.discoverOptionalEncodingFormat(SerializationFormatFactory.class, FactoryUtil.FORMAT).orElseGet(() -> helper.discoverEncodingFormat(SerializationFormatFactory.class, PulsarTableOptions.VALUE_FORMAT));
    }

    public static int[] createKeyFormatProjection(ReadableConfig options, DataType physicalDataType) {
        LogicalType physicalType = physicalDataType.getLogicalType();
        Preconditions.checkArgument((boolean)physicalType.is(LogicalTypeRoot.ROW), (Object)"Row data type expected.");
        Optional optionalKeyFormat = options.getOptional(PulsarTableOptions.KEY_FORMAT);
        Optional optionalKeyFields = options.getOptional(PulsarTableOptions.KEY_FIELDS);
        if (!optionalKeyFormat.isPresent()) {
            return new int[0];
        }
        List keyFields = (List)optionalKeyFields.get();
        List physicalFields = LogicalTypeChecks.getFieldNames((LogicalType)physicalType);
        return keyFields.stream().mapToInt(keyField -> {
            int pos = physicalFields.indexOf(keyField);
            if (pos < 0) {
                throw new ValidationException(String.format("Could not find the field '%s' in the table schema for usage in the key format. A key field must be a regular, physical column. The following columns can be selected in the '%s' option: %s", keyField, PulsarTableOptions.KEY_FIELDS.key(), physicalFields));
            }
            return pos;
        }).toArray();
    }

    public static int[] createValueFormatProjection(ReadableConfig options, DataType physicalDataType) {
        LogicalType physicalType = physicalDataType.getLogicalType();
        Preconditions.checkArgument((boolean)physicalType.is(LogicalTypeRoot.ROW), (Object)"Row data type expected.");
        int physicalFieldCount = LogicalTypeChecks.getFieldCount((LogicalType)physicalType);
        IntStream physicalFields = IntStream.range(0, physicalFieldCount);
        int[] keyProjection = PulsarTableOptionUtils.createKeyFormatProjection(options, physicalDataType);
        return physicalFields.filter(pos -> IntStream.of(keyProjection).noneMatch(k -> k == pos)).toArray();
    }

    public static List<String> getTopicListFromOptions(ReadableConfig tableOptions) {
        return (List)tableOptions.get(PulsarTableOptions.TOPICS);
    }

    public static Properties getPulsarProperties(ReadableConfig tableOptions) {
        Map configs = ((Configuration)tableOptions).toMap();
        return PulsarTableOptionUtils.getPulsarProperties(configs);
    }

    public static Properties getPulsarProperties(Map<String, String> configs) {
        return PulsarTableOptionUtils.getPulsarPropertiesWithPrefix(configs, "pulsar");
    }

    public static Properties getPulsarPropertiesWithPrefix(ReadableConfig tableOptions, String prefix) {
        Map configs = ((Configuration)tableOptions).toMap();
        return PulsarTableOptionUtils.getPulsarPropertiesWithPrefix(configs, prefix);
    }

    public static Properties getPulsarPropertiesWithPrefix(Map<String, String> configs, String prefix) {
        Properties pulsarProperties = new Properties();
        configs.keySet().stream().filter(key -> key.startsWith(prefix)).forEach(key -> pulsarProperties.put(key, configs.get(key)));
        return pulsarProperties;
    }

    public static StartCursor getStartCursor(ReadableConfig tableOptions) {
        if (tableOptions.getOptional(PulsarTableOptions.SOURCE_START_FROM_MESSAGE_ID).isPresent()) {
            return PulsarTableOptionUtils.parseMessageIdStartCursor((String)tableOptions.get(PulsarTableOptions.SOURCE_START_FROM_MESSAGE_ID));
        }
        if (tableOptions.getOptional(PulsarTableOptions.SOURCE_START_FROM_PUBLISH_TIME).isPresent()) {
            return PulsarTableOptionUtils.parsePublishTimeStartCursor((Long)tableOptions.get(PulsarTableOptions.SOURCE_START_FROM_PUBLISH_TIME));
        }
        return StartCursor.earliest();
    }

    public static StopCursor getStopCursor(ReadableConfig tableOptions) {
        if (tableOptions.getOptional(PulsarTableOptions.SOURCE_STOP_AT_MESSAGE_ID).isPresent()) {
            return PulsarTableOptionUtils.parseAtMessageIdStopCursor((String)tableOptions.get(PulsarTableOptions.SOURCE_STOP_AT_MESSAGE_ID));
        }
        if (tableOptions.getOptional(PulsarTableOptions.SOURCE_STOP_AFTER_MESSAGE_ID).isPresent()) {
            return PulsarTableOptionUtils.parseAfterMessageIdStopCursor((String)tableOptions.get(PulsarTableOptions.SOURCE_STOP_AFTER_MESSAGE_ID));
        }
        if (tableOptions.getOptional(PulsarTableOptions.SOURCE_STOP_AT_PUBLISH_TIME).isPresent()) {
            return PulsarTableOptionUtils.parseAtPublishTimeStopCursor((Long)tableOptions.get(PulsarTableOptions.SOURCE_STOP_AT_PUBLISH_TIME));
        }
        return StopCursor.never();
    }

    public static SubscriptionType getSubscriptionType(ReadableConfig tableOptions) {
        return (SubscriptionType)((Object)tableOptions.get(PulsarTableOptions.SOURCE_SUBSCRIPTION_TYPE));
    }

    protected static StartCursor parseMessageIdStartCursor(String config) {
        if (Objects.equals(config, "earliest")) {
            return StartCursor.earliest();
        }
        if (Objects.equals(config, "latest")) {
            return StartCursor.latest();
        }
        return StartCursor.fromMessageId(PulsarTableOptionUtils.parseMessageIdString(config));
    }

    protected static StartCursor parsePublishTimeStartCursor(Long config) {
        return StartCursor.fromPublishTime(config);
    }

    protected static StopCursor parseAtMessageIdStopCursor(String config) {
        if (Objects.equals(config, "never")) {
            return StopCursor.never();
        }
        if (Objects.equals(config, "latest")) {
            return StopCursor.latest();
        }
        return StopCursor.atMessageId(PulsarTableOptionUtils.parseMessageIdString(config));
    }

    protected static StopCursor parseAfterMessageIdStopCursor(String config) {
        return StopCursor.afterMessageId(PulsarTableOptionUtils.parseMessageIdString(config));
    }

    protected static StopCursor parseAtPublishTimeStopCursor(Long config) {
        return StopCursor.atPublishTime(config);
    }

    protected static MessageIdImpl parseMessageIdString(String config) {
        String[] tokens = config.split(":", 3);
        Preconditions.checkArgument((tokens.length == 3 ? 1 : 0) != 0, (Object)"MessageId format must be ledgerId:entryId:partitionId.");
        try {
            long ledgerId = Long.parseLong(tokens[0]);
            long entryId = Long.parseLong(tokens[1]);
            int partitionId = Integer.parseInt(tokens[2]);
            return new MessageIdImpl(ledgerId, entryId, partitionId);
        }
        catch (NumberFormatException e) {
            throw new IllegalArgumentException("MessageId format must be ledgerId:entryId:partitionId. Each id should be able to parsed to long type.");
        }
    }

    public static TopicRouter<RowData> getTopicRouter(ReadableConfig readableConfig, ClassLoader classLoader) {
        if (!readableConfig.getOptional(PulsarTableOptions.SINK_CUSTOM_TOPIC_ROUTER).isPresent()) {
            return null;
        }
        String className = (String)readableConfig.get(PulsarTableOptions.SINK_CUSTOM_TOPIC_ROUTER);
        try {
            Class<?> clazz = Class.forName(className, true, classLoader);
            if (!TopicRouter.class.isAssignableFrom(clazz)) {
                throw new ValidationException(String.format("Sink TopicRouter class '%s' should extend from the required class %s", className, TopicRouter.class.getName()));
            }
            TopicRouter topicRouter = (TopicRouter)InstantiationUtil.instantiate((String)className, TopicRouter.class, (ClassLoader)classLoader);
            return topicRouter;
        }
        catch (ClassNotFoundException | FlinkException e) {
            throw new ValidationException(String.format("Could not find and instantiate TopicRouter class '%s'", className), e);
        }
    }

    public static TopicRoutingMode getTopicRoutingMode(ReadableConfig readableConfig) {
        return (TopicRoutingMode)((Object)readableConfig.get(PulsarTableOptions.SINK_TOPIC_ROUTING_MODE));
    }

    public static long getMessageDelayMillis(ReadableConfig readableConfig) {
        return ((Duration)readableConfig.get(PulsarTableOptions.SINK_MESSAGE_DELAY_INTERVAL)).toMillis();
    }
}

