package org.apache.rocketmq.store.ha.autoswitch;

import java.io.IOException;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.EpochEntry;
import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.DispatchRequest;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.ha.DefaultHAService;
import org.apache.rocketmq.store.ha.GroupTransferService;
import org.apache.rocketmq.store.ha.HAClient;
import org.apache.rocketmq.store.ha.HAConnection;
import org.apache.rocketmq.store.ha.HAConnectionStateNotificationService;

/* loaded from: input_file:org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.class */
public class AutoSwitchHAService extends DefaultHAService {
    /** Logger obtained under the name "RocketmqStore". */
    private static final Logger LOGGER = LoggerFactory.getLogger("RocketmqStore");
    /** Address handed to the HA client in changeToSlave; assigned through setLocalAddress. */
    private String localAddress;
    /** Epoch cache; created and loaded from its file in init. */
    private EpochFileCache epochCache;
    /** Slave-side client; remains null until changeToSlave creates it for the first time. */
    private AutoSwitchHAClient haClient;
    /** Single-thread executor on which sync-state-set listeners are invoked. */
    private final ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryImpl("AutoSwitchHAService_Executor_"));
    /** Callbacks run by notifySyncStateSetChanged. This is a plain ArrayList with no synchronization of its own. */
    private final List<Consumer<Set<String>>> syncStateSetChangedListeners = new ArrayList<>();
    /** Current members of the sync state set. */
    private final CopyOnWriteArraySet<String> syncStateSet = new CopyOnWriteArraySet<>();
    /** Values recorded by updateConnectionLastCaughtUpTime, keyed by the same strings that are looked up in syncStateSet. */
    private final ConcurrentHashMap<String, Long> connectionCaughtUpTimeTable = new ConcurrentHashMap<>();
    /** Cached confirm offset; starts at -1 and is recomputed lazily by getConfirmOffset when it is not positive. */
    private volatile long confirmOffset = -1;
    /** Guards snapshot reads (getSyncStateSet, inSyncReplicasNums) and wholesale replacement (setSyncStateSet) of syncStateSet. */
    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

    /* loaded from: input_file:org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService$AutoSwitchAcceptSocketService.class */
    /**
     * Accept-socket service that produces {@link AutoSwitchHAConnection} instances
     * wired to the enclosing service and its epoch cache.
     */
    class AutoSwitchAcceptSocketService extends DefaultHAService.AcceptSocketService {
        /**
         * @param messageStoreConfig configuration forwarded unchanged to the parent service
         */
        public AutoSwitchAcceptSocketService(MessageStoreConfig messageStoreConfig) {
            super(messageStoreConfig);
        }

        /**
         * Returns the service name. Outside a broker container it is this class's simple name;
         * inside one it is the broker identifier followed by the simple name of the parent
         * {@code AcceptSocketService} class.
         *
         * @return the service name
         */
        public String getServiceName() {
            if (!AutoSwitchHAService.this.defaultMessageStore.getBrokerConfig().isInBrokerContainer()) {
                return AutoSwitchAcceptSocketService.class.getSimpleName();
            }
            // NOTE(review): the container branch uses the parent class's simple name, not this
            // class's; kept as found - confirm whether that asymmetry is intended.
            return AutoSwitchHAService.this.defaultMessageStore.getBrokerConfig().getIdentifier()
                + DefaultHAService.AcceptSocketService.class.getSimpleName();
        }

        /**
         * Wraps an accepted channel in an auto-switch connection.
         *
         * @param socketChannel the accepted channel
         * @return a new {@link AutoSwitchHAConnection}
         * @throws IOException declared by the overridden method
         */
        @Override
        protected HAConnection createConnection(SocketChannel socketChannel) throws IOException {
            final AutoSwitchHAService owner = AutoSwitchHAService.this;
            return new AutoSwitchHAConnection(owner, socketChannel, owner.epochCache);
        }
    }

    /**
     * Binds this service to a message store: loads the epoch cache from the configured epoch
     * file, then creates the accept-socket, group-transfer and connection-state-notification
     * services.
     *
     * @param defaultMessageStore the store this HA service works for
     * @throws IOException declared by the overridden method
     */
    @Override
    public void init(DefaultMessageStore defaultMessageStore) throws IOException {
        final MessageStoreConfig storeConfig = defaultMessageStore.getMessageStoreConfig();

        // The cache is published to the field before it is loaded, as in the original ordering.
        final EpochFileCache cache = new EpochFileCache(storeConfig.getStorePathEpochFile());
        this.epochCache = cache;
        cache.initCacheFromFile();

        this.defaultMessageStore = defaultMessageStore;
        this.acceptSocketService = new AutoSwitchAcceptSocketService(storeConfig);
        this.groupTransferService = new GroupTransferService(this, defaultMessageStore);
        this.haConnectionStateNotificationService = new HAConnectionStateNotificationService(this, defaultMessageStore);
    }

    /**
     * Shuts down the parent service, then the HA client if one exists, then the listener
     * executor.
     */
    @Override
    public void shutdown() {
        super.shutdown();

        final AutoSwitchHAClient client = this.haClient;
        if (client != null) {
            client.shutdown();
        }

        this.executorService.shutdown();
    }

    /**
     * Removes a connection. While the store is not shut down, and the connection's slave address
     * appears in a snapshot of the sync state set, listeners are notified with that snapshot minus
     * the address. The live set itself is not modified here.
     *
     * @param hAConnection the connection being removed; cast to {@link AutoSwitchHAConnection}
     */
    @Override
    public void removeConnection(HAConnection hAConnection) {
        final boolean storeRunning = !this.defaultMessageStore.isShutdown();
        if (storeRunning) {
            final Set<String> snapshot = getSyncStateSet();
            final String slaveAddress = ((AutoSwitchHAConnection) hAConnection).getSlaveAddress();
            // Set.remove reports whether the element was present, so no separate contains check is needed.
            if (snapshot.remove(slaveAddress)) {
                notifySyncStateSetChanged(snapshot);
            }
        }
        super.removeConnection(hAConnection);
    }

    /**
     * Switches this service into the master role for the given epoch.
     *
     * <p>Steps, in order: reject an epoch older than the last cached one; destroy existing
     * connections and shut down the HA client if present; truncate invalid messages; refresh the
     * confirm offset; trim the epoch cache; append the new epoch entry starting at the current max
     * physical offset; wait until dispatch has caught up; when the transient store pool is
     * enabled, wait for all commits and enable real commit; recover the topic queue table.
     *
     * <p>An interrupt received while waiting for dispatch does not abort the wait. It is
     * remembered and the thread's interrupt status is restored before this method returns.
     *
     * @param i the new master epoch
     * @return {@code false} if {@code i} is smaller than the last cached epoch, otherwise {@code true}
     */
    @Override
    public boolean changeToMaster(int i) {
        final int lastEpoch = this.epochCache.lastEpoch();
        if (i < lastEpoch) {
            LOGGER.warn("newMasterEpoch {} < lastEpoch {}, fail to change to master", i, lastEpoch);
            return false;
        }

        destroyConnections();
        if (this.haClient != null) {
            this.haClient.shutdown();
        }

        final long truncateOffset = truncateInvalidMsg();
        updateConfirmOffset(computeConfirmOffset());
        // truncateInvalidMsg returns -1 when nothing had to be truncated.
        if (truncateOffset >= 0) {
            this.epochCache.truncateSuffixByOffset(truncateOffset);
        }

        final EpochEntry newEpochEntry = new EpochEntry(i, this.defaultMessageStore.getMaxPhyOffset());
        if (this.epochCache.lastEpoch() >= i) {
            this.epochCache.truncateSuffixByEpoch(i);
        }
        this.epochCache.appendEntry(newEpochEntry);

        boolean interrupted = false;
        try {
            // Poll until dispatch has caught up with the commit log.
            while (this.defaultMessageStore.dispatchBehindBytes() > 0) {
                try {
                    Thread.sleep(100L);
                } catch (InterruptedException e) {
                    // Keep waiting, but do not lose the interrupt request.
                    interrupted = true;
                }
            }

            if (this.defaultMessageStore.isTransientStorePoolEnable()) {
                waitingForAllCommit();
                this.defaultMessageStore.getTransientStorePool().setRealCommit(true);
            }

            LOGGER.info("TruncateOffset is {}, confirmOffset is {}, maxPhyOffset is {}",
                truncateOffset, getConfirmOffset(), this.defaultMessageStore.getMaxPhyOffset());
            this.defaultMessageStore.recoverTopicQueueTable();
            LOGGER.info("Change ha to master success, newMasterEpoch:{}, startOffset:{}", i, newEpochEntry.getStartOffset());
            return true;
        } finally {
            // Restored last so the remaining steps run with a clear interrupt flag, as before.
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Switches this service into the slave role.
     *
     * <p>After rejecting an epoch older than the last cached one, it destroys existing
     * connections, creates the HA client on first use (or reopens the existing one), configures it
     * and starts it. When the transient store pool is enabled it then waits for all commits and
     * disables real commit. Any exception raised during these steps is logged and reported as
     * failure.
     *
     * @param str the new master address
     * @param i   the new master epoch
     * @param l   the slave id passed to the HA client
     * @return {@code true} on success; {@code false} if the epoch is stale or an exception occurred
     */
    @Override
    public boolean changeToSlave(String str, int i, Long l) {
        final int lastEpoch = this.epochCache.lastEpoch();
        if (lastEpoch > i) {
            LOGGER.warn("newMasterEpoch {} < lastEpoch {}, fail to change to slave", i, lastEpoch);
            return false;
        }

        try {
            destroyConnections();

            AutoSwitchHAClient client = this.haClient;
            if (client != null) {
                client.reOpen();
            } else {
                client = new AutoSwitchHAClient(this, this.defaultMessageStore, this.epochCache);
                this.haClient = client;
            }

            client.setLocalAddress(this.localAddress);
            client.updateSlaveId(l);
            client.updateMasterAddress(str);
            client.updateHaMasterAddress(null);
            client.start();

            if (this.defaultMessageStore.isTransientStorePoolEnable()) {
                waitingForAllCommit();
                this.defaultMessageStore.getTransientStorePool().setRealCommit(false);
            }

            LOGGER.info("Change ha to slave success, newMasterAddress:{}, newMasterEpoch:{}", str, i);
            return true;
        } catch (Exception e) {
            LOGGER.error("Error happen when change ha to slave", e);
            return false;
        }
    }

    /**
     * Blocks until the message store reports no data left to commit, waking the commit service
     * and sleeping 100 ms between checks.
     *
     * <p>An interrupt does not end the wait. It is remembered and the thread's interrupt status
     * is restored once the wait finishes, so callers can still observe it.
     */
    public void waitingForAllCommit() {
        boolean interrupted = false;
        try {
            while (getDefaultMessageStore().remainHowManyDataToCommit() > 0) {
                getDefaultMessageStore().getCommitLog().getFlushManager().wakeUpCommit();
                try {
                    Thread.sleep(100L);
                } catch (InterruptedException e) {
                    // Sleeping clears the flag when it throws; record it and keep waiting.
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * @return the auto-switch HA client, or {@code null} if none has been created yet
     */
    @Override
    public HAClient getHAClient() {
        return haClient;
    }

    /**
     * Forwards the HA master address to the HA client; does nothing when no client exists.
     *
     * @param str the HA master address to forward
     */
    @Override
    public void updateHaMasterAddress(String str) {
        final AutoSwitchHAClient client = this.haClient;
        if (client == null) {
            return;
        }
        client.updateHaMasterAddress(str);
    }

    /**
     * Intentionally a no-op in this implementation. The master address reaches the HA client
     * through {@code changeToSlave} instead.
     *
     * @param str ignored
     */
    @Override
    public void updateMasterAddress(String str) {
        // Nothing to do.
    }

    /**
     * Adds a callback that {@code notifySyncStateSetChanged} will invoke.
     *
     * @param consumer callback receiving the set passed to the notification
     */
    public void registerSyncStateSetChangedListener(Consumer<Set<String>> consumer) {
        syncStateSetChangedListeners.add(consumer);
    }

    /**
     * Asynchronously passes the given set to every registered listener, on the service's
     * single-thread executor.
     *
     * <p>Each listener is isolated: an exception from one is logged and the remaining listeners
     * still run. Without this, the failure would end the task early and be stored only in the
     * discarded {@code Future}.
     *
     * @param set the sync state set to hand to the listeners
     */
    public void notifySyncStateSetChanged(Set<String> set) {
        this.executorService.submit(() -> {
            for (Consumer<Set<String>> listener : this.syncStateSetChangedListeners) {
                try {
                    listener.accept(set);
                } catch (Exception e) {
                    LOGGER.error("Error happen when notify sync state set changed", e);
                }
            }
        });
    }

    /**
     * Computes a candidate sync state set with lagging members removed.
     *
     * <p>Starts from a snapshot of the current set and removes every address whose value in the
     * caught-up table is older than the configured {@code haMaxTimeSlaveNotCatchup} relative to
     * the current time. The live set is not modified; the caller decides what to do with the
     * result.
     *
     * @return the snapshot minus the members considered not caught up
     */
    public Set<String> maybeShrinkInSyncStateSet() {
        final Set<String> candidate = getSyncStateSet();
        final long maxLagMillis = this.defaultMessageStore.getMessageStoreConfig().getHaMaxTimeSlaveNotCatchup();
        for (Map.Entry<String, Long> entry : this.connectionCaughtUpTimeTable.entrySet()) {
            final String address = entry.getKey();
            if (!candidate.contains(address)) {
                continue;
            }
            // Use the entry's own value rather than looking the key up a second time.
            final long lastCaughtUp = entry.getValue();
            if (System.currentTimeMillis() - lastCaughtUp > maxLagMillis) {
                candidate.remove(address);
            }
        }
        return candidate;
    }

    /**
     * Proposes adding an address to the sync state set.
     *
     * <p>If the address is not in the current snapshot, and {@code j} has reached both the confirm
     * offset and the start offset of the last epoch entry, listeners are notified with the
     * snapshot plus the address. The live set is not modified here.
     *
     * @param str the address to consider
     * @param j   the offset compared against the confirm offset and the last epoch's start offset
     */
    public void maybeExpandInSyncStateSet(String str, long j) {
        final Set<String> snapshot = getSyncStateSet();
        // Short-circuit order matches the original: membership, confirm offset, epoch start.
        if (snapshot.contains(str)
            || j < getConfirmOffset()
            || j < this.epochCache.lastEntry().getStartOffset()) {
            return;
        }
        snapshot.add(str);
        notifySyncStateSetChanged(snapshot);
    }

    /**
     * Records a value for the given key in the caught-up table, never letting the stored value
     * decrease and never storing less than 0.
     *
     * <p>The update is one atomic {@code merge}, so concurrent calls for the same key cannot
     * overwrite a larger value with a smaller one.
     *
     * @param str the table key
     * @param j   the candidate value; the stored value becomes the maximum of 0, the previous
     *            value (if any) and {@code j}
     */
    public void updateConnectionLastCaughtUpTime(String str, long j) {
        // A missing key used to be seeded with 0 before taking the max; clamping the candidate at 0
        // gives the same result.
        this.connectionCaughtUpTimeTable.merge(str, Math.max(0L, j), Math::max);
    }

    /**
     * Returns the confirm offset.
     *
     * <p>When the configured broker role is SLAVE the cached value is returned unchanged.
     * Otherwise, a sync state set of exactly one member yields the store's max physical offset,
     * and a cached value that is not positive is recomputed before being returned.
     *
     * @return the confirm offset as described above
     */
    public long getConfirmOffset() {
        final boolean slaveRole =
            this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE;
        if (slaveRole) {
            return this.confirmOffset;
        }
        if (this.syncStateSet.size() == 1) {
            return this.defaultMessageStore.getMaxPhyOffset();
        }
        if (this.confirmOffset <= 0) {
            this.confirmOffset = computeConfirmOffset();
        }
        return this.confirmOffset;
    }

    /**
     * Recomputes the cached confirm offset, but only when the given address is a member of the
     * sync state set.
     *
     * @param str the address whose membership is checked
     */
    public void updateConfirmOffsetWhenSlaveAck(String str) {
        if (!this.syncStateSet.contains(str)) {
            return;
        }
        this.confirmOffset = computeConfirmOffset();
    }

    /**
     * Returns the size of the sync state set, read under the read lock.
     *
     * @param j not used by this implementation
     * @return the number of members currently in the sync state set
     */
    @Override
    public int inSyncReplicasNums(long j) {
        final Lock readLock = this.readWriteLock.readLock();
        // Acquire before entering try so unlock is only reached when the lock is actually held.
        readLock.lock();
        try {
            return this.syncStateSet.size();
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Builds a runtime snapshot of the HA state.
     *
     * <p>When the configured role is SLAVE, the client section is filled in. Fields that come from
     * the HA client are left unset if the client has not been created yet, rather than failing
     * with a {@code NullPointerException}. Otherwise the master section is filled in with one
     * entry per connection and the number of in-sync slaves.
     *
     * @param j the master commit log max offset, reported as-is and used to compute each
     *          connection's diff
     * @return the populated runtime info
     */
    @Override
    public HARuntimeInfo getRuntimeInfo(long j) {
        final HARuntimeInfo info = new HARuntimeInfo();

        if (BrokerRole.SLAVE.equals(getDefaultMessageStore().getMessageStoreConfig().getBrokerRole())) {
            info.setMaster(false);
            // Read the field once; it is null until changeToSlave has created the client.
            final AutoSwitchHAClient client = this.haClient;
            if (client != null) {
                info.getHaClientRuntimeInfo().setMasterAddr(client.getHaMasterAddress());
                info.getHaClientRuntimeInfo().setLastReadTimestamp(client.getLastReadTimestamp());
                info.getHaClientRuntimeInfo().setLastWriteTimestamp(client.getLastWriteTimestamp());
                info.getHaClientRuntimeInfo().setTransferredByteInSecond(client.getTransferredByteInSecond());
            }
            info.getHaClientRuntimeInfo().setMaxOffset(getDefaultMessageStore().getMaxPhyOffset());
            info.getHaClientRuntimeInfo().setMasterFlushOffset(this.defaultMessageStore.getMasterFlushedOffset());
            return info;
        }

        info.setMaster(true);
        info.setMasterCommitLogMaxOffset(j);
        for (HAConnection connection : this.connectionList) {
            final HARuntimeInfo.HAConnectionRuntimeInfo connectionInfo = new HARuntimeInfo.HAConnectionRuntimeInfo();
            final long slaveAckOffset = connection.getSlaveAckOffset();
            connectionInfo.setSlaveAckOffset(slaveAckOffset);
            connectionInfo.setDiff(j - slaveAckOffset);
            // substring(1) drops the first character of the client address
            // (presumably a leading '/' - TODO confirm against getClientAddress).
            connectionInfo.setAddr(connection.getClientAddress().substring(1));
            connectionInfo.setTransferredByteInSecond(connection.getTransferredByteInSecond());
            connectionInfo.setTransferFromWhere(connection.getTransferFromWhere());
            connectionInfo.setInSync(this.syncStateSet.contains(((AutoSwitchHAConnection) connection).getSlaveAddress()));
            info.getHaConnectionInfo().add(connectionInfo);
        }
        // One member is subtracted (presumably this broker itself - TODO confirm); clamp so an
        // empty set is reported as 0 rather than -1.
        info.setInSyncSlaveNums(Math.max(0, this.syncStateSet.size() - 1));
        return info;
    }

    /**
     * Overwrites the cached confirm offset.
     *
     * @param j the new confirm offset
     */
    public void updateConfirmOffset(long j) {
        confirmOffset = j;
    }

    /**
     * Computes the confirm offset: the smallest of the store's max physical offset and the ack
     * offsets of all connections whose slave address is in the current sync state set snapshot.
     *
     * @return the computed confirm offset
     */
    private long computeConfirmOffset() {
        final Set<String> inSync = getSyncStateSet();
        long lowest = this.defaultMessageStore.getMaxPhyOffset();
        for (HAConnection connection : this.connectionList) {
            final String slaveAddress = ((AutoSwitchHAConnection) connection).getSlaveAddress();
            if (!inSync.contains(slaveAddress)) {
                continue;
            }
            final long ackOffset = connection.getSlaveAckOffset();
            if (ackOffset < lowest) {
                lowest = ackOffset;
            }
        }
        return lowest;
    }

    /**
     * Replaces the contents of the sync state set with the given members and recomputes the
     * cached confirm offset, all under the write lock.
     *
     * <p>The replacement is done as "drop members not in {@code set}, then add the missing ones"
     * instead of "clear, then add". Several readers in this class consult the set without taking
     * the lock. With this ordering they see at worst the intersection of old and new members,
     * never a transiently empty set. A {@code null} argument is rejected before anything is
     * modified.
     *
     * @param set the new members; must not be {@code null}
     */
    public void setSyncStateSet(Set<String> set) {
        final Lock writeLock = this.readWriteLock.writeLock();
        // Acquire before entering try so unlock is only reached when the lock is actually held.
        writeLock.lock();
        try {
            this.syncStateSet.retainAll(set);
            this.syncStateSet.addAll(set);
            this.confirmOffset = computeConfirmOffset();
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Returns a mutable copy of the sync state set, taken under the read lock. Changes to the
     * returned set do not affect the live set.
     *
     * @return a new {@link HashSet} holding the current members
     */
    public Set<String> getSyncStateSet() {
        final Lock readLock = this.readWriteLock.readLock();
        // Acquire before entering try so unlock is only reached when the lock is actually held.
        readLock.lock();
        try {
            return new HashSet<>(this.syncStateSet);
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Delegates to the epoch cache's prefix truncation by offset.
     *
     * @param j the offset passed to {@code truncatePrefixByOffset}
     */
    public void truncateEpochFilePrefix(long j) {
        epochCache.truncatePrefixByOffset(j);
    }

    /**
     * Delegates to the epoch cache's suffix truncation by offset.
     *
     * @param j the offset passed to {@code truncateSuffixByOffset}
     */
    public void truncateEpochFileSuffix(long j) {
        epochCache.truncateSuffixByOffset(j);
    }

    /**
     * Stores the local address; it is handed to the HA client on the next {@code changeToSlave}.
     *
     * @param str the local address
     */
    public void setLocalAddress(String str) {
        localAddress = str;
    }

    /**
     * Scans the not-yet-dispatched tail of the commit log and truncates the store at the first
     * position where scanning stops.
     *
     * <p>If dispatch is not behind, nothing is truncated and -1 is returned. Otherwise scanning
     * starts at (max physical offset - bytes behind) and walks forward message by message until a
     * message fails its check, no data is available at the current offset, or the max physical
     * offset is reached. The store is then asked to truncate dirty files at the offset reached.
     *
     * @return -1 when dispatch is complete, otherwise the offset the commit log was truncated to
     */
    public long truncateInvalidMsg() {
        long dispatchBehindBytes = this.defaultMessageStore.dispatchBehindBytes();
        if (dispatchBehindBytes <= 0) {
            LOGGER.info("Dispatch complete, skip truncate");
            return -1L;
        }
        // z stays true while every checked message has been valid; it controls the outer loop.
        boolean z = true;
        // Start scanning where dispatch currently stands.
        long maxPhyOffset = this.defaultMessageStore.getMaxPhyOffset() - dispatchBehindBytes;
        do {
            SelectMappedBufferResult data = this.defaultMessageStore.getCommitLog().getData(maxPhyOffset);
            if (data == null) {
                // No data available at this offset: stop scanning.
                break;
            }
            try {
                // Re-align to the start offset reported for the returned buffer.
                maxPhyOffset = data.getStartOffset();
                int i = 0;
                while (true) {
                    if (i >= data.getSize()) {
                        break;
                    }
                    // Each call is handed the same buffer object; presumably the check advances its
                    // position past the message it examined - TODO confirm in CommitLog.
                    DispatchRequest checkMessageAndReturnSize = this.defaultMessageStore.getCommitLog().checkMessageAndReturnSize(data.getByteBuffer(), false, false);
                    if (!checkMessageAndReturnSize.isSuccess()) {
                        // Invalid message: end both loops and truncate at the current offset.
                        z = false;
                        break;
                    }
                    int msgSize = checkMessageAndReturnSize.getMsgSize();
                    if (msgSize <= 0) {
                        // Successful check with a non-positive size: continue from the next file
                        // (presumably an end-of-file marker - TODO confirm).
                        maxPhyOffset = this.defaultMessageStore.getCommitLog().rollNextFile(maxPhyOffset);
                        break;
                    }
                    maxPhyOffset += msgSize;
                    i += msgSize;
                }
                if (maxPhyOffset >= this.defaultMessageStore.getMaxPhyOffset()) {
                    // Reached the end of the commit log.
                    break;
                }
            } finally {
                // Always release the buffer obtained from getData, including on break.
                data.release();
            }
        } while (z);
        LOGGER.info("Truncate commitLog to {}", Long.valueOf(maxPhyOffset));
        this.defaultMessageStore.truncateDirtyFiles(maxPhyOffset);
        return maxPhyOffset;
    }

    /**
     * @return the last epoch reported by the epoch cache
     */
    public int getLastEpoch() {
        return epochCache.lastEpoch();
    }

    /**
     * @return all entries reported by the epoch cache
     */
    public List<EpochEntry> getEpochEntries() {
        return epochCache.getAllEntries();
    }
}
