package org.apache.kudu.client;

import com.stumbleupon.async.Callback;
import com.stumbleupon.async.Deferred;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.net.ssl.SSLException;
import org.apache.kudu.client.CallResponse;
import org.apache.kudu.client.Negotiator;
import org.apache.kudu.client.RpcOutboundMessage;
import org.apache.kudu.rpc.RpcHeader;
import org.apache.kudu.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.kudu.shaded.com.google.common.base.Preconditions;
import org.apache.kudu.shaded.com.google.common.collect.Lists;
import org.apache.kudu.shaded.org.jboss.netty.buffer.ChannelBuffers;
import org.apache.kudu.shaded.org.jboss.netty.channel.Channel;
import org.apache.kudu.shaded.org.jboss.netty.channel.ChannelEvent;
import org.apache.kudu.shaded.org.jboss.netty.channel.ChannelFuture;
import org.apache.kudu.shaded.org.jboss.netty.channel.ChannelFutureListener;
import org.apache.kudu.shaded.org.jboss.netty.channel.ChannelHandlerContext;
import org.apache.kudu.shaded.org.jboss.netty.channel.ChannelPipeline;
import org.apache.kudu.shaded.org.jboss.netty.channel.ChannelStateEvent;
import org.apache.kudu.shaded.org.jboss.netty.channel.Channels;
import org.apache.kudu.shaded.org.jboss.netty.channel.DefaultChannelPipeline;
import org.apache.kudu.shaded.org.jboss.netty.channel.ExceptionEvent;
import org.apache.kudu.shaded.org.jboss.netty.channel.MessageEvent;
import org.apache.kudu.shaded.org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.apache.kudu.shaded.org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import org.apache.kudu.shaded.org.jboss.netty.channel.socket.SocketChannel;
import org.apache.kudu.shaded.org.jboss.netty.channel.socket.SocketChannelConfig;
import org.apache.kudu.shaded.org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
import org.apache.kudu.shaded.org.jboss.netty.handler.timeout.ReadTimeoutException;
import org.apache.kudu.shaded.org.jboss.netty.handler.timeout.ReadTimeoutHandler;
import org.apache.kudu.shaded.org.jboss.netty.util.HashedWheelTimer;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
@InterfaceAudience.Private
@InterfaceStability.Unstable
/* loaded from: input_file:org/apache/kudu/client/Connection.class */
public class Connection extends SimpleChannelUpstreamHandler {
    private final ServerInfo serverInfo;
    private final SecurityContext securityContext;
    private final long socketReadTimeoutMs;
    private final HashedWheelTimer timer;
    private final CredentialsPolicy credentialsPolicy;
    private final SocketChannel channel;
    private static final Logger LOG;
    private static final byte RPC_CURRENT_VERSION = 9;
    private static final byte[] CONNECTION_HEADER;
    static final /* synthetic */ boolean $assertionsDisabled;
    private volatile boolean explicitlyDisconnected = false;
    private final ReentrantLock lock = new ReentrantLock();

    @GuardedBy("lock")
    private HashMap<Integer, Callback<Void, CallResponseInfo>> inflightMessages = new HashMap<>();

    @GuardedBy("lock")
    private ArrayList<QueuedMessage> queuedMessages = Lists.newArrayList();

    @GuardedBy("lock")
    private Negotiator.Success negotiationResult = null;

    @GuardedBy("lock")
    private Negotiator.Failure negotiationFailure = null;

    @GuardedBy("lock")
    private int nextCallId = 0;

    @GuardedBy("lock")
    private State state = State.NEW;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kudu/client/Connection$CallResponseInfo.class */
    public static final class CallResponseInfo {
        public final CallResponse response;
        public final KuduException exception;

        CallResponseInfo(CallResponse callResponse, KuduException kuduException) {
            this.response = callResponse;
            this.exception = kuduException;
        }
    }

    /* loaded from: input_file:org/apache/kudu/client/Connection$ConnectionPipeline.class */
    private final class ConnectionPipeline extends DefaultChannelPipeline {
        private ConnectionPipeline() {
        }

