package org.apache.flink.queryablestate.client;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.queryablestate.client.state.ImmutableAggregatingState;
import org.apache.flink.queryablestate.client.state.ImmutableListState;
import org.apache.flink.queryablestate.client.state.ImmutableMapState;
import org.apache.flink.queryablestate.client.state.ImmutableReducingState;
import org.apache.flink.queryablestate.client.state.ImmutableValueState;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
import org.apache.flink.queryablestate.messages.KvStateRequest;
import org.apache.flink.queryablestate.messages.KvStateResponse;
import org.apache.flink.queryablestate.network.Client;
import org.apache.flink.queryablestate.network.messages.MessageSerializer;
import org.apache.flink.queryablestate.network.stats.DisabledKvStateRequestStats;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.LambdaUtil;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@PublicEvolving
/* loaded from: input_file:org/apache/flink/queryablestate/client/QueryableStateClient.class */
public class QueryableStateClient {
    private static final Logger LOG = LoggerFactory.getLogger(QueryableStateClient.class);

    /**
     * Lookup table from a state descriptor's {@link StateDescriptor.Type} to the factory that
     * builds the matching client-side state object from the bytes of a response.
     *
     * <p>Each method reference is given an explicit {@link StateFactory} target type: without it
     * the generic {@code Tuple2.of} call offers no functional interface to bind the reference to,
     * and the stream would lose its element type.
     */
    private static final Map<StateDescriptor.Type, StateFactory> STATE_FACTORIES =
            Stream.of(
                            Tuple2.of(StateDescriptor.Type.VALUE, (StateFactory) ImmutableValueState::createState),
                            Tuple2.of(StateDescriptor.Type.LIST, (StateFactory) ImmutableListState::createState),
                            Tuple2.of(StateDescriptor.Type.MAP, (StateFactory) ImmutableMapState::createState),
                            Tuple2.of(StateDescriptor.Type.AGGREGATING, (StateFactory) ImmutableAggregatingState::createState),
                            Tuple2.of(StateDescriptor.Type.REDUCING, (StateFactory) ImmutableReducingState::createState))
                    .collect(Collectors.toMap(entry -> entry.f0, entry -> entry.f1));

    /** Network client through which every state request of this instance is sent. */
    private final Client<KvStateRequest, KvStateResponse> client;

    /** Address (host and port given at construction time) that all requests are sent to. */
    private final InetSocketAddress remoteAddress;

    /** Config handed to the serializers created for a query; stays {@code null} until set. */
    private ExecutionConfig executionConfig;

    /**
     * Class loader installed as context class loader while a response is turned into a state
     * object; when {@code null}, the calling thread's context class loader is used instead.
     */
    private ClassLoader userClassLoader;

    /**
     * Factory that builds a state object from a state descriptor and the raw bytes carried by a
     * state response. It has a single abstract method and is used as the target type of the
     * {@code createState} method references registered in {@code STATE_FACTORIES}.
     */
    @FunctionalInterface
    public interface StateFactory {
        /**
         * Creates the state object described by {@code stateDescriptor} from {@code bArr}.
         *
         * @param stateDescriptor descriptor of the state to create
         * @param bArr raw content of the response the state is built from
         * @param <T> value type of the descriptor
         * @param <S> type of the state that is created
         * @return the created state
         * @throws Exception if the state cannot be created from the given bytes
         */
        <T, S extends State> S createState(StateDescriptor<S, T> stateDescriptor, byte[] bArr) throws Exception;
    }

    /**
     * Creates a client that sends its requests to the given host name and port.
     *
     * @param str host name (or textual address) of the remote endpoint; must not be {@code null}
     * @param i port of the remote endpoint
     * @throws UnknownHostException if {@code str} cannot be resolved to an address
     */
    public QueryableStateClient(String str, int i) throws UnknownHostException {
        // Null check happens before resolution; the resolved address goes to the main constructor.
        this(InetAddress.getByName(Preconditions.checkNotNull(str)), i);
    }

    public QueryableStateClient(InetAddress inetAddress, int i) {
        Preconditions.checkArgument(NetUtils.isValidHostPort(i), "Remote Port " + i + " is out of valid port range [0-65535].");
        this.remoteAddress = new InetSocketAddress(inetAddress, i);
        this.client = new Client<>("Queryable State Client", 1, new MessageSerializer(new KvStateRequest.KvStateRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()), new DisabledKvStateRequestStats());
    }

