package org.apache.kudu.client;

import com.stumbleupon.async.Deferred;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.concurrent.GuardedBy;
import org.apache.kudu.WireProtocol;
import org.apache.kudu.annotations.InterfaceAudience;
import org.apache.kudu.client.Negotiator;
import org.apache.kudu.client.RpcTraceFrame;
import org.apache.kudu.client.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.kudu.client.shaded.com.google.common.base.Preconditions;
import org.apache.kudu.client.shaded.com.google.common.collect.Lists;
import org.apache.kudu.client.shaded.com.google.protobuf.Message;
import org.apache.kudu.client.shaded.org.jboss.netty.buffer.ChannelBuffers;
import org.apache.kudu.client.shaded.org.jboss.netty.channel.Channel;
import org.apache.kudu.client.shaded.org.jboss.netty.channel.ChannelEvent;
import org.apache.kudu.client.shaded.org.jboss.netty.channel.ChannelFuture;
import org.apache.kudu.client.shaded.org.jboss.netty.channel.ChannelFutureListener;
import org.apache.kudu.client.shaded.org.jboss.netty.channel.ChannelHandlerContext;
import org.apache.kudu.client.shaded.org.jboss.netty.channel.ChannelStateEvent;
import org.apache.kudu.client.shaded.org.jboss.netty.channel.Channels;
import org.apache.kudu.client.shaded.org.jboss.netty.channel.ExceptionEvent;
import org.apache.kudu.client.shaded.org.jboss.netty.channel.MessageEvent;
import org.apache.kudu.client.shaded.org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.apache.kudu.client.shaded.org.jboss.netty.handler.timeout.ReadTimeoutException;
import org.apache.kudu.master.Master;
import org.apache.kudu.rpc.RpcHeader;
import org.apache.kudu.tserver.Tserver;
import org.apache.kudu.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
/* loaded from: input_file:org/apache/kudu/client/TabletClient.class */
public class TabletClient extends SimpleChannelUpstreamHandler {
    public static final Logger LOG;
    public static final byte RPC_CURRENT_VERSION = 9;
    private static final byte[] CONNECTION_HEADER;
    private Channel chan;
    private final ReentrantLock lock = new ReentrantLock();

    @GuardedBy("lock")
    ArrayList<KuduRpc<?>> pendingRpcs = Lists.newArrayList();

    @GuardedBy("lock")
    private int nextCallId = 0;

    @GuardedBy("lock")
    private State state = State.NEGOTIATING;

    @GuardedBy("lock")
    private HashMap<Integer, KuduRpc<?>> rpcsInflight = new HashMap<>();

    @GuardedBy("lock")
    private Negotiator.Result negotiationResult;
    private final AsyncKuduClient kuduClient;
    private final long socketReadTimeoutMs;
    private final RequestTracker requestTracker;
    private final ServerInfo serverInfo;
    private volatile boolean closedByClient;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kudu/client/TabletClient$State.class */
    public enum State {
        NEGOTIATING,
        ALIVE,
        DISCONNECTED
    }