        void init() {
            super.addFirst("decode-frames", new LengthFieldBasedFrameDecoder(268435456, 0, 4, 0, 4));
            super.addLast("decode-inbound", new CallResponse.Decoder());
            super.addLast("encode-outbound", new RpcOutboundMessage.Encoder());
            if (Connection.this.socketReadTimeoutMs > 0) {
                super.addLast("timeout-handler", new ReadTimeoutHandler(Connection.this.timer, Connection.this.socketReadTimeoutMs, TimeUnit.MILLISECONDS));
            }
            super.addLast("kudu-handler", Connection.this);
        }
    }

    /* loaded from: input_file:org/apache/kudu/client/Connection$CredentialsPolicy.class */
    public enum CredentialsPolicy {
        ANY_CREDENTIALS,
        PRIMARY_CREDENTIALS
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kudu/client/Connection$QueuedMessage.class */
    public static final class QueuedMessage {
        private final RpcOutboundMessage message;
        private final Callback<Void, CallResponseInfo> cb;

        QueuedMessage(RpcOutboundMessage rpcOutboundMessage, Callback<Void, CallResponseInfo> callback) {
            this.message = rpcOutboundMessage;
            this.cb = callback;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kudu/client/Connection$State.class */
    public enum State {
        NEW,
        CONNECTING,
        NEGOTIATING,
        NEGOTIATION_FAILED,
        READY,
        TERMINATED
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Connection(ServerInfo serverInfo, SecurityContext securityContext, long j, HashedWheelTimer hashedWheelTimer, ClientSocketChannelFactory clientSocketChannelFactory, CredentialsPolicy credentialsPolicy) {
        this.serverInfo = serverInfo;
        this.securityContext = securityContext;
        this.socketReadTimeoutMs = j;
        this.timer = hashedWheelTimer;
        this.credentialsPolicy = credentialsPolicy;
        ConnectionPipeline connectionPipeline = new ConnectionPipeline();
        connectionPipeline.init();
        this.channel = clientSocketChannelFactory.newChannel((ChannelPipeline) connectionPipeline);
        SocketChannelConfig config = this.channel.getConfig();
        config.setConnectTimeoutMillis(60000);
        config.setTcpNoDelay(true);
        config.setKeepAlive(true);
    }

    @Override // org.apache.kudu.shaded.org.jboss.netty.channel.SimpleChannelUpstreamHandler
    public void channelConnected(ChannelHandlerContext channelHandlerContext, ChannelStateEvent channelStateEvent) {
        this.lock.lock();
        try {
            if (this.state == State.TERMINATED) {
                return;
            }
            Preconditions.checkState(this.state == State.CONNECTING);
            this.state = State.NEGOTIATING;
            this.lock.unlock();
            Channels.write(this.channel, ChannelBuffers.wrappedBuffer(CONNECTION_HEADER));
            Negotiator negotiator = new Negotiator(this.serverInfo.getAndCanonicalizeHostname(), this.securityContext, this.credentialsPolicy == CredentialsPolicy.PRIMARY_CREDENTIALS);
            channelHandlerContext.getPipeline().addBefore(channelHandlerContext.getName(), "negotiation", negotiator);
            negotiator.sendHello(this.channel);
        } finally {
            this.lock.unlock();
        }
    }

    @Override // org.apache.kudu.shaded.org.jboss.netty.channel.SimpleChannelUpstreamHandler, org.apache.kudu.shaded.org.jboss.netty.channel.ChannelUpstreamHandler
    public void handleUpstream(ChannelHandlerContext channelHandlerContext, ChannelEvent channelEvent) throws Exception {
        if (LOG.isTraceEnabled()) {
            LOG.trace("{} upstream event {}", getLogPrefix(), channelEvent);
        }
        super.handleUpstream(channelHandlerContext, channelEvent);
    }

    @Override // org.apache.kudu.shaded.org.jboss.netty.channel.SimpleChannelUpstreamHandler
    public void channelDisconnected(ChannelHandlerContext channelHandlerContext, ChannelStateEvent channelStateEvent) {
        cleanup(new RecoverableException(Status.NetworkError("connection disconnected")));
    }

    @Override // org.apache.kudu.shaded.org.jboss.netty.channel.SimpleChannelUpstreamHandler
    public void channelClosed(ChannelHandlerContext channelHandlerContext, ChannelStateEvent channelStateEvent) {
        cleanup(new RecoverableException(Status.NetworkError("connection closed")));
    }

    /* JADX WARN: Finally extract failed */
    @Override // org.apache.kudu.shaded.org.jboss.netty.channel.SimpleChannelUpstreamHandler
    public void messageReceived(ChannelHandlerContext channelHandlerContext, MessageEvent messageEvent) throws Exception {
        Object message = messageEvent.getMessage();
        if (message instanceof Negotiator.Success) {
            this.lock.lock();
            try {
                this.negotiationResult = (Negotiator.Success) message;
                Preconditions.checkState(this.state == State.TERMINATED || this.inflightMessages.isEmpty());
                while (this.state != State.TERMINATED && !this.queuedMessages.isEmpty()) {
                    ArrayList<QueuedMessage> arrayList = this.queuedMessages;
                    for (QueuedMessage queuedMessage : arrayList) {
                        Preconditions.checkState(this.inflightMessages.put(Integer.valueOf(queuedMessage.message.getHeaderBuilder().getCallId()), queuedMessage.cb) == null);
                    }
                    this.queuedMessages = Lists.newArrayList();
                    this.lock.unlock();
                    try {
                        Iterator<QueuedMessage> it = arrayList.iterator();
                        while (it.hasNext()) {
                            sendCallToWire(it.next().message);
                        }
                        this.lock.lock();
                    } catch (Throwable th) {
                        this.lock.lock();
                        throw th;
                    }
                }
                if (this.state == State.TERMINATED) {
                    return;
                }
                Preconditions.checkState(this.state == State.NEGOTIATING);
                this.queuedMessages = null;
                this.state = State.READY;
                this.lock.unlock();
                return;
            } finally {
                this.lock.unlock();
            }
        }
        if (message instanceof Negotiator.Failure) {
            this.lock.lock();
            try {
                if (this.state == State.TERMINATED) {
                    this.lock.unlock();
                    return;
                }
                Preconditions.checkState(this.state == State.NEGOTIATING);
                Preconditions.checkState(this.inflightMessages.isEmpty());
                this.state = State.NEGOTIATION_FAILED;
                this.negotiationFailure = (Negotiator.Failure) message;
                this.lock.unlock();
                Channels.close(messageEvent.getChannel());
                return;
            } finally {
                this.lock.unlock();
            }
        }
        if (!(message instanceof CallResponse)) {
            channelHandlerContext.sendUpstream(messageEvent);
            return;
        }
        CallResponse callResponse = (CallResponse) message;
        RpcHeader.ResponseHeader header = callResponse.getHeader();
        if (!header.hasCallId()) {
            String str = getLogPrefix() + " RPC response (size: " + callResponse.getTotalResponseSize() + ") doesn't have callID: " + header;
            LOG.error(str);
            throw new NonRecoverableException(Status.Incomplete(str));
        }
        int callId = header.getCallId();
        this.lock.lock();
        try {
            if (this.state == State.TERMINATED) {
                this.lock.unlock();
                return;
            }
            Preconditions.checkState(this.state == State.READY);
            Callback<Void, CallResponseInfo> remove = this.inflightMessages.remove(Integer.valueOf(callId));
            this.lock.unlock();
            if (remove == null) {
                String str2 = getLogPrefix() + " invalid callID: " + callId;
                LOG.error(str2);
                throw new NonRecoverableException(Status.IllegalState(str2));
            }
            if (!header.hasIsError() || !header.getIsError()) {
                remove.call(new CallResponseInfo(callResponse, null));
                return;
            }
            RpcHeader.ErrorStatusPB.Builder newBuilder = RpcHeader.ErrorStatusPB.newBuilder();
            KuduRpc.readProtobuf(callResponse.getPBMessage(), newBuilder);
            RpcHeader.ErrorStatusPB build = newBuilder.build();
            if (build.getCode().equals(RpcHeader.ErrorStatusPB.RpcErrorCodePB.ERROR_SERVER_TOO_BUSY) || build.getCode().equals(RpcHeader.ErrorStatusPB.RpcErrorCodePB.ERROR_UNAVAILABLE)) {
                remove.call(new CallResponseInfo(callResponse, new RecoverableException(Status.ServiceUnavailable(build.getMessage()))));
                return;
            }
            String str3 = getLogPrefix() + " server sent error " + build.getMessage();
            LOG.error(str3);
            remove.call(new CallResponseInfo(callResponse, new RpcRemoteException(Status.RemoteError(str3), build)));
        } finally {
            this.lock.unlock();
        }
    }

    @Override // org.apache.kudu.shaded.org.jboss.netty.channel.SimpleChannelUpstreamHandler
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, ExceptionEvent exceptionEvent) {
        KuduException recoverableException;
        Throwable cause = exceptionEvent.getCause();
        Channel channel = exceptionEvent.getChannel();
        if (cause instanceof KuduException) {
            recoverableException = (KuduException) cause;
        } else if (cause instanceof RejectedExecutionException) {
            String format = String.format("%s RPC rejected by the executor (ignore if shutting down)", getLogPrefix());
            recoverableException = new RecoverableException(Status.NetworkError(format), cause);
            LOG.warn(format, cause);
        } else if (cause instanceof ReadTimeoutException) {
            String format2 = String.format("%s encountered a read timeout; closing the channel", getLogPrefix());
            recoverableException = new RecoverableException(Status.NetworkError(format2), cause);
            LOG.debug(format2);
        } else if (cause instanceof ClosedChannelException) {
            String format3 = String.format(this.explicitlyDisconnected ? "%s disconnected from peer" : "%s lost connection to peer", getLogPrefix());
            recoverableException = new RecoverableException(Status.NetworkError(format3), cause);
            LOG.info(format3, cause);
        } else if ((cause instanceof SSLException) && this.explicitlyDisconnected) {
            recoverableException = new RecoverableException(Status.NetworkError(String.format("%s disconnected from peer", getLogPrefix())));
        } else {
            if (!$assertionsDisabled && this.explicitlyDisconnected) {
                throw new AssertionError();
            }
            String format4 = String.format("%s unexpected exception from downstream on %s", getLogPrefix(), channel);
            recoverableException = new RecoverableException(Status.NetworkError(format4), cause);
            LOG.error(format4, cause);
        }
        cleanup(recoverableException);
        if (channel.isOpen()) {
            Channels.close(channel);
        }
    }

    public ServerInfo getServerInfo() {
        return this.serverInfo;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CredentialsPolicy getCredentialsPolicy() {
        return this.credentialsPolicy;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isTerminated() {
        this.lock.lock();
        try {
            return this.state == State.TERMINATED;
        } finally {
            this.lock.unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Nullable
    public Set<RpcHeader.RpcFeatureFlag> getPeerFeatures() {
        Set<RpcHeader.RpcFeatureFlag> set = null;
        this.lock.lock();
        try {
            if (this.negotiationResult != null) {
                set = this.negotiationResult.serverFeatures;
            }
            return set;
        } finally {
            this.lock.unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public String getLogPrefix() {
        return "[peer " + this.serverInfo.getUuid() + "]";
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void enqueueMessage(RpcOutboundMessage rpcOutboundMessage, Callback<Void, CallResponseInfo> callback) throws RecoverableException {
        int timeoutMillis;
        this.lock.lock();
        try {
            if (this.state == State.TERMINATED) {
                throw new RecoverableException(Status.IllegalState("connection is terminated"));
            }
            if (this.state == State.NEW) {
                connect();
            }
            int i = this.nextCallId;
            this.nextCallId = i + 1;
            RpcHeader.RequestHeader.Builder headerBuilder = rpcOutboundMessage.getHeaderBuilder();
            headerBuilder.setCallId(i);
            if (this.socketReadTimeoutMs > 0 && (timeoutMillis = headerBuilder.getTimeoutMillis()) > 0) {
                headerBuilder.setTimeoutMillis((int) Math.min(timeoutMillis, this.socketReadTimeoutMs));
            }
            if (this.state != State.READY) {
                this.queuedMessages.add(new QueuedMessage(rpcOutboundMessage, callback));
                this.lock.unlock();
            } else {
                if (!$assertionsDisabled && this.state != State.READY) {
                    throw new AssertionError();
                }
                Preconditions.checkState(this.inflightMessages.put(Integer.valueOf(i), callback) == null);
                this.lock.unlock();
                sendCallToWire(rpcOutboundMessage);
            }
        } catch (Throwable th) {
            this.lock.unlock();
            throw th;
        }
    }

    ChannelFuture disconnect() {
        this.explicitlyDisconnected = true;
        return Channels.disconnect(this.channel);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Deferred<Void> shutdown() {
        ChannelFuture disconnect = disconnect();
        final Deferred<Void> deferred = new Deferred<>();
        disconnect.addListener(new ChannelFutureListener() { // from class: org.apache.kudu.client.Connection.1
            @Override // org.apache.kudu.shaded.org.jboss.netty.channel.ChannelFutureListener
            public void operationComplete(ChannelFuture channelFuture) {
                if (channelFuture.isSuccess()) {
                    deferred.callback(null);
                    return;
                }
                Throwable cause = channelFuture.getCause();
                if (cause instanceof Exception) {
                    deferred.callback(cause);
                } else {
                    deferred.callback(new NonRecoverableException(Status.IllegalState("failed to shutdown: " + this), cause));
                }
            }
        });
        return deferred;
    }

    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append("Connection@").append(hashCode()).append("(channel=").append(this.channel).append(", uuid=").append(this.serverInfo.getUuid());
        this.lock.lock();
        try {
            sb.append(", #queued=").append(this.queuedMessages == null ? 0 : this.queuedMessages.size()).append(", #inflight=").append(this.inflightMessages == null ? 0 : this.inflightMessages.size()).append(")");
            return sb.toString();
        } finally {
            this.lock.unlock();
        }
    }

    @VisibleForTesting
    boolean isReady() {
        this.lock.lock();
        try {
            return this.state == State.READY;
        } finally {
            this.lock.unlock();
        }
    }

    private void sendCallToWire(RpcOutboundMessage rpcOutboundMessage) {
        if (!$assertionsDisabled && this.lock.isHeldByCurrentThread()) {
            throw new AssertionError();
        }
        if (LOG.isTraceEnabled()) {
            LOG.trace("{} sending {}", getLogPrefix(), rpcOutboundMessage);
        }
        Channels.write(this.channel, rpcOutboundMessage);
    }

    private void cleanup(KuduException kuduException) {
        boolean z = false;
        this.lock.lock();
        try {
            if (this.state == State.TERMINATED) {
                Preconditions.checkState(this.queuedMessages == null);
                Preconditions.checkState(this.inflightMessages == null);
                this.lock.unlock();
                return;
            }
            if (this.state == State.NEGOTIATION_FAILED) {
                Preconditions.checkState(this.negotiationFailure != null);
                Preconditions.checkState(this.inflightMessages.isEmpty());
                z = this.negotiationFailure.status.getCode().equals(RpcHeader.ErrorStatusPB.RpcErrorCodePB.FATAL_INVALID_AUTHENTICATION_TOKEN);
            }
            LOG.debug("{} cleaning up while in state {} due to: {}", new Object[]{getLogPrefix(), this.state, kuduException.getMessage()});
            ArrayList<QueuedMessage> arrayList = this.queuedMessages;
            this.queuedMessages = null;
            HashMap<Integer, Callback<Void, CallResponseInfo>> hashMap = this.inflightMessages;
            this.inflightMessages = null;
            this.state = State.TERMINATED;
            this.lock.unlock();
            if (z) {
                kuduException = new InvalidAuthnTokenException(kuduException.getStatus());
            }
            Iterator<Callback<Void, CallResponseInfo>> it = hashMap.values().iterator();
            while (it.hasNext()) {
                try {
                    it.next().call(new CallResponseInfo(null, kuduException));
                } catch (Exception e) {
                    LOG.warn("{} exception while aborting in-flight call: {}", getLogPrefix(), e);
                }
            }
            if (arrayList != null) {
                Iterator<QueuedMessage> it2 = arrayList.iterator();
                while (it2.hasNext()) {
                    try {
                        it2.next().cb.call(new CallResponseInfo(null, kuduException));
                    } catch (Exception e2) {
                        LOG.warn("{} exception while aborting enqueued call: {}", getLogPrefix(), e2);
                    }
                }
            }
        } catch (Throwable th) {
            this.lock.unlock();
            throw th;
        }
    }

    private void connect() {
        Preconditions.checkState(this.lock.isHeldByCurrentThread());
        Preconditions.checkState(this.state == State.NEW);
        this.state = State.CONNECTING;
        this.channel.connect(new InetSocketAddress(this.serverInfo.getResolvedAddress(), this.serverInfo.getPort()));
    }

    static {
        $assertionsDisabled = !Connection.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger(Connection.class);
        CONNECTION_HEADER = new byte[]{104, 114, 112, 99, 9, 0, 0};
    }
}
