package org.apache.flink.runtime.query.netty;

import akka.dispatch.Futures;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.util.concurrent.GenericFutureListener;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.runtime.io.network.netty.NettyBufferPool;
import org.apache.flink.runtime.query.KvStateID;
import org.apache.flink.runtime.query.KvStateServerAddress;
import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
import org.apache.flink.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import scala.concurrent.Future;
import scala.concurrent.Promise;

/* loaded from: input_file:org/apache/flink/runtime/query/netty/KvStateClient.class */
/**
 * Netty-based client that requests serialized KvState from KvState servers.
 *
 * <p>Connections are tracked per {@link KvStateServerAddress}: an address is either in
 * {@code pendingConnections} (connect in progress, requests are queued) or in
 * {@code establishedConnections} (channel is open, requests are written directly).
 */
public class KvStateClient {
    /** Netty bootstrap used to open a channel for each server address. */
    private final Bootstrap bootstrap;
    /** Statistics tracker notified about connections and request outcomes. */
    private final KvStateRequestStats stats;
    /** Connections with an open channel, keyed by server address. */
    private final ConcurrentHashMap<KvStateServerAddress, EstablishedConnection> establishedConnections = new ConcurrentHashMap<>();
    /** Connections whose connect attempt has been started, keyed by server address. */
    private final ConcurrentHashMap<KvStateServerAddress, PendingConnection> pendingConnections = new ConcurrentHashMap<>();
    /** Set to {@code true} exactly once, by the first call to {@code shutDown()}. */
    private final AtomicBoolean shutDown = new AtomicBoolean();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/query/netty/KvStateClient$EstablishedConnection.class */
    /**
     * A connected channel to one KvState server plus the requests still waiting for a response.
     *
     * <p>On construction the connection appends a {@link KvStateClientHandler} to the channel
     * pipeline and registers itself as that handler's callback. Only the first close (or failure)
     * takes effect; it closes the channel and fails every request that is still pending.
     */
    public class EstablishedConnection implements KvStateClientHandlerCallback {
        /** Address of the server this connection talks to. */
        private final KvStateServerAddress serverAddress;
        /** Channel that requests are written to. */
        private final Channel channel;
        /** Requests that were registered but not yet completed, keyed by request ID. */
        private final ConcurrentHashMap<Long, PromiseAndTimestamp> pendingRequests = new ConcurrentHashMap<>();
        /** Source of request IDs; incremented once per request. */
        private final AtomicLong requestCount = new AtomicLong();
        /** Cause of the first close; {@code null} while the connection is usable. */
        private final AtomicReference<Throwable> failureCause = new AtomicReference<>();

        /** Pairs a request promise with the {@link System#nanoTime()} value taken at creation. */
        public class PromiseAndTimestamp {
            private final Promise<byte[]> promise;
            private final long timestamp;

            public PromiseAndTimestamp(Promise<byte[]> promise, long timestamp) {
                this.promise = promise;
                this.timestamp = timestamp;
            }
        }

        /**
         * Wraps the given channel, installs the client handler and reports an active connection.
         *
         * @param kvStateServerAddress address of the server, must not be {@code null}
         * @param channel connected channel, must not be {@code null}
         */
        EstablishedConnection(KvStateServerAddress kvStateServerAddress, Channel channel) {
            this.serverAddress = Preconditions.checkNotNull(kvStateServerAddress, "KvStateServerAddress");
            this.channel = Preconditions.checkNotNull(channel, "Channel");
            channel.pipeline().addLast("KvStateClientHandler", new KvStateClientHandler(this));
            KvStateClient.this.stats.reportActiveConnection();
        }

        /** Closes the connection, failing pending requests with a {@link ClosedChannelException}. */
        void close() {
            close(new ClosedChannelException());
        }

        /**
         * Closes the connection with the given cause.
         *
         * @param th cause handed to every request that is still pending
         * @return {@code true} if this call performed the close, {@code false} if the connection
         *     had already been closed
         */
        private boolean close(Throwable th) {
            if (!this.failureCause.compareAndSet(null, th)) {
                return false;
            }
            this.channel.close();
            KvStateClient.this.stats.reportInactiveConnection();
            for (Long requestId : this.pendingRequests.keySet()) {
                // remove() decides who owns the failure report if a response races with the close.
                PromiseAndTimestamp pending = this.pendingRequests.remove(requestId);
                if (pending != null && pending.promise.tryFailure(th)) {
                    KvStateClient.this.stats.reportFailedRequest();
                }
            }
            return true;
        }