    public TabletClient(AsyncKuduClient asyncKuduClient, ServerInfo serverInfo) {
        this.kuduClient = asyncKuduClient;
        this.socketReadTimeoutMs = asyncKuduClient.getDefaultSocketReadTimeoutMs();
        this.requestTracker = asyncKuduClient.getRequestTracker();
        this.serverInfo = serverInfo;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public <R> void sendRpc(KuduRpc<R> kuduRpc) {
        Preconditions.checkArgument(kuduRpc.hasDeferred());
        kuduRpc.addTrace(new RpcTraceFrame.RpcTraceFrameBuilder(kuduRpc.method(), RpcTraceFrame.Action.SEND_TO_SERVER).serverInfo(this.serverInfo).build());
        if (!kuduRpc.deadlineTracker.hasDeadline()) {
            LOG.warn(getPeerUuidLoggingString() + " sending an rpc without a timeout " + kuduRpc);
        }
        try {
            Message createRequestPB = kuduRpc.createRequestPB();
            this.lock.lock();
            try {
                if (this.state == State.DISCONNECTED) {
                    this.lock.unlock();
                    failOrRetryRpc(kuduRpc, new RecoverableException(Status.NetworkError(getPeerUuidLoggingString() + "Connection reset")));
                    if (0 != 0) {
                        this.lock.unlock();
                        return;
                    }
                    return;
                }
                if (this.state == State.NEGOTIATING) {
                    this.pendingRpcs.add(kuduRpc);
                    if (1 != 0) {
                        this.lock.unlock();
                        return;
                    }
                    return;
                }
                if (!$assertionsDisabled && this.state != State.ALIVE) {
                    throw new AssertionError();
                }
                if (!$assertionsDisabled && this.chan == null) {
                    throw new AssertionError();
                }
                if (kuduRpc.getRequiredFeatures().isEmpty() || this.negotiationResult.serverFeatures.contains(RpcHeader.RpcFeatureFlag.APPLICATION_FEATURE_FLAGS)) {
                    sendCallToWire(kuduRpc, createRequestPB);
                    if (1 != 0) {
                        this.lock.unlock();
                        return;
                    }
                    return;
                }
                this.lock.unlock();
                kuduRpc.errback(new NonRecoverableException(Status.NotSupported("the server does not support theAPPLICATION_FEATURE_FLAGS RPC feature")));
                if (0 != 0) {
                    this.lock.unlock();
                }
            } catch (Throwable th) {
                if (1 != 0) {
                    this.lock.unlock();
                }
                throw th;
            }
        } catch (Exception e) {
            LOG.error("Uncaught exception while constructing RPC request: " + kuduRpc, e);
            kuduRpc.errback(e);
        }
    }

    @GuardedBy("lock")
    private <R> void sendCallToWire(KuduRpc<R> kuduRpc, Message message) {
        if (!$assertionsDisabled && !this.lock.isHeldByCurrentThread()) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && this.state != State.ALIVE) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && this.chan == null) {
            throw new AssertionError();
        }
        int i = this.nextCallId;
        this.nextCallId = i + 1;
        RpcHeader.RequestHeader.Builder remoteMethod = RpcHeader.RequestHeader.newBuilder().addAllRequiredFeatureFlags(kuduRpc.getRequiredFeatures()).setCallId(i).setRemoteMethod(RpcHeader.RemoteMethodPB.newBuilder().setServiceName(kuduRpc.serviceName()).setMethodName(kuduRpc.method()));
        if (kuduRpc.deadlineTracker.hasDeadline() || this.socketReadTimeoutMs > 0) {
            long j = Long.MAX_VALUE;
            if (kuduRpc.deadlineTracker.hasDeadline()) {
                j = kuduRpc.deadlineTracker.getMillisBeforeDeadline();
            }
            long j2 = Long.MAX_VALUE;
            if (this.socketReadTimeoutMs > 0) {
                j2 = this.socketReadTimeoutMs;
            }
            remoteMethod.setTimeoutMillis((int) Math.min(j, j2));
        }
        if (kuduRpc.isRequestTracked()) {
            RpcHeader.RequestIdPB.Builder newBuilder = RpcHeader.RequestIdPB.newBuilder();
            if (kuduRpc.getSequenceId() == -1) {
                kuduRpc.setSequenceId(this.requestTracker.newSeqNo());
            }
            newBuilder.setClientId(this.requestTracker.getClientId());
            newBuilder.setSeqNo(kuduRpc.getSequenceId());
            newBuilder.setAttemptNo(kuduRpc.attempt);
            newBuilder.setFirstIncompleteSeqNo(this.requestTracker.firstIncomplete());
            remoteMethod.setRequestId(newBuilder);
        }
        KuduRpc<?> put = this.rpcsInflight.put(Integer.valueOf(i), kuduRpc);
        if (put == null) {
            Channels.write(this.chan, new RpcOutboundMessage(remoteMethod, message));
        } else {
            String str = getPeerUuidLoggingString() + "Unexpected state: there was already an RPC in flight with callId=" + i + ": " + put + ".  This happened when sending out: " + kuduRpc;
            LOG.error(str);
            put.errback(new NonRecoverableException(Status.IllegalState(str)));
        }
    }

    @VisibleForTesting
    ChannelFuture disconnect() {
        Preconditions.checkNotNull(this.chan);
        this.closedByClient = true;
        return Channels.disconnect(this.chan);
    }

    public Deferred<Void> shutdown() {
        ChannelFuture disconnect = disconnect();
        final Deferred<Void> deferred = new Deferred<>();
        disconnect.addListener(new ChannelFutureListener() { // from class: org.apache.kudu.client.TabletClient.1
            @Override // org.apache.kudu.client.shaded.org.jboss.netty.channel.ChannelFutureListener
            public void operationComplete(ChannelFuture channelFuture) {
                if (channelFuture.isSuccess()) {
                    deferred.callback((Object) null);
                    return;
                }
                Throwable cause = channelFuture.getCause();
                if (cause instanceof Exception) {
                    deferred.callback(cause);
                } else {
                    deferred.callback(new NonRecoverableException(Status.IllegalState("Failed to shutdown: " + TabletClient.this), cause));
                }
            }
        });
        return deferred;
    }

    @Override // org.apache.kudu.client.shaded.org.jboss.netty.channel.SimpleChannelUpstreamHandler
    public void messageReceived(ChannelHandlerContext channelHandlerContext, MessageEvent messageEvent) throws Exception {
        Object message = messageEvent.getMessage();
        if (message instanceof Negotiator.Result) {
            this.lock.lock();
            try {
                if (!$assertionsDisabled && this.chan == null) {
                    throw new AssertionError();
                }
                this.negotiationResult = (Negotiator.Result) message;
                this.state = State.ALIVE;
                ArrayList<KuduRpc<?>> arrayList = this.pendingRpcs;
                this.pendingRpcs = null;
                sendQueuedRpcs(arrayList);
                return;
            } finally {
                this.lock.unlock();
            }
        }
        if (!(message instanceof CallResponse)) {
            channelHandlerContext.sendUpstream(messageEvent);
            return;
        }
        CallResponse callResponse = (CallResponse) message;
        long nanoTime = System.nanoTime();
        RpcHeader.ResponseHeader header = callResponse.getHeader();
        if (!header.hasCallId()) {
            String str = getPeerUuidLoggingString() + "RPC response (size: " + callResponse.getTotalResponseSize() + ") doesn't have a call ID: " + header;
            LOG.error(str);
            throw new NonRecoverableException(Status.Incomplete(str));
        }
        int callId = header.getCallId();
        this.lock.lock();
        try {
            KuduRpc<?> remove = this.rpcsInflight.remove(Integer.valueOf(callId));
            this.lock.unlock();
            if (remove == null) {
                String str2 = getPeerUuidLoggingString() + "Invalid rpcid: " + callId;
                LOG.error(str2);
                throw new NonRecoverableException(Status.IllegalState(str2));
            }
            RpcTraceFrame.RpcTraceFrameBuilder serverInfo = new RpcTraceFrame.RpcTraceFrameBuilder(remove.method(), RpcTraceFrame.Action.RECEIVE_FROM_SERVER).serverInfo(this.serverInfo);
            Pair<?, Object> pair = null;
            KuduException kuduException = null;
            Status OK = Status.OK();
            if (header.hasIsError() && header.getIsError()) {
                RpcHeader.ErrorStatusPB.Builder newBuilder = RpcHeader.ErrorStatusPB.newBuilder();
                KuduRpc.readProtobuf(callResponse.getPBMessage(), newBuilder);
                RpcHeader.ErrorStatusPB build = newBuilder.build();
                if (build.getCode().equals(RpcHeader.ErrorStatusPB.RpcErrorCodePB.ERROR_SERVER_TOO_BUSY)) {
                    OK = Status.ServiceUnavailable(build.getMessage());
                } else {
                    String str3 = getPeerUuidLoggingString() + "Tablet server sent error " + build.getMessage();
                    kuduException = new RpcRemoteException(Status.RemoteError(str3), build);
                    LOG.error(str3);
                }
            } else {
                try {
                    pair = remove.deserialize(callResponse, this.serverInfo.getUuid());
                } catch (KuduException e) {
                    kuduException = e;
                }
            }
            if (LOG.isTraceEnabled()) {
                LOG.trace(getPeerUuidLoggingString() + " received RPC response: rpcId=" + callId + ", response size=" + callResponse.getTotalResponseSize() + ", rpc=" + remove);
            }
            if (!OK.ok()) {
                remove.addTrace(serverInfo.callStatus(OK).build());
                this.kuduClient.handleRetryableError(remove, new RecoverableException(OK));
                return;
            }
            if (pair != null) {
                if (pair.getSecond() instanceof Tserver.TabletServerErrorPB) {
                    kuduException = dispatchTSErrorOrReturnException(remove, (Tserver.TabletServerErrorPB) pair.getSecond(), serverInfo);
                    if (kuduException == null) {
                        return;
                    } else {
                        pair = null;
                    }
                } else if (pair.getSecond() instanceof Master.MasterErrorPB) {
                    kuduException = dispatchMasterErrorOrReturnException(remove, (Master.MasterErrorPB) pair.getSecond(), serverInfo);
                    if (kuduException == null) {
                        return;
                    } else {
                        pair = null;
                    }
                }
            }
            try {
                if (pair == null) {
                    if (this.kuduClient.isStatisticsEnabled()) {
                        remove.updateStatistics(this.kuduClient.getStatistics(), null);
                    }
                    remove.addTrace(serverInfo.callStatus(kuduException.getStatus()).build());
                    remove.errback(kuduException);
                } else {
                    if (!$assertionsDisabled && (pair.getFirst() instanceof Exception)) {
                        throw new AssertionError();
                    }
                    if (this.kuduClient.isStatisticsEnabled()) {
                        remove.updateStatistics(this.kuduClient.getStatistics(), pair.getFirst());
                    }
                    remove.addTrace(serverInfo.callStatus(Status.OK()).build());
                    remove.callback(pair.getFirst());
                }
            } catch (Exception e2) {
                LOG.debug(getPeerUuidLoggingString() + "Unexpected exception while handling RPC #" + callId + ", rpc=" + remove, e2);
            }
            if (LOG.isTraceEnabled()) {
                LOG.trace("------------------<< LEAVING  DECODE <<------------------ time elapsed: " + ((System.nanoTime() - nanoTime) / 1000) + "us");
            }
        } finally {
        }
    }

    private KuduException dispatchTSErrorOrReturnException(KuduRpc<?> kuduRpc, Tserver.TabletServerErrorPB tabletServerErrorPB, RpcTraceFrame.RpcTraceFrameBuilder rpcTraceFrameBuilder) {
        WireProtocol.AppStatusPB.ErrorCode code = tabletServerErrorPB.getStatus().getCode();
        Status fromTabletServerErrorPB = Status.fromTabletServerErrorPB(tabletServerErrorPB);
        if (tabletServerErrorPB.getCode() == Tserver.TabletServerErrorPB.Code.TABLET_NOT_FOUND) {
            this.kuduClient.handleTabletNotFound(kuduRpc, new RecoverableException(fromTabletServerErrorPB), this);
        } else if (code == WireProtocol.AppStatusPB.ErrorCode.SERVICE_UNAVAILABLE) {
            this.kuduClient.handleRetryableError(kuduRpc, new RecoverableException(fromTabletServerErrorPB));
        } else {
            if (code != WireProtocol.AppStatusPB.ErrorCode.ILLEGAL_STATE && code != WireProtocol.AppStatusPB.ErrorCode.ABORTED) {
                return new NonRecoverableException(fromTabletServerErrorPB);
            }
            this.kuduClient.handleNotLeader(kuduRpc, new RecoverableException(fromTabletServerErrorPB), this);
        }
        kuduRpc.addTrace(rpcTraceFrameBuilder.callStatus(fromTabletServerErrorPB).build());
        return null;
    }

    private KuduException dispatchMasterErrorOrReturnException(KuduRpc<?> kuduRpc, Master.MasterErrorPB masterErrorPB, RpcTraceFrame.RpcTraceFrameBuilder rpcTraceFrameBuilder) {
        WireProtocol.AppStatusPB.ErrorCode code = masterErrorPB.getStatus().getCode();
        Status fromMasterErrorPB = Status.fromMasterErrorPB(masterErrorPB);
        if (masterErrorPB.getCode() == Master.MasterErrorPB.Code.NOT_THE_LEADER) {
            this.kuduClient.handleNotLeader(kuduRpc, new RecoverableException(fromMasterErrorPB), this);
        } else {
            if (code != WireProtocol.AppStatusPB.ErrorCode.SERVICE_UNAVAILABLE) {
                return new NonRecoverableException(fromMasterErrorPB);
            }
            if (kuduRpc instanceof ConnectToMasterRequest) {
                return new RecoverableException(fromMasterErrorPB);
            }
            this.kuduClient.handleRetryableError(kuduRpc, new RecoverableException(fromMasterErrorPB));
        }
        kuduRpc.addTrace(rpcTraceFrameBuilder.callStatus(fromMasterErrorPB).build());
        return null;
    }

    public boolean isAlive() {
        boolean z;
        this.lock.lock();
        try {
            if (this.state != State.ALIVE) {
                if (this.state != State.NEGOTIATING) {
                    z = false;
                    return z;
                }
            }
            z = true;
            return z;
        } finally {
            this.lock.unlock();
        }
    }

    @Override // org.apache.kudu.client.shaded.org.jboss.netty.channel.SimpleChannelUpstreamHandler
    public void channelOpen(ChannelHandlerContext channelHandlerContext, ChannelStateEvent channelStateEvent) throws Exception {
        this.chan = channelStateEvent.getChannel();
        super.channelOpen(channelHandlerContext, channelStateEvent);
    }

    @Override // org.apache.kudu.client.shaded.org.jboss.netty.channel.SimpleChannelUpstreamHandler
    public void channelConnected(ChannelHandlerContext channelHandlerContext, ChannelStateEvent channelStateEvent) {
        if (!$assertionsDisabled && this.chan == null) {
            throw new AssertionError();
        }
        Channels.write(this.chan, ChannelBuffers.wrappedBuffer(CONNECTION_HEADER));
        Negotiator negotiator = new Negotiator(this.serverInfo.getHostname(), this.kuduClient.getSecurityContext());
        channelHandlerContext.getPipeline().addBefore(channelHandlerContext.getName(), "negotiation", negotiator);
        negotiator.sendHello(this.chan);
    }

    @Override // org.apache.kudu.client.shaded.org.jboss.netty.channel.SimpleChannelUpstreamHandler, org.apache.kudu.client.shaded.org.jboss.netty.channel.ChannelUpstreamHandler
    public void handleUpstream(ChannelHandlerContext channelHandlerContext, ChannelEvent channelEvent) throws Exception {
        if (LOG.isTraceEnabled()) {
            LOG.trace(channelEvent.toString());
        }
        super.handleUpstream(channelHandlerContext, channelEvent);
    }

    @Override // org.apache.kudu.client.shaded.org.jboss.netty.channel.SimpleChannelUpstreamHandler
    public void channelDisconnected(ChannelHandlerContext channelHandlerContext, ChannelStateEvent channelStateEvent) throws Exception {
        super.channelDisconnected(channelHandlerContext, channelStateEvent);
        cleanup("Connection disconnected");
    }

    @Override // org.apache.kudu.client.shaded.org.jboss.netty.channel.SimpleChannelUpstreamHandler
    public void channelClosed(ChannelHandlerContext channelHandlerContext, ChannelStateEvent channelStateEvent) throws Exception {
        super.channelClosed(channelHandlerContext, channelStateEvent);
        cleanup("Connection closed");
    }

    private void cleanup(String str) {
        ArrayList newArrayList = Lists.newArrayList();
        this.lock.lock();
        try {
            if (this.state == State.DISCONNECTED) {
                if (!$assertionsDisabled && this.pendingRpcs != null) {
                    throw new AssertionError();
                }
                return;
            }
            this.state = State.DISCONNECTED;
            if (this.pendingRpcs != null) {
                newArrayList.addAll(this.pendingRpcs);
            }
            this.pendingRpcs = null;
            newArrayList.addAll(this.rpcsInflight.values());
            this.rpcsInflight = null;
            this.lock.unlock();
            failOrRetryRpcs(newArrayList, new RecoverableException(Status.NetworkError(getPeerUuidLoggingString() + (str == null ? "Connection reset" : str))));
        } finally {
            this.lock.unlock();
        }
    }

    private void failOrRetryRpcs(Collection<KuduRpc<?>> collection, RecoverableException recoverableException) {
        Iterator<KuduRpc<?>> it = collection.iterator();
        while (it.hasNext()) {
            failOrRetryRpc(it.next(), recoverableException);
        }
    }

    private void failOrRetryRpc(KuduRpc<?> kuduRpc, RecoverableException recoverableException) {
        kuduRpc.addTrace(new RpcTraceFrame.RpcTraceFrameBuilder(kuduRpc.method(), RpcTraceFrame.Action.RECEIVE_FROM_SERVER).serverInfo(this.serverInfo).callStatus(recoverableException.getStatus()).build());
        if (kuduRpc.getTablet() == null) {
            kuduRpc.errback(recoverableException);
        } else {
            this.kuduClient.handleTabletNotFound(kuduRpc, recoverableException, this);
        }
    }

    @Override // org.apache.kudu.client.shaded.org.jboss.netty.channel.SimpleChannelUpstreamHandler
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, ExceptionEvent exceptionEvent) {
        Throwable cause = exceptionEvent.getCause();
        Channel channel = exceptionEvent.getChannel();
        if (cause instanceof RejectedExecutionException) {
            LOG.warn(getPeerUuidLoggingString() + "RPC rejected by the executor, ignore this if we're shutting down", cause);
        } else if (cause instanceof ReadTimeoutException) {
            LOG.debug(getPeerUuidLoggingString() + "Encountered a read timeout, will close the channel");
        } else if (!(cause instanceof ClosedChannelException)) {
            LOG.error(getPeerUuidLoggingString() + "Unexpected exception from downstream on " + channel, cause);
        } else if (!this.closedByClient) {
            LOG.info(getPeerUuidLoggingString() + "Lost connection to peer");
        }
        if (channel.isOpen()) {
            Channels.close(channel);
        } else {
            cleanup(cause.getMessage());
        }
    }

    private void sendQueuedRpcs(List<KuduRpc<?>> list) {
        if (!$assertionsDisabled && this.lock.isHeldByCurrentThread()) {
            throw new AssertionError();
        }
        for (KuduRpc<?> kuduRpc : list) {
            LOG.debug(getPeerUuidLoggingString() + "Executing RPC queued: " + kuduRpc);
            sendRpc(kuduRpc);
        }
    }

    private String getPeerUuidLoggingString() {
        return "[Peer " + this.serverInfo.getUuid() + "] ";
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ServerInfo getServerInfo() {
        return this.serverInfo;
    }

    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append("TabletClient@").append(hashCode()).append("(chan=").append(this.chan).append(", uuid=").append(this.serverInfo.getUuid()).append(", #pending_rpcs=");
        this.lock.lock();
        try {
            int size = this.pendingRpcs == null ? 0 : this.pendingRpcs.size();
            int size2 = this.rpcsInflight == null ? 0 : this.rpcsInflight.size();
            sb.append(size);
            sb.append(", #rpcs_inflight=").append(size2).append(')');
            return sb.toString();
        } finally {
            this.lock.unlock();
        }
    }

    static {
        $assertionsDisabled = !TabletClient.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger(TabletClient.class);
        CONNECTION_HEADER = new byte[]{104, 114, 112, 99, 9, 0, 0};
    }
}
