package org.apache.rocketmq.store.ha.autoswitch;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.utils.NetworkUtil;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.protocol.EpochEntry;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.ha.FlowMonitor;
import org.apache.rocketmq.store.ha.HAClient;
import org.apache.rocketmq.store.ha.HAConnectionState;
import org.apache.rocketmq.store.ha.io.AbstractHAReader;
import org.apache.rocketmq.store.ha.io.HAWriter;

/* loaded from: input_file:org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.class */
public class AutoSwitchHAClient extends ServiceThread implements HAClient {
    /** Fixed-width prefix of a handshake frame: int + short + short + int, as written by sendHandshakeHeader(). */
    public static final int HANDSHAKE_HEADER_SIZE = 12;
    /** Capacity of the handshake buffer: the fixed prefix plus room for the encoded local address. */
    public static final int HANDSHAKE_SIZE = 62;
    /** Size of an offset report frame: int state ordinal + long offset, as written by reportSlaveOffset(). */
    public static final int TRANSFER_HEADER_SIZE = 12;
    /** Smaller of the two header sizes above. */
    public static final int MIN_HEADER_SIZE = Math.min(HANDSHAKE_HEADER_SIZE, TRANSFER_HEADER_SIZE);
    private static final Logger LOGGER = LoggerFactory.getLogger("RocketmqStore");
    /** Capacity of the read buffer (4 MiB). */
    private static final int READ_MAX_BUFFER_SIZE = 4 * 1024 * 1024;

    // Collaborators supplied through the constructor.
    private final AutoSwitchHAService haService;
    private final DefaultMessageStore messageStore;
    private final EpochFileCache epochCache;

    // Address sent to the master in the handshake frame; may be null.
    private String localAddress;

    // Connection resources; socketChannel is null while no master connection is open.
    private SocketChannel socketChannel;
    private Selector selector;
    private AbstractHAReader haReader;
    private HAWriter haWriter;
    private FlowMonitor flowMonitor;

    // Bookkeeping refreshed by the reader/writer hooks registered in init().
    private long lastReadTimestamp;
    private long lastWriteTimestamp;
    // Last offset reported (or about to be reported) to the master.
    private long currentReportedOffset;
    private int processPosition;
    private volatile HAConnectionState currentState;
    private volatile int currentReceivedEpoch;

    private final AtomicReference<String> masterHaAddress = new AtomicReference<>();
    private final AtomicReference<String> masterAddress = new AtomicReference<>();
    private final AtomicReference<Long> slaveId = new AtomicReference<>();