        /**
         * Registers a new request and writes it to the channel.
         *
         * @param kvStateID ID of the KvState instance to query
         * @param bArr serialized key and namespace
         * @return future completed by the server response, or failed if the write fails, the
         *     connection is closed, or sending throws
         */
        Future<byte[]> getKvState(KvStateID kvStateID, byte[] bArr) {
            final PromiseAndTimestamp request = new PromiseAndTimestamp(Futures.<byte[]>promise(), System.nanoTime());
            final long requestId = this.requestCount.getAndIncrement();
            try {
                this.pendingRequests.put(requestId, request);
                KvStateClient.this.stats.reportRequest();
                this.channel
                        .writeAndFlush(KvStateRequestSerializer.serializeKvStateRequest(this.channel.alloc(), requestId, kvStateID, bArr))
                        .addListener(new ChannelFutureListener() {
                            @Override
                            public void operationComplete(ChannelFuture writeFuture) throws Exception {
                                if (writeFuture.isSuccess()) {
                                    return;
                                }
                                PromiseAndTimestamp failed = EstablishedConnection.this.pendingRequests.remove(requestId);
                                if (failed != null && failed.promise.tryFailure(writeFuture.cause())) {
                                    KvStateClient.this.stats.reportFailedRequest();
                                }
                            }
                        });

                // The connection may have been closed after the request was registered but
                // before close() walked the pending requests; fail the request here in that case.
                Throwable closedWith = this.failureCause.get();
                if (closedWith != null) {
                    PromiseAndTimestamp stale = this.pendingRequests.remove(requestId);
                    if (stale != null && stale.promise.tryFailure(closedWith)) {
                        KvStateClient.this.stats.reportFailedRequest();
                    }
                }
            } catch (Throwable t) {
                // Drop the registration so the entry does not linger until the connection closes.
                PromiseAndTimestamp registered = this.pendingRequests.remove(requestId);
                boolean failedHere = request.promise.tryFailure(t);
                if (registered != null && failedHere) {
                    KvStateClient.this.stats.reportFailedRequest();
                }
            }
            return request.promise.future();
        }

        /**
         * Completes the pending request with the given ID and reports its duration in milliseconds.
         *
         * @param j request ID
         * @param bArr serialized result
         */
        @Override // org.apache.flink.runtime.query.netty.KvStateClientHandlerCallback
        public void onRequestResult(long j, byte[] bArr) {
            PromiseAndTimestamp pending = this.pendingRequests.remove(j);
            if (pending == null || !pending.promise.trySuccess(bArr)) {
                return;
            }
            long elapsedNanos = System.nanoTime() - pending.timestamp;
            KvStateClient.this.stats.reportSuccessfulRequest(TimeUnit.NANOSECONDS.toMillis(elapsedNanos));
        }

        /**
         * Fails the pending request with the given ID.
         *
         * @param j request ID
         * @param th failure cause
         */
        @Override // org.apache.flink.runtime.query.netty.KvStateClientHandlerCallback
        public void onRequestFailure(long j, Throwable th) {
            PromiseAndTimestamp pending = this.pendingRequests.remove(j);
            if (pending == null || !pending.promise.tryFailure(th)) {
                return;
            }
            KvStateClient.this.stats.reportFailedRequest();
        }

        /**
         * Closes the connection with the given cause and, if this call performed the close,
         * removes the connection from the client's established connections.
         *
         * @param th failure cause
         */
        @Override // org.apache.flink.runtime.query.netty.KvStateClientHandlerCallback
        public void onFailure(Throwable th) {
            if (close(th)) {
                KvStateClient.this.establishedConnections.remove(this.serverAddress, this);
            }
        }

