package com.oceanbase.clogproxy.client.connection;

import com.google.protobuf.InvalidProtocolBufferException;
import com.oceanbase.clogproxy.client.config.ClientConf;
import com.oceanbase.clogproxy.client.connection.StreamContext;
import com.oceanbase.clogproxy.client.enums.ErrorCode;
import com.oceanbase.clogproxy.client.exception.LogProxyClientException;
import com.oceanbase.clogproxy.common.packet.CompressType;
import com.oceanbase.clogproxy.common.packet.HeaderType;
import com.oceanbase.clogproxy.common.packet.ProtocolVersion;
import com.oceanbase.clogproxy.common.packet.protocol.LogProxyProto;
import com.oceanbase.clogproxy.common.util.NetworkUtil;
import com.oceanbase.oms.logmessage.LogMessage;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.timeout.IdleStateEvent;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.lz4.LZ4FastDecompressor;
import org.apache.commons.lang3.Conversion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/oceanbase/clogproxy/client/connection/ClientHandler.class */
public class ClientHandler extends ChannelInboundHandlerAdapter {
    private static final Logger logger = LoggerFactory.getLogger(ClientHandler.class);
    private static final byte[] MAGIC_STRING = {120, 105, 53, 51, 103, 93, 113};
    private static final String CLIENT_IP = NetworkUtil.getLocalIp();
    private static final int HEAD_LENGTH = 7;
    private ClientStream stream;
    private ClientConf config;
    private ConnectionParams params;
    private BlockingQueue<StreamContext.TransferPacket> recordQueue;
    ByteBuf buffer;
    private boolean first;
    private HandshakeStateV1 state = HandshakeStateV1.PB_HEAD;
    private final ByteToMessageDecoder.Cumulator cumulator = ByteToMessageDecoder.MERGE_CUMULATOR;
    private boolean poolFlag = true;
    private int numReads = 0;
    private boolean dataNotEnough = false;
    private int dataLength = 0;
    LZ4Factory factory = LZ4Factory.fastestInstance();
    LZ4FastDecompressor fastDecompressor = this.factory.fastDecompressor();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/oceanbase/clogproxy/client/connection/ClientHandler$HandshakeStateV1.class */
    public enum HandshakeStateV1 {
        PB_HEAD,
        CLIENT_HANDSHAKE_RESPONSE,
        RECORD,
        ERROR_RESPONSE,
        STATUS
    }