    /**
     * Starts the shutdown of the underlying network client without waiting for it.
     *
     * @return the future returned by the client's {@code shutdown()} call
     */
    public CompletableFuture<?> shutdownAndHandle() {
        final CompletableFuture<?> shutdownFuture = client.shutdown();
        return shutdownFuture;
    }

    /**
     * Shuts down the underlying network client and blocks until the shutdown future completes.
     *
     * <p>This method never throws: a failed shutdown is only logged as a warning. If the waiting
     * thread is interrupted, the interrupt flag is restored before returning so that callers
     * further up the stack can still observe the interruption.
     */
    public void shutdownAndWait() {
        try {
            this.client.shutdown().get();
            LOG.info("The Queryable State Client was shutdown successfully.");
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the flag; set it again instead of swallowing it.
            Thread.currentThread().interrupt();
            LOG.warn("The Queryable State Client shutdown failed: ", e);
        } catch (Exception e) {
            LOG.warn("The Queryable State Client shutdown failed: ", e);
        }
    }

    /**
     * Returns the execution config currently held by this client.
     *
     * @return the stored config, or {@code null} if none has been set
     */
    public ExecutionConfig getExecutionConfig() {
        return executionConfig;
    }

    /**
     * Replaces the execution config that is passed to the serializers created for a query.
     *
     * @param executionConfig the new config
     * @return the config that was stored before this call (possibly {@code null})
     */
    public ExecutionConfig setExecutionConfig(ExecutionConfig executionConfig) {
        final ExecutionConfig previous = this.executionConfig;
        this.executionConfig = executionConfig;
        return previous;
    }

    /**
     * Replaces the class loader that is installed as context class loader while a response is
     * turned into a state object.
     *
     * @param classLoader the new class loader; {@code null} makes queries fall back to the calling
     *     thread's context class loader
     * @return the class loader that was stored before this call (possibly {@code null})
     */
    public ClassLoader setUserClassLoader(ClassLoader classLoader) {
        final ClassLoader previous = this.userClassLoader;
        this.userClassLoader = classLoader;
        return previous;
    }

    /**
     * Queries the state of the given key, with the key type supplied as a {@link TypeHint}.
     *
     * <p>The hint is resolved to its {@link TypeInformation} and the call is forwarded to the
     * {@code TypeInformation}-based overload. The key is passed through with its own type; it is
     * not required to be a {@code String}.
     *
     * @param jobID id of the job the queried state belongs to
     * @param str name under which the state was made queryable
     * @param k key to query
     * @param typeHint type hint describing the key type; must not be {@code null}
     * @param stateDescriptor descriptor of the state to query
     * @param <K> key type
     * @param <S> type of the returned state
     * @param <V> value type of the state descriptor
     * @return future that completes with the state object built from the response
     */
    @PublicEvolving
    public <K, S extends State, V> CompletableFuture<S> getKvState(JobID jobID, String str, K k, TypeHint<K> typeHint, StateDescriptor<S, V> stateDescriptor) {
        Preconditions.checkNotNull(typeHint);
        final TypeInformation<K> keyTypeInfo = typeHint.getTypeInfo();
        return getKvState(jobID, str, k, keyTypeInfo, stateDescriptor);
    }

    /**
     * Queries the state of the given key, with the key type supplied as {@link TypeInformation}.
     *
     * <p>The query is issued with {@code VoidNamespace.INSTANCE} as namespace.
     *
     * @param jobID id of the job the queried state belongs to
     * @param str name under which the state was made queryable
     * @param k key to query
     * @param typeInformation type information used to create the key serializer
     * @param stateDescriptor descriptor of the state to query
     * @param <K> key type
     * @param <S> type of the returned state
     * @param <V> value type of the state descriptor
     * @return future that completes with the state object built from the response
     */
    @PublicEvolving
    public <K, S extends State, V> CompletableFuture<S> getKvState(JobID jobID, String str, K k, TypeInformation<K> typeInformation, StateDescriptor<S, V> stateDescriptor) {
        final VoidNamespace namespace = VoidNamespace.INSTANCE;
        final TypeInformation<VoidNamespace> namespaceTypeInfo = VoidNamespaceTypeInfo.INSTANCE;
        return getKvState(
                jobID,
                str,
                k,
                namespace,
                typeInformation,
                namespaceTypeInfo,
                stateDescriptor);
    }