        @Override
        public String toString() {
            return "EstablishedConnection{serverAddress=" + this.serverAddress + ", channel=" + this.channel + ", pendingRequests=" + this.pendingRequests.size() + ", requestCount=" + this.requestCount + ", failureCause=" + this.failureCause + '}';
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/query/netty/KvStateClient$PendingConnection.class */
    /**
     * A connection whose connect attempt has been started but may not have completed yet.
     *
     * <p>Requests arriving before the channel is available are queued. When the connect future
     * succeeds, the queued requests are forwarded to a new {@link EstablishedConnection}; when it
     * fails, or the connection is closed first, they are failed. All mutable state is guarded by
     * {@code connectLock}.
     */
    public class PendingConnection implements ChannelFutureListener {
        /** Guards {@code queuedRequests}, {@code established}, {@code closed} and {@code failureCause}. */
        private final Object connectLock = new Object();
        /** Address of the server being connected to. */
        private final KvStateServerAddress serverAddress;
        /** Requests received before the channel was handed in. */
        private final ArrayDeque<PendingRequest> queuedRequests = new ArrayDeque<>();
        /** Set once the channel has been handed in; {@code null} before that. */
        private EstablishedConnection established;
        /** Whether {@code close} has run. */
        private boolean closed;
        /** Cause passed to the first {@code close}; {@code null} while not closed. */
        private Throwable failureCause;

        /** A request queued while the connection is still being established. */
        public final class PendingRequest {
            private final KvStateID kvStateId;
            private final byte[] serializedKeyAndNamespace;
            private final Promise<byte[]> promise;

            private PendingRequest(KvStateID kvStateID, byte[] bArr) {
                this.kvStateId = kvStateID;
                this.serializedKeyAndNamespace = bArr;
                this.promise = Futures.promise();
            }
        }

        private PendingConnection(KvStateServerAddress kvStateServerAddress) {
            this.serverAddress = kvStateServerAddress;
        }

        /**
         * Callback of the connect future: hands in the channel on success, closes with the
         * connect failure otherwise.
         */
        @Override // io.netty.util.concurrent.GenericFutureListener
        public void operationComplete(ChannelFuture channelFuture) throws Exception {
            if (channelFuture.isSuccess()) {
                handInChannel(channelFuture.channel());
            } else {
                close(channelFuture.cause());
            }
        }

        /**
         * Returns a future for the requested state.
         *
         * <p>Fails immediately if this connection has been closed, delegates to the established
         * connection if the channel is already available, and queues the request otherwise.
         *
         * @param kvStateID ID of the KvState instance to query
         * @param bArr serialized key and namespace
         * @return future with the serialized result
         */
        public Future<byte[]> getKvState(KvStateID kvStateID, byte[] bArr) {
            synchronized (this.connectLock) {
                if (this.failureCause != null) {
                    return Futures.failed(this.failureCause);
                }
                if (this.closed) {
                    return Futures.failed(new ClosedChannelException());
                }
                if (this.established != null) {
                    return this.established.getKvState(kvStateID, bArr);
                }
                PendingRequest queued = new PendingRequest(kvStateID, bArr);
                this.queuedRequests.add(queued);
                return queued.promise.future();
            }
        }

        /**
         * Turns this pending connection into an established one and forwards the queued requests.
         * If the connection was closed in the meantime, the channel is closed instead.
         */
        private void handInChannel(Channel channel) {
            synchronized (this.connectLock) {
                if (this.closed || this.failureCause != null) {
                    channel.close();
                    return;
                }
                this.established = new EstablishedConnection(this.serverAddress, channel);
                PendingRequest queued;
                while ((queued = this.queuedRequests.poll()) != null) {
                    queued.promise.completeWith(this.established.getKvState(queued.kvStateId, queued.serializedKeyAndNamespace));
                }
                KvStateClient.this.establishedConnections.put(this.serverAddress, this.established);
                // Remove only this instance, never a different connection mapped to the address.
                KvStateClient.this.pendingConnections.remove(this.serverAddress, this);
                // shutDown() may have walked the established connections before the put above.
                if (KvStateClient.this.shutDown.get() && KvStateClient.this.establishedConnections.remove(this.serverAddress, this.established)) {
                    this.established.close();
                }
            }
        }

        /** Closes the connection, failing queued requests with a {@link ClosedChannelException}. */
        public void close() {
            close(new ClosedChannelException());
        }

        /**
         * Closes the connection with the given cause. Only the first call has an effect.
         *
         * <p>Queued requests are failed with the cause, or the established connection is closed
         * if the channel was already handed in. The connection also removes itself from the
         * client's pending connections, so that a later request to the same address starts a new
         * connect attempt instead of being answered with this connection's stale failure.
         */
        private void close(Throwable th) {
            synchronized (this.connectLock) {
                if (this.closed) {
                    return;
                }
                if (this.failureCause == null) {
                    this.failureCause = th;
                }
                if (this.established != null) {
                    this.established.close();
                } else {
                    PendingRequest queued;
                    while ((queued = this.queuedRequests.poll()) != null) {
                        queued.promise.tryFailure(th);
                    }
                }
                this.closed = true;
                KvStateClient.this.pendingConnections.remove(this.serverAddress, this);
            }
        }

        @Override
        public String toString() {
            synchronized (this.connectLock) {
                return "PendingConnection{serverAddress=" + this.serverAddress + ", queuedRequests=" + this.queuedRequests.size() + ", established=" + (this.established != null) + ", closed=" + this.closed + '}';
            }
        }
    }

    /**
     * Creates a client with its own NIO event loop group and buffer pool.
     *
     * <p>Both arguments are validated before the event loop group is created, so an invalid
     * argument cannot leave behind a group that no {@code shutDown()} call will ever reach.
     *
     * @param i number of event loop threads, must be at least 1
     * @param kvStateRequestStats statistics tracker, must not be {@code null}
     * @throws IllegalArgumentException if {@code i} is less than 1
     * @throws NullPointerException if {@code kvStateRequestStats} is {@code null}
     */
    public KvStateClient(int i, KvStateRequestStats kvStateRequestStats) {
        Preconditions.checkArgument(i >= 1, "Non-positive number of event loop threads.");
        this.stats = Preconditions.checkNotNull(kvStateRequestStats, "Statistics tracker");

        NettyBufferPool bufferPool = new NettyBufferPool(i);
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup(
                i,
                new ThreadFactoryBuilder()
                        .setDaemon(true)
                        .setNameFormat("Flink KvStateClient Event Loop Thread %d")
                        .build());

        this.bootstrap = new Bootstrap()
                .group(eventLoopGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.ALLOCATOR, bufferPool)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override // io.netty.channel.ChannelInitializer
                    public void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline()
                                // Strips a 4-byte length prefix from each incoming frame.
                                .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
                                .addLast(new ChunkedWriteHandler());
                    }
                });
    }

    /**
     * Requests the serialized state for the given key and namespace from the given server.
     *
     * <p>An established connection to the address is used if one exists. Otherwise the request
     * goes to the pending connection for the address, and a connect attempt is started if there
     * is none yet.
     *
     * @param kvStateServerAddress address of the server to query
     * @param kvStateID ID of the KvState instance to query
     * @param bArr serialized key and namespace
     * @return future with the serialized result; fails with {@link IllegalStateException} if the
     *     client has been shut down
     */
    public Future<byte[]> getKvState(KvStateServerAddress kvStateServerAddress, KvStateID kvStateID, byte[] bArr) {
        if (this.shutDown.get()) {
            return Futures.failed(new IllegalStateException("Shut down"));
        }

        EstablishedConnection established = this.establishedConnections.get(kvStateServerAddress);
        if (established != null) {
            return established.getKvState(kvStateID, bArr);
        }

        PendingConnection pending = this.pendingConnections.get(kvStateServerAddress);
        if (pending != null) {
            return pending.getKvState(kvStateID, bArr);
        }

        // No connection known for this address: try to become the one that connects.
        PendingConnection created = new PendingConnection(kvStateServerAddress);
        PendingConnection raced = this.pendingConnections.putIfAbsent(kvStateServerAddress, created);
        if (raced != null) {
            // Another caller registered a pending connection first; use theirs.
            return raced.getKvState(kvStateID, bArr);
        }

        this.bootstrap.connect(kvStateServerAddress.getHost(), kvStateServerAddress.getPort()).addListener(created);
        return created.getKvState(kvStateID, bArr);
    }

    /**
     * Shuts the client down: closes every established and pending connection and then shuts
     * down the bootstrap's event loop group. Only the first call has an effect.
     */
    public void shutDown() {
        if (!this.shutDown.compareAndSet(false, true)) {
            return;
        }

        for (Map.Entry<KvStateServerAddress, EstablishedConnection> establishedEntry : this.establishedConnections.entrySet()) {
            EstablishedConnection connection = establishedEntry.getValue();
            // Close only if this call is the one that removed the mapping.
            if (this.establishedConnections.remove(establishedEntry.getKey(), connection)) {
                connection.close();
            }
        }

        for (Map.Entry<KvStateServerAddress, PendingConnection> pendingEntry : this.pendingConnections.entrySet()) {
            PendingConnection connection = pendingEntry.getValue();
            if (this.pendingConnections.remove(pendingEntry.getKey()) != null) {
                connection.close();
            }
        }

        if (this.bootstrap != null) {
            EventLoopGroup eventLoops = this.bootstrap.group();
            if (eventLoops != null) {
                eventLoops.shutdownGracefully(0L, 10L, TimeUnit.SECONDS);
            }
        }
    }

    /**
     * Closes the pending and the established connection to the given address, if they exist.
     *
     * <p>Both are removed from the client's bookkeeping, so a later request to the same address
     * starts a new connect attempt rather than being routed to a closed connection.
     *
     * @param kvStateServerAddress address of the server whose connection is to be closed
     */
    public void closeConnection(KvStateServerAddress kvStateServerAddress) {
        PendingConnection pending = this.pendingConnections.get(kvStateServerAddress);
        if (pending != null) {
            // Close first so a connect that completes concurrently sees the closed flag, then
            // drop the mapping; the conditional remove never evicts a different connection.
            pending.close();
            this.pendingConnections.remove(kvStateServerAddress, pending);
        }

        EstablishedConnection established = this.establishedConnections.remove(kvStateServerAddress);
        if (established != null) {
            established.close();
        }
    }
}