    // Reusable buffers, one per outgoing frame type plus the shared read buffer.
    private final ByteBuffer handshakeHeaderBuffer = ByteBuffer.allocate(HANDSHAKE_SIZE);
    private final ByteBuffer transferHeaderBuffer = ByteBuffer.allocate(TRANSFER_HEADER_SIZE);
    private final ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);

    /**
     * Reader installed into the enclosing client's {@code haReader} field by {@code init()}.
     *
     * <p>NOTE(review): this file is decompiler output and the body of
     * {@link #processReadResult(java.nio.ByteBuffer)} was NOT recovered. As written, the method
     * unconditionally throws {@link UnsupportedOperationException}. Restore the real implementation
     * from the original sources before using this class.
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient$HAClientReader.class */
    public class HAClientReader extends AbstractHAReader {
        HAClientReader() {
        }

        /* JADX WARN: Can't fix incorrect switch cases order, some code will duplicate */
        /* JADX WARN: Failed to find 'out' block for switch in B:13:0x012a. Please report as an issue. */
        /* JADX WARN: Removed duplicated region for block: B:56:0x0325 A[Catch: Exception -> 0x0345, TryCatch #0 {Exception -> 0x0345, blocks: (B:3:0x0005, B:5:0x0018, B:9:0x006f, B:10:0x008b, B:60:0x009a, B:63:0x00ac, B:12:0x0119, B:13:0x012a, B:14:0x0144, B:17:0x0153, B:20:0x0187, B:22:0x01c6, B:26:0x01ff, B:29:0x0218, B:32:0x0227, B:37:0x027b, B:39:0x02a3, B:41:0x02af, B:44:0x02d4, B:45:0x02e7, B:48:0x030a, B:54:0x031e, B:56:0x0325), top: B:2:0x0005 }] */
        @Override // org.apache.rocketmq.store.ha.io.AbstractHAReader
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        // Placeholder only: the decompiler emitted a stub that always throws (see the class-level note).
        protected boolean processReadResult(java.nio.ByteBuffer r9) {
            /*
                Method dump skipped, instructions count: 851
                To view this dump add '--comments-level debug' option
            */
            throw new UnsupportedOperationException("Method not decompiled: org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAClient.HAClientReader.processReadResult(java.nio.ByteBuffer):boolean");
        }
    }

    /**
     * Creates the client, stores its collaborators, and immediately runs {@link #init()}.
     *
     * @param autoSwitchHAService HA service this client reports to
     * @param defaultMessageStore message store whose offsets and configuration are consulted
     * @param epochFileCache local epoch cache
     * @throws IOException if {@link #init()} fails
     */
    public AutoSwitchHAClient(AutoSwitchHAService autoSwitchHAService, DefaultMessageStore defaultMessageStore,
        EpochFileCache epochFileCache) throws IOException {
        haService = autoSwitchHAService;
        messageStore = defaultMessageStore;
        epochCache = epochFileCache;
        init();
    }

    /**
     * Opens the selector, builds the flow monitor, reader and writer, and resets all connection
     * bookkeeping to its initial values. Also called from {@link #reOpen()}.
     *
     * @throws IOException if the selector cannot be opened
     */
    public void init() throws IOException {
        selector = NetworkUtil.openSelector();
        flowMonitor = new FlowMonitor(messageStore.getMessageStoreConfig());

        haReader = new HAClientReader();
        haReader.registerHook(readSize -> {
            // Only account for reads that actually transferred data.
            if (readSize <= 0) {
                return;
            }
            flowMonitor.addByteCountTransferred(readSize);
            lastReadTimestamp = System.currentTimeMillis();
        });

        haWriter = new HAWriter();
        haWriter.registerHook(writeSize -> {
            if (writeSize <= 0) {
                return;
            }
            lastWriteTimestamp = System.currentTimeMillis();
        });

        changeCurrentState(HAConnectionState.READY);
        currentReceivedEpoch = -1;
        currentReportedOffset = 0L;
        processPosition = 0;
        lastReadTimestamp = System.currentTimeMillis();
        lastWriteTimestamp = System.currentTimeMillis();
        haService.updateConfirmOffset(-1L);
    }

    /**
     * Tears the client down via {@link #shutdown()} and re-initializes it via {@link #init()}.
     *
     * @throws IOException if {@link #init()} fails
     */
    public void reOpen() throws IOException {
        this.shutdown();
        this.init();
    }

    /**
     * Returns the service name: the simple class name, prefixed with the broker identifier when
     * the broker configuration reports that it runs inside a broker container.
     */
    public String getServiceName() {
        final String simpleName = AutoSwitchHAClient.class.getSimpleName();
        if (this.haService.getDefaultMessageStore().getBrokerConfig().isInBrokerContainer()) {
            return this.haService.getDefaultMessageStore().getBrokerIdentity().getIdentifier() + simpleName;
        }
        return simpleName;
    }

    /**
     * Sets the address that is sent to the master in the handshake frame.
     *
     * @param str local address; may be {@code null}
     */
    public void setLocalAddress(String str) {
        localAddress = str;
    }

    /**
     * Replaces the stored slave id and logs the change when the swap succeeds.
     *
     * @param l new slave id
     */
    public void updateSlaveId(Long l) {
        final Long previous = slaveId.get();
        if (!slaveId.compareAndSet(previous, l)) {
            // Value changed between the read and the swap; nothing to log.
            return;
        }
        LOGGER.info("Update slave Id, OLD: {}, New: {}", previous, l);
    }

    /**
     * Stores a new master address if it differs from the current one, logging the change.
     *
     * @param str new master address
     */
    @Override // org.apache.rocketmq.store.ha.HAClient
    public void updateMasterAddress(String str) {
        final String previous = masterAddress.get();
        final boolean changed = !StringUtils.equals(str, previous);
        if (changed && masterAddress.compareAndSet(previous, str)) {
            LOGGER.info("update master address, OLD: " + previous + " NEW: " + str);
        }
    }

    /**
     * Stores a new master HA address if it differs from the current one, logs the change, and
     * wakes the service thread.
     *
     * @param str new master HA address
     */
    @Override // org.apache.rocketmq.store.ha.HAClient
    public void updateHaMasterAddress(String str) {
        final String previous = masterHaAddress.get();
        final boolean changed = !StringUtils.equals(str, previous);
        if (changed && masterHaAddress.compareAndSet(previous, str)) {
            LOGGER.info("update master ha address, OLD: " + previous + " NEW: " + str);
            wakeup();
        }
    }

    /** Returns the currently stored master address, or {@code null} if none has been set. */
    @Override // org.apache.rocketmq.store.ha.HAClient
    public String getMasterAddress() {
        final String address = masterAddress.get();
        return address;
    }

    /** Returns the currently stored master HA address, or {@code null} if none has been set. */
    @Override // org.apache.rocketmq.store.ha.HAClient
    public String getHaMasterAddress() {
        final String haAddress = masterHaAddress.get();
        return haAddress;
    }

    /** Returns the timestamp (epoch millis) recorded for the most recent read. */
    @Override // org.apache.rocketmq.store.ha.HAClient
    public long getLastReadTimestamp() {
        return lastReadTimestamp;
    }

    /** Returns the timestamp (epoch millis) recorded for the most recent write. */
    @Override // org.apache.rocketmq.store.ha.HAClient
    public long getLastWriteTimestamp() {
        return lastWriteTimestamp;
    }

    /** Returns the client's current connection state. */
    @Override // org.apache.rocketmq.store.ha.HAClient
    public HAConnectionState getCurrentState() {
        return currentState;
    }

    /**
     * Logs and then applies a connection state change.
     *
     * @param hAConnectionState state to switch to
     */
    @Override // org.apache.rocketmq.store.ha.HAClient
    public void changeCurrentState(HAConnectionState hAConnectionState) {
        LOGGER.info("change state to {}", hAConnectionState);
        currentState = hAConnectionState;
    }

    /** Closes the master connection via {@link #closeMaster()} and then waits up to five seconds. */
    public void closeMasterAndWait() {
        this.closeMaster();
        this.waitForRunning(5000L);
    }

    /**
     * Closes the connection to the master, if one is open, and resets the read-side bookkeeping.
     *
     * <p>On a successful close the channel reference is cleared and the state returns to
     * {@code READY}. An {@link IOException} during close is logged, and the bookkeeping is still
     * reset.
     */
    @Override // org.apache.rocketmq.store.ha.HAClient
    public void closeMaster() {
        if (socketChannel == null) {
            // No open connection: nothing to close or reset.
            return;
        }

        try {
            final SelectionKey registration = socketChannel.keyFor(selector);
            if (registration != null) {
                registration.cancel();
            }
            socketChannel.close();
            socketChannel = null;
            LOGGER.info("AutoSwitchHAClient close connection with master {}", masterHaAddress.get());
            changeCurrentState(HAConnectionState.READY);
        } catch (IOException closeFailure) {
            LOGGER.warn("CloseMaster exception. ", closeFailure);
        }

        // Discard any partially processed data left in the read buffer.
        lastReadTimestamp = 0L;
        processPosition = 0;
        byteBufferRead.position(0);
        byteBufferRead.limit(READ_MAX_BUFFER_SIZE);
    }

    /** Returns the transfer rate reported by the flow monitor. */
    @Override // org.apache.rocketmq.store.ha.HAClient
    public long getTransferredByteInSecond() {
        final long bytesPerSecond = flowMonitor.getTransferredByteInSecond();
        return bytesPerSecond;
    }

    /**
     * Stops the client. In order: marks the state {@code SHUTDOWN}, stops the flow monitor, stops
     * the service thread, closes the master connection, and closes the selector. A failure to
     * close the selector is logged and otherwise ignored.
     */
    @Override // org.apache.rocketmq.store.ha.HAClient
    public void shutdown() {
        changeCurrentState(HAConnectionState.SHUTDOWN);
        flowMonitor.shutdown();
        super.shutdown();
        closeMaster();

        try {
            selector.close();
        } catch (IOException selectorCloseFailure) {
            LOGGER.warn("Close the selector of AutoSwitchHAClient error, ", selectorCloseFailure);
        }
    }

    /**
     * Tells whether more time has passed since the last write than the configured heartbeat
     * interval.
     */
    private boolean isTimeToReportOffset() {
        final long sinceLastWrite = messageStore.now() - lastWriteTimestamp;
        final long heartbeatInterval = messageStore.getMessageStoreConfig().getHaSendHeartbeatInterval();
        return sinceLastWrite > heartbeatInterval;
    }

    /**
     * Builds the handshake frame and writes it to the master.
     *
     * <p>Frame layout, in write order: {@code HANDSHAKE} state ordinal (int), sync-from-last-file
     * flag (short), async-learner flag (short), address length in bytes (int), then the UTF-8
     * encoded local address (empty when {@code localAddress} is {@code null}).
     *
     * <p>The handshake buffer holds {@link #HANDSHAKE_SIZE} bytes, so an encoded address longer
     * than {@code HANDSHAKE_SIZE - HANDSHAKE_HEADER_SIZE} bytes does not fit and makes the
     * {@code put} fail with a {@link java.nio.BufferOverflowException}.
     *
     * @return the result of {@code haWriter.write} for the frame
     * @throws IOException propagated from {@code haWriter.write}
     */
    private boolean sendHandshakeHeader() throws IOException {
        // Encode once so the length prefix counts the same bytes that are written as payload;
        // String.length() counts UTF-16 units and disagrees with the payload for non-ASCII text.
        final byte[] addressBytes = this.localAddress == null
            ? new byte[0]
            : this.localAddress.getBytes(StandardCharsets.UTF_8);

        this.handshakeHeaderBuffer.position(0);
        this.handshakeHeaderBuffer.limit(HANDSHAKE_SIZE);
        this.handshakeHeaderBuffer.putInt(HAConnectionState.HANDSHAKE.ordinal());
        this.handshakeHeaderBuffer.putShort(this.haService.getDefaultMessageStore().getMessageStoreConfig().isSyncFromLastFile() ? (short) 1 : (short) 0);
        this.handshakeHeaderBuffer.putShort(this.haService.getDefaultMessageStore().getMessageStoreConfig().isAsyncLearner() ? (short) 1 : (short) 0);
        this.handshakeHeaderBuffer.putInt(addressBytes.length);
        this.handshakeHeaderBuffer.put(addressBytes);
        this.handshakeHeaderBuffer.flip();
        return this.haWriter.write(this.socketChannel, this.handshakeHeaderBuffer);
    }

    /**
     * Performs one handshake round with the master: sends the handshake frame, waits up to five
     * seconds for the selector, then reads the reply through {@code haReader}.
     *
     * <p>If either the send or the read reports failure, the connection is closed via
     * {@link #closeMasterAndWait()}.
     *
     * @throws IOException propagated from the send, the select, or the read
     */
    private void handshakeWithMaster() throws IOException {
        if (!sendHandshakeHeader()) {
            closeMasterAndWait();
            // closeMaster() has closed and cleared the channel, so selecting and reading here
            // would only operate on a connection that no longer exists.
            return;
        }

        this.selector.select(5000L);

        if (!this.haReader.read(this.socketChannel, this.byteBufferRead)) {
            closeMasterAndWait();
        }
    }

    /**
     * Writes an offset report frame to the master: current state ordinal (int) followed by the
     * offset (long).
     *
     * @param j offset to report
     * @return the result of {@code haWriter.write} for the frame
     * @throws IOException propagated from {@code haWriter.write}
     */
    private boolean reportSlaveOffset(long j) throws IOException {
        final ByteBuffer frame = this.transferHeaderBuffer;
        frame.position(0);
        frame.limit(TRANSFER_HEADER_SIZE);
        frame.putInt(this.currentState.ordinal());
        frame.putLong(j);
        frame.flip();
        return this.haWriter.write(this.socketChannel, frame);
    }

    /**
     * Reports the store's max physical offset to the master, but only when it has advanced past
     * the last reported offset.
     *
     * @return {@code true} when nothing needed reporting, otherwise the result of the report
     * @throws IOException propagated from {@code reportSlaveOffset}
     */
    /* JADX INFO: Access modifiers changed from: private */
    public boolean reportSlaveMaxOffset() throws IOException {
        final long storeMaxOffset = this.messageStore.getMaxPhyOffset();
        if (storeMaxOffset <= this.currentReportedOffset) {
            // Nothing new since the last report.
            return true;
        }
        this.currentReportedOffset = storeMaxOffset;
        return reportSlaveOffset(this.currentReportedOffset);
    }

    /**
     * Ensures a connection to the master exists, opening one when there is none and a master HA
     * address is known.
     *
     * <p>On a successful connect the channel is registered for reads and the state becomes
     * {@code HANDSHAKE}. Whenever no connection existed on entry, the reported offset and last
     * read timestamp are refreshed, whether or not the connect succeeded.
     *
     * @return {@code true} if a connection is open on return
     * @throws IOException propagated from registering the channel with the selector
     */
    public boolean connectMaster() throws IOException {
        if (this.socketChannel != null) {
            return true;
        }

        final String haAddress = this.masterHaAddress.get();
        if (StringUtils.isNotEmpty(haAddress)) {
            this.socketChannel = RemotingHelper.connect(NetworkUtil.string2SocketAddress(haAddress));
            if (this.socketChannel != null) {
                this.socketChannel.register(this.selector, SelectionKey.OP_READ);
                LOGGER.info("AutoSwitchHAClient connect to master {}", haAddress);
                changeCurrentState(HAConnectionState.HANDSHAKE);
            }
        }

        this.currentReportedOffset = this.messageStore.getMaxPhyOffset();
        this.lastReadTimestamp = System.currentTimeMillis();
        return this.socketChannel != null;
    }

    /**
     * Runs one transfer round: sends an offset report if the heartbeat interval has elapsed,
     * waits up to one second on the selector, reads from the master, and reports the new max
     * offset.
     *
     * @return {@code false} as soon as any send or read step reports failure, {@code true} otherwise
     * @throws IOException propagated from the send, the select, or the read
     */
    private boolean transferFromMaster() throws IOException {
        if (isTimeToReportOffset()) {
            LOGGER.info("Slave report current offset {}", this.currentReportedOffset);
            final boolean heartbeatSent = reportSlaveOffset(this.currentReportedOffset);
            if (!heartbeatSent) {
                return false;
            }
        }

        this.selector.select(1000L);

        final boolean readSucceeded = this.haReader.read(this.socketChannel, this.byteBufferRead);
        return readSucceeded && reportSlaveMaxOffset();
    }

    /**
     * Service loop. Until the thread is stopped, performs exactly one step per iteration based on
     * the current connection state:
     * <ul>
     *   <li>{@code SHUTDOWN}: leave the loop.</li>
     *   <li>{@code READY}: truncate invalid messages (and the epoch cache to match), then try to
     *       connect to the master, waiting five seconds after a failed attempt.</li>
     *   <li>{@code HANDSHAKE}: run one handshake round.</li>
     *   <li>{@code TRANSFER}: run one transfer round; close the connection if it fails, or if
     *       nothing has been read for longer than the housekeeping interval.</li>
     *   <li>{@code SUSPEND} and anything else: wait five seconds.</li>
     * </ul>
     * Any exception raised by a step is logged and the master connection is closed.
     */
    public void run() {
        LOGGER.info(getServiceName() + " service started");
        this.flowMonitor.start();
        while (!isStopped()) {
            try {
                // Each case ends with break: one state is handled per iteration, and the next
                // iteration re-reads currentState, which the step may have changed.
                switch (this.currentState) {
                    case SHUTDOWN:
                        return;
                    case READY:
                        long truncateInvalidMsg = this.haService.truncateInvalidMsg();
                        if (truncateInvalidMsg >= 0) {
                            this.epochCache.truncateSuffixByOffset(truncateInvalidMsg);
                        }
                        if (!connectMaster()) {
                            LOGGER.warn("AutoSwitchHAClient connect to master {} failed", this.masterHaAddress.get());
                            waitForRunning(5000L);
                        }
                        break;
                    case HANDSHAKE:
                        handshakeWithMaster();
                        break;
                    case TRANSFER:
                        if (transferFromMaster()) {
                            // Housekeeping: drop a connection that has been silent for too long.
                            long now = this.messageStore.now() - this.lastReadTimestamp;
                            if (now > this.messageStore.getMessageStoreConfig().getHaHousekeepingInterval()) {
                                LOGGER.warn("AutoSwitchHAClient, housekeeping, found this connection[" + this.masterHaAddress + "] expired, " + now);
                                closeMaster();
                                LOGGER.warn("AutoSwitchHAClient, master not response some time, so close connection");
                            }
                        } else {
                            closeMasterAndWait();
                        }
                        break;
                    case SUSPEND:
                    default:
                        waitForRunning(5000L);
                        break;
                }
            } catch (Exception e) {
                LOGGER.warn(getServiceName() + " service has exception. ", e);
                closeMasterAndWait();
            }
        }
    }

    /**
     * Aligns the local log with the master's epoch history and switches to {@code TRANSFER}.
     *
     * <p>When the local epoch cache is empty, nothing is truncated and the reported offset is
     * reset to zero. Otherwise the consistent point between the master's and the local epoch
     * entries is computed, the store and the local epoch cache are truncated to it, and it
     * becomes the reported offset. Finally the max offset is reported to the master.
     *
     * @param list epoch entries received from the master
     * @param j end offset to apply to the last master epoch entry
     * @return {@code false} if no consistent point exists, the truncation fails, or the final
     *         report fails; {@code true} otherwise
     * @throws IOException propagated from {@code reportSlaveMaxOffset}
     */
    /* JADX INFO: Access modifiers changed from: private */
    public boolean doTruncate(List<EpochEntry> list, long j) throws IOException {
        final boolean localCacheEmpty = this.epochCache.getEntrySize() == 0;
        if (localCacheEmpty) {
            LOGGER.info("Slave local epochCache is empty, skip truncate log");
            changeCurrentState(HAConnectionState.TRANSFER);
            this.currentReportedOffset = 0L;
        } else {
            // Scratch cache holding the master's view of the epoch history.
            final EpochFileCache masterEpochCache = new EpochFileCache();
            masterEpochCache.initCacheFromEntries(list);
            masterEpochCache.setLastEpochEntryEndOffset(j);

            // Scratch cache holding the local view, ending at the store's max offset.
            final List<EpochEntry> localEntries = this.epochCache.getAllEntries();
            final EpochFileCache localEpochCache = new EpochFileCache();
            localEpochCache.initCacheFromEntries(localEntries);
            localEpochCache.setLastEpochEntryEndOffset(this.messageStore.getMaxPhyOffset());

            final long truncateOffset = localEpochCache.findConsistentPoint(masterEpochCache);
            if (truncateOffset < 0) {
                LOGGER.error("Failed to find a consistent point between masterEpoch:{} and slaveEpoch:{}", list, localEntries);
                return false;
            }

            final boolean truncated = this.messageStore.truncateFiles(truncateOffset);
            if (!truncated) {
                LOGGER.error("Failed to truncate slave log to {}", truncateOffset);
                return false;
            }

            this.epochCache.truncateSuffixByOffset(truncateOffset);
            LOGGER.info("Truncate slave log to {} success, change to transfer state", truncateOffset);
            changeCurrentState(HAConnectionState.TRANSFER);
            this.currentReportedOffset = truncateOffset;
        }

        if (!reportSlaveMaxOffset()) {
            LOGGER.error("AutoSwitchHAClient report max offset to master failed");
            return false;
        }
        return true;
    }
}