    /**
     * Serializes key and namespace, sends the request and converts the response into a state
     * object.
     *
     * <p>All arguments must be non-null. A failure to serialize key and namespace is reported
     * through an exceptionally completed future rather than thrown.
     *
     * @param jobId id of the job the queried state belongs to
     * @param queryableStateName name under which the state was made queryable
     * @param key key to query; its {@code hashCode()} is sent along with the request
     * @param namespace namespace to query
     * @param keyTypeInfo type information used to create the key serializer
     * @param namespaceTypeInfo type information used to create the namespace serializer
     * @param stateDescriptor descriptor of the state to query
     * @return future that completes with the state object built from the response
     */
    private <K, N, S extends State, V> CompletableFuture<S> getKvState(JobID jobId, String queryableStateName, K key, N namespace, TypeInformation<K> keyTypeInfo, TypeInformation<N> namespaceTypeInfo, StateDescriptor<S, V> stateDescriptor) {
        Preconditions.checkNotNull(jobId);
        Preconditions.checkNotNull(queryableStateName);
        Preconditions.checkNotNull(key);
        Preconditions.checkNotNull(namespace);
        Preconditions.checkNotNull(keyTypeInfo);
        Preconditions.checkNotNull(namespaceTypeInfo);
        Preconditions.checkNotNull(stateDescriptor);

        final TypeSerializer<K> keySerializer = keyTypeInfo.createSerializer(executionConfig);
        final TypeSerializer<N> namespaceSerializer = namespaceTypeInfo.createSerializer(executionConfig);
        stateDescriptor.initializeSerializerUnlessSet(executionConfig);

        // Serialization is the only step that can raise an IOException.
        final byte[] serializedKeyAndNamespace;
        try {
            serializedKeyAndNamespace =
                    KvStateSerializer.serializeKeyAndNamespace(
                            key, keySerializer, namespace, namespaceSerializer);
        } catch (IOException e) {
            return FutureUtils.completedExceptionally(e);
        }

        // Prefer the explicitly configured loader; otherwise use the caller's context loader.
        final ClassLoader stateClassLoader;
        if (userClassLoader == null) {
            stateClassLoader = Thread.currentThread().getContextClassLoader();
        } else {
            stateClassLoader = userClassLoader;
        }

        return getKvState(jobId, queryableStateName, key.hashCode(), serializedKeyAndNamespace)
                .thenApply(
                        response ->
                                LambdaUtil.withContextClassLoader(
                                        stateClassLoader,
                                        () -> createState(response, stateDescriptor)));
    }

    /**
     * Builds the state object for a response using the factory registered for the descriptor's
     * type.
     *
     * @param response response whose content is handed to the factory
     * @param stateDescriptor descriptor of the queried state
     * @return the state created by the factory
     * @throws FlinkRuntimeException if no factory is registered for the descriptor's type, or if
     *     the factory fails (the failure is kept as cause)
     */
    private <T, S extends State> S createState(KvStateResponse response, StateDescriptor<S, T> stateDescriptor) {
        final StateFactory factory = STATE_FACTORIES.get(stateDescriptor.getType());
        if (factory != null) {
            try {
                return factory.createState(stateDescriptor, response.getContent());
            } catch (Exception cause) {
                throw new FlinkRuntimeException(cause);
            }
        }
        throw new FlinkRuntimeException(
                String.format(
                        "State %s is not supported by %s",
                        stateDescriptor.getClass(),
                        this.getClass()));
    }

    /**
     * Sends a state request to the remote address of this client.
     *
     * <p>Any exception raised while creating or sending the request is logged and returned as an
     * exceptionally completed future instead of being thrown.
     *
     * @param jobId id of the job the queried state belongs to
     * @param queryableStateName name under which the state was made queryable
     * @param keyHashCode hash code of the queried key
     * @param serializedKeyAndNamespace serialized key and namespace
     * @return future of the response
     */
    private CompletableFuture<KvStateResponse> getKvState(JobID jobId, String queryableStateName, int keyHashCode, byte[] serializedKeyAndNamespace) {
        LOG.debug("Sending State Request to {}.", remoteAddress);
        try {
            // The request is created inside the try so that its failures are reported via the future too.
            final KvStateRequest request =
                    new KvStateRequest(jobId, queryableStateName, keyHashCode, serializedKeyAndNamespace);
            return client.sendRequest(remoteAddress, request);
        } catch (Exception failure) {
            LOG.error("Unable to send KVStateRequest: ", failure);
            return FutureUtils.completedExceptionally(failure);
        }
    }
}
