/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.pulsar.source.reader.deserializer;

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
import org.apache.flink.connector.pulsar.source.reader.deserializer.GenericRecordDeserializer;
import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.schema.generic.MultiVersionSchemaInfoProvider;
import org.apache.pulsar.common.naming.TopicName;

@Internal
public class GenericRecordDeserializationSchema<T>
implements PulsarDeserializationSchema<T> {
    private static final long serialVersionUID = 1133225716807307498L;
    private transient PulsarClientImpl client;
    private transient Map<String, AutoConsumeSchema> schemaMap;
    private final GenericRecordDeserializer<T> deserializer;

    public GenericRecordDeserializationSchema(GenericRecordDeserializer<T> deserializer) {
        this.deserializer = deserializer;
    }

    @Override
    public void deserialize(Message<byte[]> message, Collector<T> out) throws Exception {
        AutoConsumeSchema schema = this.getSchema(message);
        GenericRecord element = schema.decode(message.getData(), message.getSchemaVersion());
        T msg = this.deserializer.deserialize(element);
        out.collect(msg);
    }

    public TypeInformation<T> getProducedType() {
        return this.deserializer.getProducedType();
    }

    @Override
    public void open(PulsarDeserializationSchema.PulsarInitializationContext context, SourceConfiguration configuration) throws Exception {
        this.client = (PulsarClientImpl)context.getPulsarClient();
        this.schemaMap = new HashMap<String, AutoConsumeSchema>();
    }

    private AutoConsumeSchema getSchema(Message<byte[]> message) {
        String topic = message.getTopicName();
        AutoConsumeSchema schema = this.schemaMap.get(topic);
        if (schema != null) {
            return schema;
        }
        schema = new AutoConsumeSchema();
        TopicName topicName = TopicName.get(topic);
        MultiVersionSchemaInfoProvider provider = new MultiVersionSchemaInfoProvider(topicName, this.client);
        schema.setSchemaInfoProvider(provider);
        this.schemaMap.put(topic, schema);
        return schema;
    }
}