    protected void resetState() {
        this.state = HandshakeStateV1.PB_HEAD;
    }

    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        if (!(obj instanceof ByteBuf)) {
            if (!(obj instanceof IdleStateEvent) || this.stream == null) {
                return;
            }
            this.stream.triggerReconnect();
            return;
        }
        this.dataNotEnough = false;
        ByteBuf byteBuf = (ByteBuf) obj;
        this.first = this.buffer == null;
        if (this.first) {
            this.buffer = byteBuf;
        } else {
            this.buffer = this.cumulator.cumulate(channelHandlerContext.alloc(), this.buffer, byteBuf);
        }
        while (this.poolFlag && this.buffer.isReadable() && !this.dataNotEnough) {
            switch (this.state) {
                case PB_HEAD:
                    handleHeader();
                    break;
                case CLIENT_HANDSHAKE_RESPONSE:
                    handleHandshakeResponse();
                    break;
                case ERROR_RESPONSE:
                    handleErrorResponse();
                    break;
                case STATUS:
                    handleServerStatus();
                    break;
                case RECORD:
                    handleRecord();
                    break;
            }
        }
        if (this.buffer != null && !this.buffer.isReadable()) {
            this.numReads = 0;
            this.buffer.release();
            this.buffer = null;
        } else {
            int i = this.numReads + 1;
            this.numReads = i;
            if (i >= this.config.getNettyDiscardAfterReads()) {
                this.numReads = 0;
                discardSomeReadBytes();
            }
        }
    }

    private void handleHeader() {
        if (this.buffer.readableBytes() < HEAD_LENGTH) {
            this.dataNotEnough = true;
            return;
        }
        short readShort = this.buffer.readShort();
        byte readByte = this.buffer.readByte();
        this.dataLength = this.buffer.readInt();
        checkHeader(readShort, readByte, this.dataLength);
        HeaderType codeOf = HeaderType.codeOf(readByte);
        if (codeOf == HeaderType.HANDSHAKE_RESPONSE_CLIENT) {
            this.state = HandshakeStateV1.CLIENT_HANDSHAKE_RESPONSE;
            return;
        }
        if (codeOf == HeaderType.ERROR_RESPONSE) {
            this.state = HandshakeStateV1.ERROR_RESPONSE;
        } else if (codeOf == HeaderType.DATA_CLIENT) {
            this.state = HandshakeStateV1.RECORD;
        } else if (codeOf == HeaderType.STATUS) {
            this.state = HandshakeStateV1.STATUS;
        }
    }

    private void handleHandshakeResponse() throws InvalidProtocolBufferException {
        if (this.buffer.readableBytes() < this.dataLength) {
            this.dataNotEnough = true;
            return;
        }
        byte[] bArr = new byte[this.dataLength];
        this.buffer.readBytes(bArr);
        LogProxyProto.ClientHandshakeResponse parseFrom = LogProxyProto.ClientHandshakeResponse.parseFrom(bArr);
        logger.info("Connected to LogProxyServer, ip:{}, version:{}", parseFrom.getIp(), parseFrom.getVersion());
        this.state = HandshakeStateV1.PB_HEAD;
    }

    private void handleErrorResponse() throws InvalidProtocolBufferException {
        if (this.buffer.readableBytes() < this.dataLength) {
            this.dataNotEnough = true;
            return;
        }
        byte[] bArr = new byte[this.dataLength];
        this.buffer.readBytes(bArr);
        LogProxyProto.ErrorResponse parseFrom = LogProxyProto.ErrorResponse.parseFrom(bArr);
        logger.error("LogProxy refused handshake request: {}", parseFrom.toString());
        throw new LogProxyClientException(ErrorCode.NO_AUTH, "LogProxy refused handshake request: " + parseFrom.toString());
    }

    private void handleServerStatus() throws InvalidProtocolBufferException {
        if (this.buffer.readableBytes() < this.dataLength) {
            this.dataNotEnough = true;
            return;
        }
        byte[] bArr = new byte[this.dataLength];
        this.buffer.readBytes(bArr);
        logger.debug("Server status: {}", LogProxyProto.RuntimeStatus.parseFrom(bArr).toString());
        this.state = HandshakeStateV1.PB_HEAD;
    }

    private void handleRecord() {
        if (this.buffer.readableBytes() < this.dataLength) {
            this.dataNotEnough = true;
        } else {
            parseDataNew();
            this.state = HandshakeStateV1.PB_HEAD;
        }
    }

    private void checkHeader(int i, int i2, int i3) {
        if (ProtocolVersion.codeOf(i) == null) {
            logger.error("Unsupported protocol version: {}", Integer.valueOf(i));
            throw new LogProxyClientException(ErrorCode.E_PROTOCOL, "Unsupported protocol version: " + i);
        }
        if (HeaderType.codeOf(i2) == null) {
            logger.error("Unsupported header type: {}", Integer.valueOf(i2));
            throw new LogProxyClientException(ErrorCode.E_HEADER_TYPE, "Unsupported header type: " + i2);
        }
        if (i3 <= 0) {
            logger.error("Data length equals 0");
            throw new LogProxyClientException(ErrorCode.E_LEN, "Data length equals 0");
        }
    }

    private void parseDataNew() {
        try {
            byte[] bArr = new byte[this.dataLength];
            this.buffer.readBytes(bArr, 0, this.dataLength);
            LogProxyProto.RecordData parseFrom = LogProxyProto.RecordData.parseFrom(bArr);
            int compressType = parseFrom.getCompressType();
            int compressedLen = parseFrom.getCompressedLen();
            int rawLen = parseFrom.getRawLen();
            byte[] byteArray = parseFrom.getRecords().toByteArray();
            if (compressType == CompressType.LZ4.code()) {
                byte[] bArr2 = new byte[rawLen];
                int decompress = this.fastDecompressor.decompress(byteArray, 0, bArr2, 0, rawLen);
                if (decompress != compressedLen) {
                    throw new LogProxyClientException(ErrorCode.E_LEN, "decompressed length [" + decompress + "] is not expected [" + rawLen + "]");
                }
                parseRecord(bArr2);
            } else {
                parseRecord(byteArray);
            }
        } catch (InvalidProtocolBufferException e) {
            throw new LogProxyClientException(ErrorCode.E_PARSE, "Failed to read PB packet", e);
        }
    }

    private void parseRecord(byte[] bArr) throws LogProxyClientException {
        int i = 0;
        while (i < bArr.length) {
            int byteArrayToInt = Conversion.byteArrayToInt(bArr, i + 4, 0, 0, 4);
            try {
                LogMessage logMessage = new LogMessage(false);
                byte[] bArr2 = new byte[byteArrayToInt + 8];
                System.arraycopy(bArr, i, bArr2, 0, bArr2.length);
                logMessage.parse(bArr2);
                if (this.config.isIgnoreUnknownRecordType()) {
                    logger.debug("Unsupported record type: {}", logMessage);
                    i += 8 + byteArrayToInt;
                } else {
                    while (true) {
                        try {
                            this.recordQueue.put(new StreamContext.TransferPacket(logMessage));
                            this.stream.setCheckpointString(logMessage.getTimestamp());
                            break;
                        } catch (InterruptedException e) {
                        }
                    }
                    i += 8 + byteArrayToInt;
                }
            } catch (Exception e2) {
                throw new LogProxyClientException(ErrorCode.E_PARSE, e2);
            }
        }
    }

    protected final void discardSomeReadBytes() {
        if (this.buffer == null || this.first || this.buffer.refCnt() != 1) {
            return;
        }
        this.buffer.discardSomeReadBytes();
    }

    public void channelActive(ChannelHandlerContext channelHandlerContext) {
        this.poolFlag = true;
        StreamContext streamContext = (StreamContext) channelHandlerContext.channel().attr(ConnectionFactory.CONTEXT_KEY).get();
        this.stream = streamContext.stream();
        this.config = streamContext.config();
        this.params = streamContext.params();
        this.recordQueue = streamContext.recordQueue();
        logger.info("ClientId: {} connecting LogProxy: {}", this.params.info(), NetworkUtil.parseRemoteAddress(channelHandlerContext.channel()));
        channelHandlerContext.channel().writeAndFlush(generateConnectRequest(this.params.getProtocolVersion()));
    }

    public ByteBuf generateConnectRequestV2() {
        byte[] byteArray = LogProxyProto.ClientHandshakeRequest.newBuilder().setLogType(this.params.getLogType().code()).setIp(CLIENT_IP).setId(this.params.getClientId()).setVersion(ClientConf.VERSION).setEnableMonitor(this.params.isEnableMonitor()).setConfiguration(this.params.getConfigurationString()).build().toByteArray();
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(MAGIC_STRING.length + 2 + 1 + 4 + byteArray.length);
        buffer.writeBytes(MAGIC_STRING);
        buffer.writeShort(ProtocolVersion.V2.code());
        buffer.writeByte(HeaderType.HANDSHAKE_REQUEST_CLIENT.code());
        buffer.writeInt(byteArray.length);
        buffer.writeBytes(byteArray);
        return buffer;
    }

    public ByteBuf generateConnectRequest(ProtocolVersion protocolVersion) {
        if (protocolVersion == ProtocolVersion.V2) {
            return generateConnectRequestV2();
        }
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(MAGIC_STRING.length);
        buffer.writeBytes(MAGIC_STRING);
        buffer.capacity(buffer.capacity() + 2 + 4 + 1);
        buffer.writeShort(ProtocolVersion.V0.code());
        buffer.writeInt(HeaderType.HANDSHAKE_REQUEST_CLIENT.code());
        buffer.writeByte(this.params.getLogType().code());
        int length = CLIENT_IP.length();
        buffer.capacity(buffer.capacity() + length + 4);
        buffer.writeInt(length);
        buffer.writeBytes(CLIENT_IP.getBytes());
        int length2 = this.params.getClientId().length();
        buffer.capacity(buffer.capacity() + length2 + 4);
        buffer.writeInt(length2);
        buffer.writeBytes(this.params.getClientId().getBytes());
        int length3 = ClientConf.VERSION.length();
        buffer.capacity(buffer.capacity() + length3 + 4);
        buffer.writeInt(length3);
        buffer.writeBytes(ClientConf.VERSION.getBytes());
        int length4 = this.params.getConfigurationString().length();
        buffer.capacity(buffer.capacity() + length4 + 4);
        buffer.writeInt(length4);
        buffer.writeBytes(this.params.getConfigurationString().getBytes());
        return buffer;
    }

    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        this.poolFlag = false;
        logger.info("Channel closed with ClientId: {}, LogProxy: {}", this.params.getClientId(), NetworkUtil.parseRemoteAddress(channelHandlerContext.channel()));
        channelHandlerContext.channel().disconnect();
        channelHandlerContext.close();
        if (this.stream != null) {
            this.stream.triggerReconnect();
        }
    }

    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
        this.poolFlag = false;
        resetState();
        logger.error("Exception occurred ClientId: {}, with LogProxy: {}", new Object[]{this.params.info(), NetworkUtil.parseRemoteAddress(channelHandlerContext.channel()), th});
        channelHandlerContext.channel().disconnect();
        channelHandlerContext.close();
        if (this.stream != null) {
            if (!(th instanceof LogProxyClientException)) {
                this.stream.triggerReconnect();
            } else if (((LogProxyClientException) th).needStop()) {
                this.stream.stop();
                this.stream.triggerException((LogProxyClientException) th);
            }
        }
    }
}
