package org.apache.hadoop.ozone.container.common.transport.server.ratis;

import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import io.opentracing.Scope;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.security.token.TokenVerifier;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.impl.RaftServerProxy;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.statemachine.StateMachineStorage;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.class */
public class ContainerStateMachine extends BaseStateMachine {
    /** Logger used by this state machine. */
    static final Logger LOG = LoggerFactory.getLogger(ContainerStateMachine.class);

    /** Raft group served by this instance; requests for any other group are rejected in startTransaction. */
    private final RaftGroupId gid;

    /** Target that validates and executes container commands. */
    private final ContainerDispatcher dispatcher;

    /** Runs write-chunk data writes and asynchronous state machine data reads. */
    private final ThreadPoolExecutor chunkExecutor;

    /** Owning server; consulted for leadership and notified about slowness / missing leader. */
    private final XceiverServerRatis ratisServer;

    /** Executors used by applyTransaction; one is selected per request from the container ID. */
    private final ExecutorService[] executors;

    /** Number of entries in {@link #executors}. */
    private final int numExecutors;

    /** Chunk data keyed by log index; filled on the leader while a chunk is written, loaded on demand otherwise. */
    private final Cache<Long, ByteString> stateMachineDataCache;

    /** When true, every dispatched command is first checked by {@link #tokenVerifier}. */
    private final boolean isBlockTokenEnabled;

    /** Verifies the encoded token of a request when block tokens are enabled. */
    private final TokenVerifier tokenVerifier;

    /** Operation and failure counters for this state machine. */
    private final CSMMetrics metrics;

    /** Snapshot storage backing {@link #getStateMachineStorage()}. */
    private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage();

    /** In-flight write-chunk futures keyed by log index; an entry is removed when its write completes normally. */
    private final ConcurrentHashMap<Long, CompletableFuture<Message>> writeChunkFutureMap = new ConcurrentHashMap<>();

    /** Log index to term for entries that are finished but not yet folded into the last applied index. */
    private final Map<Long, Long> applyTransactionCompletionMap = new ConcurrentHashMap<>();

    /** Container IDs restored from / written to snapshots and handed to dispatcher contexts. */
    private final Set<Long> createContainerSet = new ConcurrentSkipListSet<>();

    /* renamed from: org.apache.hadoop.ozone.container.common.transport.server.ratis.ContainerStateMachine$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine$1.class */
    /*
     * Lookup table backing the switch over ContainerProtos.Type in writeStateMachineData.
     * The table is indexed by enum ordinal: WriteChunk maps to case label 1 and every other
     * constant keeps the array default of 0, which the switch treats as its default branch.
     */
    static /* synthetic */ class AnonymousClass1 {
        // One slot per ContainerProtos.Type constant, sized from values().length at class-initialisation time.
        static final /* synthetic */ int[] $SwitchMap$org$apache$hadoop$hdds$protocol$datanode$proto$ContainerProtos$Type = new int[ContainerProtos.Type.values().length];

        static {
            try {
                $SwitchMap$org$apache$hadoop$hdds$protocol$datanode$proto$ContainerProtos$Type[ContainerProtos.Type.WriteChunk.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // NOTE(review): the error is swallowed on purpose by the generated code; presumably this
                // tolerates an enum version without WriteChunk, leaving the slot at 0 - confirm.
            }
        }
    }

    /**
     * Creates the state machine for a single Raft group.
     *
     * @param raftGroupId group this instance serves; also used to create the metrics object
     * @param containerDispatcher target for validating and executing container commands
     * @param threadPoolExecutor executor for chunk writes and state machine data reads; its core
     *        pool size also bounds the number of entries kept in the state machine data cache
     * @param xceiverServerRatis server that owns this state machine
     * @param list executors among which applyTransaction work is spread
     * @param j idle time in milliseconds after which a cached chunk expires
     * @param z whether block token verification is performed before dispatching
     * @param tokenVerifier verifier consulted when {@code z} is true
     */
    public ContainerStateMachine(
            RaftGroupId raftGroupId,
            ContainerDispatcher containerDispatcher,
            ThreadPoolExecutor threadPoolExecutor,
            XceiverServerRatis xceiverServerRatis,
            List<ExecutorService> list,
            long j,
            boolean z,
            TokenVerifier tokenVerifier) {
        gid = raftGroupId;
        dispatcher = containerDispatcher;
        chunkExecutor = threadPoolExecutor;
        ratisServer = xceiverServerRatis;
        metrics = CSMMetrics.create(raftGroupId);
        numExecutors = list.size();
        executors = list.toArray(new ExecutorService[numExecutors]);
        stateMachineDataCache = CacheBuilder.newBuilder()
                .expireAfterAccess(j, TimeUnit.MILLISECONDS)
                .maximumSize(threadPoolExecutor.getCorePoolSize())
                .build();
        isBlockTokenEnabled = z;
        this.tokenVerifier = tokenVerifier;
    }

    /**
     * Exposes the snapshot storage owned by this state machine.
     *
     * @return the storage instance created together with this object
     */
    public StateMachineStorage getStateMachineStorage() {
        return storage;
    }

    /**
     * Exposes the counters maintained by this state machine.
     *
     * @return the metrics object created in the constructor
     */
    public CSMMetrics getMetrics() {
        return metrics;
    }

    /**
     * Initialises the base state machine and the snapshot storage, then restores state from
     * the latest snapshot known to the storage (if there is one).
     *
     * @param raftServer server hosting this state machine
     * @param raftGroupId group being initialised
     * @param raftStorage storage used to initialise the snapshot storage
     * @throws IOException if initialisation or snapshot loading fails
     */
    public void initialize(RaftServer raftServer, RaftGroupId raftGroupId, RaftStorage raftStorage) throws IOException {
        super.initialize(raftServer, raftGroupId, raftStorage);
        storage.init(raftStorage);
        final SingleFileSnapshotInfo latestSnapshot = storage.getLatestSnapshot();
        loadSnapshot(latestSnapshot);
    }

    /**
     * Restores the last applied term/index and the set of known container IDs from a snapshot.
     *
     * <p>Without a snapshot the last applied term index is reset to (term 0, index -1). Otherwise
     * the term index is derived from the snapshot file, the file content is parsed as a
     * {@code ContainerIdSetProto}, its IDs are added to the container set and the dispatcher is
     * asked to rebuild its missing-container set from it.
     *
     * @param singleFileSnapshotInfo snapshot to load, or {@code null} when none exists
     * @return the index recorded in the snapshot, or -1 when there is no snapshot
     * @throws IOException if the snapshot file cannot be read or parsed
     */
    private long loadSnapshot(SingleFileSnapshotInfo singleFileSnapshotInfo) throws IOException {
        if (singleFileSnapshotInfo == null) {
            final TermIndex initial = TermIndex.newTermIndex(0L, -1L);
            LOG.info("The snapshot info is null.Setting the last applied index to:" + initial);
            setLastAppliedTermIndex(initial);
            return -1L;
        }

        final File snapshotFile = singleFileSnapshotInfo.getFile().getPath().toFile();
        final TermIndex snapshotTermIndex = SimpleStateMachineStorage.getTermIndexFromSnapshotFile(snapshotFile);
        LOG.info("Setting the last applied index to " + snapshotTermIndex);
        setLastAppliedTermIndex(snapshotTermIndex);

        // try-with-resources closes the stream on every path and keeps a close() failure as a
        // suppressed exception instead of letting it replace the primary one.
        try (FileInputStream in = new FileInputStream(snapshotFile)) {
            final byte[] content = IOUtils.toByteArray(in);
            createContainerSet.addAll(ContainerProtos.ContainerIdSetProto.parseFrom(content).getContainerIdList());
            dispatcher.buildMissingContainerSet(createContainerSet);
        }
        return snapshotTermIndex.getIndex();
    }

    /**
     * Serialises the current container ID set as a {@code ContainerIdSetProto} and writes the
     * bytes to the given stream. The stream is not closed here.
     *
     * @param outputStream destination of the serialised set
     * @throws IOException if writing fails
     */
    public void persistContainerSet(OutputStream outputStream) throws IOException {
        final byte[] serialized = ContainerProtos.ContainerIdSetProto.newBuilder()
                .addAllContainerId(createContainerSet)
                .build()
                .toByteArray();
        IOUtils.write(serialized, outputStream);
    }

    /**
     * Writes the container ID set to a snapshot file named after the last applied term/index.
     *
     * <p>Nothing is written when there is no last applied term index or its index is -1.
     *
     * @return the index the snapshot was taken at, or -1 when no snapshot was written
     * @throws IOException if the snapshot file cannot be created, written or closed; the failure
     *         is logged before being rethrown
     */
    public long takeSnapshot() throws IOException {
        final TermIndex applied = getLastAppliedTermIndex();
        LOG.info("Taking snapshot at termIndex:" + applied);
        if (applied == null || applied.getIndex() == -1) {
            return -1L;
        }

        final File snapshotFile = storage.getSnapshotFile(applied.getTerm(), applied.getIndex());
        LOG.info("Taking a snapshot to file {}", snapshotFile);
        // try-with-resources guarantees the stream is closed even when persisting fails;
        // the previous hand-written variant left it open on that path.
        try (FileOutputStream out = new FileOutputStream(snapshotFile)) {
            persistContainerSet(out);
        } catch (IOException e) {
            LOG.warn("Failed to write snapshot file \"" + snapshotFile + "\", last applied index=" + applied);
            throw e;
        }
        return applied.getIndex();
    }

    /**
     * Turns a client request into a transaction context on the leader.
     *
     * <p>The request is parsed, checked against this state machine's group and validated by the
     * dispatcher inside a tracing scope. For {@code WriteChunk} the chunk payload is attached as
     * state machine data and the log data carries a copy of the request whose write-chunk part
     * only holds the block ID and chunk info. For every other command the raw request content is
     * used as log data. If an {@link IOException} is raised while doing so, a context without
     * log data is returned with that exception set on it.
     *
     * @param raftClientRequest request received from the client
     * @return the transaction context describing the request
     * @throws IOException if the request content cannot be parsed
     * @throws IllegalArgumentException if the request targets a different Raft group
     */
    public TransactionContext startTransaction(RaftClientRequest raftClientRequest) throws IOException {
        final ContainerProtos.ContainerCommandRequestProto requestProto =
                getContainerCommandRequestProto(raftClientRequest.getMessage().getContent());
        Preconditions.checkArgument(raftClientRequest.getRaftGroupId().equals(this.gid));

        // The scope is closed automatically on every exit path, including exceptional ones.
        try (Scope scope = TracingUtil.importAndCreateScope(requestProto.getCmdType().name(), requestProto.getTraceID())) {
            try {
                dispatcher.validateContainerCommand(requestProto);

                if (requestProto.getCmdType() != ContainerProtos.Type.WriteChunk) {
                    return TransactionContext.newBuilder()
                            .setClientRequest(raftClientRequest)
                            .setStateMachine(this)
                            .setServerRole(RaftProtos.RaftPeerRole.LEADER)
                            .setLogData(raftClientRequest.getMessage().getContent())
                            .build();
                }

                final ContainerProtos.WriteChunkRequestProto writeChunk = requestProto.getWriteChunk();
                // Log data keeps only the chunk metadata; the payload travels as state machine data.
                final ContainerProtos.WriteChunkRequestProto metadataOnly = ContainerProtos.WriteChunkRequestProto.newBuilder()
                        .setBlockID(writeChunk.getBlockID())
                        .setChunkData(writeChunk.getChunkData())
                        .build();
                return TransactionContext.newBuilder()
                        .setClientRequest(raftClientRequest)
                        .setStateMachine(this)
                        .setServerRole(RaftProtos.RaftPeerRole.LEADER)
                        .setStateMachineData(writeChunk.getData())
                        .setLogData(ContainerProtos.ContainerCommandRequestProto.newBuilder(requestProto)
                                .setWriteChunk(metadataOnly)
                                .build()
                                .toByteString())
                        .build();
            } catch (IOException e) {
                final TransactionContext failed = TransactionContext.newBuilder()
                        .setClientRequest(raftClientRequest)
                        .setStateMachine(this)
                        .setServerRole(RaftProtos.RaftPeerRole.LEADER)
                        .build();
                failed.setException(e);
                return failed;
            }
        }
    }

    /**
     * Extracts the state machine data carried by a state machine log entry.
     *
     * @param stateMachineLogEntryProto log entry to read from
     * @return the data stored in the entry's state machine entry
     */
    private ByteString getStateMachineData(RaftProtos.StateMachineLogEntryProto stateMachineLogEntryProto) {
        return stateMachineLogEntryProto
                .getStateMachineEntry()
                .getStateMachineData();
    }

    /**
     * Parses a serialised container command and stamps it with this group's UUID as pipeline ID.
     *
     * @param byteString serialised {@code ContainerCommandRequestProto}
     * @return a copy of the parsed request whose pipeline ID is this state machine's group UUID
     * @throws InvalidProtocolBufferException if the bytes are not a valid request
     */
    private ContainerProtos.ContainerCommandRequestProto getContainerCommandRequestProto(ByteString byteString) throws InvalidProtocolBufferException {
        final ContainerProtos.ContainerCommandRequestProto parsed =
                ContainerProtos.ContainerCommandRequestProto.parseFrom(byteString);
        final String pipelineId = gid.getUuid().toString();
        return ContainerProtos.ContainerCommandRequestProto.newBuilder(parsed)
                .setPipelineID(pipelineId)
                .build();
    }

    /**
     * Sends a command to the dispatcher, verifying the block token first when that is enabled.
     *
     * @param containerCommandRequestProto command to execute
     * @param dispatcherContext context passed through to the dispatcher; may be {@code null}
     *        (query passes none)
     * @return the dispatcher's response, or an error response with result
     *         {@code BLOCK_TOKEN_VERIFICATION_FAILED} when token verification raises an
     *         {@link IOException}
     */
    private ContainerProtos.ContainerCommandResponseProto dispatchCommand(ContainerProtos.ContainerCommandRequestProto containerCommandRequestProto, DispatcherContext dispatcherContext) {
        LOG.trace("dispatch {} containerID={} pipelineID={} traceID={}",
                containerCommandRequestProto.getCmdType(),
                containerCommandRequestProto.getContainerID(),
                containerCommandRequestProto.getPipelineID(),
                containerCommandRequestProto.getTraceID());

        if (isBlockTokenEnabled) {
            try {
                final String shortUserName = UserGroupInformation.getCurrentUser().getShortUserName();
                tokenVerifier.verify(shortUserName, containerCommandRequestProto.getEncodedToken());
            } catch (IOException e) {
                final StorageContainerException rejected = new StorageContainerException(
                        "Block token verification failed. " + e.getMessage(),
                        e,
                        ContainerProtos.Result.BLOCK_TOKEN_VERIFICATION_FAILED);
                return ContainerUtils.logAndReturnError(LOG, rejected, containerCommandRequestProto);
            }
        }

        final ContainerProtos.ContainerCommandResponseProto response =
                dispatcher.dispatch(containerCommandRequestProto, dispatcherContext);
        LOG.trace("response {}", response);
        return response;
    }

    /**
     * Executes a command and wraps the response as a Raft {@link Message}.
     *
     * @param containerCommandRequestProto command to execute
     * @param dispatcherContext context handed to the dispatcher
     * @return a message whose content is produced by the response's {@code toByteString}
     */
    private Message runCommand(ContainerProtos.ContainerCommandRequestProto containerCommandRequestProto, DispatcherContext dispatcherContext) {
        final ContainerProtos.ContainerCommandResponseProto response =
                dispatchCommand(containerCommandRequestProto, dispatcherContext);
        // A bound method reference null-checks its receiver when it is created.
        return response::toByteString;
    }

    /**
     * Selects the executor for a request from its container ID, so that all requests for the
     * same container are handed to the same executor.
     *
     * @param containerCommandRequestProto request whose container ID selects the executor
     * @return the executor at position {@code containerID mod numExecutors}
     */
    private ExecutorService getCommandExecutor(ContainerProtos.ContainerCommandRequestProto containerCommandRequestProto) {
        // floorMod keeps the slot in [0, numExecutors) even for a negative container ID, where
        // the % operator would produce a negative array index.
        final int slot = (int) Math.floorMod(containerCommandRequestProto.getContainerID(), (long) numExecutors);
        return executors[slot];
    }

    /**
     * Starts the data-write stage of a write-chunk request on the chunk executor.
     *
     * <p>On the leader the chunk payload is first placed in the state machine data cache under
     * the log index. The returned future is tracked in the write-chunk future map until it
     * completes normally.
     *
     * @param containerCommandRequestProto write-chunk request including its payload
     * @param j log index of the entry
     * @param j2 term of the entry
     * @return future of the write result; completed exceptionally when the leadership check
     *         raises an {@link IOException}
     * @throws IllegalStateException if the server is not a {@code RaftServerProxy}
     */
    private CompletableFuture<Message> handleWriteChunk(ContainerProtos.ContainerCommandRequestProto containerCommandRequestProto, long j, long j2) {
        final ContainerProtos.WriteChunkRequestProto writeChunk = containerCommandRequestProto.getWriteChunk();
        // Hold the server by its interface type and cast only after the instanceof check has
        // passed, so the check is meaningful and the assignment needs no implicit downcast.
        final RaftServer server = this.ratisServer.getServer();
        Preconditions.checkState(server instanceof RaftServerProxy);
        try {
            if (((RaftServerProxy) server).getImpl(this.gid).isLeader()) {
                stateMachineDataCache.put(j, writeChunk.getData());
            }
            final DispatcherContext context = new DispatcherContext.Builder()
                    .setTerm(j2)
                    .setLogIndex(j)
                    .setStage(DispatcherContext.WriteChunkStage.WRITE_DATA)
                    .setCreateContainerSet(this.createContainerSet)
                    .build();
            final CompletableFuture<Message> writeFuture = CompletableFuture.supplyAsync(
                    () -> runCommand(containerCommandRequestProto, context), this.chunkExecutor);
            writeChunkFutureMap.put(j, writeFuture);
            // Guarded so the protobuf-to-string conversion is skipped unless debug logging is on.
            if (LOG.isDebugEnabled()) {
                LOG.debug("writeChunk writeStateMachineData : blockId " + writeChunk.getBlockID() + " logIndex " + j + " chunkName " + writeChunk.getChunkData().getChunkName());
            }
            writeFuture.thenApply(message -> {
                writeChunkFutureMap.remove(j);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("writeChunk writeStateMachineData  completed: blockId " + writeChunk.getBlockID() + " logIndex " + j + " chunkName " + writeChunk.getChunkData().getChunkName());
                }
                return message;
            });
            return writeFuture;
        } catch (IOException e) {
            return completeExceptionally(e);
        }
    }

    /**
     * Writes the state machine data attached to a log entry.
     *
     * <p>The request is rebuilt from the entry's log data with the entry's state machine data
     * set as the write-chunk payload. Only {@code WriteChunk} requests are accepted.
     *
     * @param logEntryProto log entry carrying the request and its state machine data
     * @return future of the write result; completed exceptionally (and counted as a failure)
     *         when the request cannot be parsed
     * @throws IllegalStateException if the command is not a {@code WriteChunk}
     */
    public CompletableFuture<Message> writeStateMachineData(RaftProtos.LogEntryProto logEntryProto) {
        try {
            this.metrics.incNumWriteStateMachineOps();
            final ContainerProtos.ContainerCommandRequestProto logged =
                    getContainerCommandRequestProto(logEntryProto.getStateMachineLogEntry().getLogData());
            final ContainerProtos.WriteChunkRequestProto writeChunkWithData =
                    ContainerProtos.WriteChunkRequestProto.newBuilder(logged.getWriteChunk())
                            .setData(getStateMachineData(logEntryProto.getStateMachineLogEntry()))
                            .build();
            final ContainerProtos.ContainerCommandRequestProto request =
                    ContainerProtos.ContainerCommandRequestProto.newBuilder(logged)
                            .setWriteChunk(writeChunkWithData)
                            .build();
            final ContainerProtos.Type cmdType = request.getCmdType();
            // Compare the enum constant directly rather than going through an ordinal-indexed table.
            if (cmdType == ContainerProtos.Type.WriteChunk) {
                return handleWriteChunk(request, logEntryProto.getIndex(), logEntryProto.getTerm());
            }
            throw new IllegalStateException("Cmd Type:" + cmdType + " should not have state machine data");
        } catch (IOException e) {
            this.metrics.incNumWriteStateMachineFails();
            return completeExceptionally(e);
        }
    }

    /**
     * Executes a read request synchronously, without a dispatcher context.
     *
     * @param message message whose content is a serialised container command
     * @return an already completed future holding the response, or a future completed
     *         exceptionally (and counted as a read failure) when the request cannot be parsed
     */
    public CompletableFuture<Message> query(Message message) {
        try {
            metrics.incNumReadStateMachineOps();
            final ContainerProtos.ContainerCommandRequestProto request =
                    getContainerCommandRequestProto(message.getContent());
            final Message reply = runCommand(request, null);
            return CompletableFuture.completedFuture(reply);
        } catch (IOException e) {
            metrics.incNumReadStateMachineFails();
            return completeExceptionally(e);
        }
    }

    /**
     * Reads back the payload of a write-chunk request by dispatching a matching
     * {@code ReadChunk} command with the read-from-temporary-file flag set.
     *
     * @param containerCommandRequestProto the original write-chunk request (without payload)
     * @param j term placed in the dispatcher context
     * @param j2 log index placed in the dispatcher context
     * @return the chunk data returned by the read
     * @throws IOException declared for callers; see the dispatched command for failure modes
     * @throws NullPointerException if the read returns no data
     * @throws IllegalStateException if the data length differs from the chunk's declared length
     */
    private ByteString readStateMachineData(ContainerProtos.ContainerCommandRequestProto containerCommandRequestProto, long j, long j2) throws IOException {
        final ContainerProtos.WriteChunkRequestProto writeChunk = containerCommandRequestProto.getWriteChunk();
        final ContainerProtos.ChunkInfo chunkInfo = writeChunk.getChunkData();

        final ContainerProtos.ContainerCommandRequestProto readRequest =
                ContainerProtos.ContainerCommandRequestProto.newBuilder(containerCommandRequestProto)
                        .setCmdType(ContainerProtos.Type.ReadChunk)
                        .setReadChunk(ContainerProtos.ReadChunkRequestProto.newBuilder()
                                .setBlockID(writeChunk.getBlockID())
                                .setChunkData(chunkInfo))
                        .build();
        final DispatcherContext readContext = new DispatcherContext.Builder()
                .setTerm(j)
                .setLogIndex(j2)
                .setReadFromTmpFile(true)
                .build();

        final ByteString payload = dispatchCommand(readRequest, readContext).getReadChunk().getData();
        Preconditions.checkNotNull(payload, "read chunk data is null for chunk:" + chunkInfo);
        Preconditions.checkState(
                payload.size() == chunkInfo.getLen(),
                String.format("read chunk len=%d does not match chunk expected len=%d for chunk:%s",
                        payload.size(), chunkInfo.getLen(), chunkInfo));
        return payload;
    }

    /**
     * Returns the chunk data for a log index from the cache, loading it through a chunk read
     * when it is not cached.
     *
     * @param l log index used as the cache key
     * @param j term of the log entry
     * @param containerCommandRequestProto write-chunk request describing the chunk
     * @return the cached or freshly read chunk data
     * @throws ExecutionException if loading the value raises a checked exception
     */
    private ByteString getCachedStateMachineData(Long l, long j, ContainerProtos.ContainerCommandRequestProto containerCommandRequestProto) throws ExecutionException {
        return stateMachineDataCache.get(
                l,
                () -> readStateMachineData(containerCommandRequestProto, j, l));
    }

    /**
     * Returns a future that completes once every tracked write-chunk future with a log index
     * up to and including the given one has completed.
     *
     * @param j highest log index to wait for (inclusive)
     * @return combined future over the matching in-flight writes; already complete when there
     *         are none
     */
    public CompletableFuture<Void> flushStateMachineData(long j) {
        // Typed stream straight to an array; no raw List or unchecked casts needed.
        final CompletableFuture<?>[] pendingWrites = writeChunkFutureMap.entrySet().stream()
                .filter(entry -> entry.getKey() <= j)
                .map(Map.Entry::getValue)
                .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(pendingWrites);
    }

    /**
     * Supplies the state machine data for a log entry.
     *
     * <p>If the entry already carries state machine data, an empty byte string is returned
     * immediately. Otherwise the entry must be a {@code WriteChunk} request and its data is
     * fetched asynchronously on the chunk executor, via the cache.
     *
     * @param logEntryProto log entry whose data is requested
     * @return future of the data; completed exceptionally when parsing, validation, task
     *         submission or the read itself fails
     */
    public CompletableFuture<ByteString> readStateMachineData(RaftProtos.LogEntryProto logEntryProto) {
        if (!getStateMachineData(logEntryProto.getStateMachineLogEntry()).isEmpty()) {
            return CompletableFuture.completedFuture(ByteString.EMPTY);
        }
        try {
            final ContainerProtos.ContainerCommandRequestProto request =
                    getContainerCommandRequestProto(logEntryProto.getStateMachineLogEntry().getLogData());
            Preconditions.checkArgument(!HddsUtils.isReadOnly(request));
            if (request.getCmdType() != ContainerProtos.Type.WriteChunk) {
                throw new IllegalStateException("Cmd type:" + request.getCmdType() + " cannot have state machine data");
            }
            final CompletableFuture<ByteString> result = new CompletableFuture<>();
            CompletableFuture.runAsync(() -> {
                try {
                    result.complete(getCachedStateMachineData(logEntryProto.getIndex(), logEntryProto.getTerm(), request));
                } catch (Throwable t) {
                    // The cache reports unchecked loader failures with unchecked wrappers, not
                    // ExecutionException. Catch everything so the caller's future always
                    // completes instead of hanging forever.
                    result.completeExceptionally(t);
                }
            }, this.chunkExecutor);
            return result;
        } catch (Exception e) {
            // Pass the exception as the throwable argument so the stack trace is logged too.
            LOG.error("unable to read stateMachineData:" + e, e);
            return completeExceptionally(e);
        }
    }

    /**
     * Advances the last applied term/index over every consecutive completed entry.
     *
     * <p>Starting right after the current last applied index, completed entries are removed
     * from the completion map one by one until the first gap. If at least one entry was
     * consumed, the last applied term index moves to the last consumed (term, index) pair.
     */
    private void updateLastApplied() {
        long appliedIndex = getLastAppliedTermIndex().getIndex();
        Long appliedTerm = null;

        Long nextTerm = applyTransactionCompletionMap.remove(appliedIndex + 1);
        while (nextTerm != null) {
            appliedIndex++;
            appliedTerm = nextTerm;
            nextTerm = applyTransactionCompletionMap.remove(appliedIndex + 1);
        }

        if (appliedTerm != null) {
            updateLastAppliedTermIndex(appliedTerm, appliedIndex);
        }
    }

    /**
     * Records a log index as completed in the completion map, keyed by index with the term as
     * value.
     *
     * @param j term of the entry
     * @param j2 log index of the entry
     */
    public void notifyIndexUpdate(long j, long j2) {
        final Long index = j2;
        final Long term = j;
        applyTransactionCompletionMap.put(index, term);
    }

    /**
     * Applies a committed transaction on the executor selected for its container.
     *
     * <p>{@code WriteChunk} requests must arrive without payload and run in the commit stage.
     * {@code WriteChunk} and {@code PutSmallFile} additionally receive the container set in
     * their dispatcher context. When the command finishes normally, the entry's index is
     * recorded as completed and the last applied index is advanced.
     *
     * @param transactionContext transaction holding the log entry to apply
     * @return future of the command result; completed exceptionally (and counted as a failure)
     *         when the logged request cannot be parsed
     */
    public CompletableFuture<Message> applyTransaction(TransactionContext transactionContext) {
        final long index = transactionContext.getLogEntry().getIndex();
        final DispatcherContext.Builder contextBuilder = new DispatcherContext.Builder()
                .setTerm(transactionContext.getLogEntry().getTerm())
                .setLogIndex(index);
        try {
            metrics.incNumApplyTransactionsOps();
            final ContainerProtos.ContainerCommandRequestProto request =
                    getContainerCommandRequestProto(transactionContext.getStateMachineLogEntry().getLogData());
            final ContainerProtos.Type cmdType = request.getCmdType();
            final boolean isWriteChunk = cmdType == ContainerProtos.Type.WriteChunk;

            if (isWriteChunk) {
                Preconditions.checkArgument(request.getWriteChunk().getData().isEmpty());
                contextBuilder.setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA);
            }
            if (isWriteChunk || cmdType == ContainerProtos.Type.PutSmallFile) {
                contextBuilder.setCreateContainerSet(createContainerSet);
            }

            final CompletableFuture<Message> applied = CompletableFuture.supplyAsync(
                    () -> runCommand(request, contextBuilder.build()),
                    getCommandExecutor(request));
            applied.thenAccept(message -> {
                final Long previousTerm = applyTransactionCompletionMap.put(
                        index, transactionContext.getLogEntry().getTerm());
                // Each index is expected to be recorded exactly once.
                Preconditions.checkState(previousTerm == null);
                updateLastApplied();
            });
            return applied;
        } catch (IOException e) {
            metrics.incNumApplyTransactionsFails();
            return completeExceptionally(e);
        }
    }

    /**
     * Creates a future that has already failed with the given exception.
     *
     * @param exc failure to report
     * @param <T> result type expected by the caller
     * @return a future completed exceptionally with {@code exc}
     */
    private static <T> CompletableFuture<T> completeExceptionally(Exception exc) {
        final CompletableFuture<T> failed = new CompletableFuture<>();
        failed.completeExceptionally(exc);
        return failed;
    }

    /**
     * Drops every entry from the state machine data cache and then runs the cache's cleanup.
     */
    private void evictStateMachineCache() {
        final Cache<Long, ByteString> cache = stateMachineDataCache;
        cache.invalidateAll();
        cache.cleanUp();
    }

    /**
     * Forwards a slowness notification to the owning server.
     *
     * @param raftGroup group the notification refers to
     * @param roleInfoProto role information accompanying the notification
     */
    public void notifySlowness(RaftGroup raftGroup, RaftProtos.RoleInfoProto roleInfoProto) {
        ratisServer.handleNodeSlowness(raftGroup, roleInfoProto);
    }

    /**
     * Forwards a no-leader notification to the owning server.
     *
     * @param raftGroup group the notification refers to
     * @param roleInfoProto role information accompanying the notification
     */
    public void notifyExtendedNoLeader(RaftGroup raftGroup, RaftProtos.RoleInfoProto roleInfoProto) {
        ratisServer.handleNoLeader(raftGroup, roleInfoProto);
    }

    /**
     * Clears the state machine data cache when this server is told it is not the leader.
     * The pending transactions passed in are not inspected.
     *
     * @param collection pending transactions (unused)
     * @throws IOException declared by the signature; not raised by this implementation's own code
     */
    public void notifyNotLeader(Collection<TransactionContext> collection) throws IOException {
        evictStateMachineCache();
    }

    /**
     * Clears the state machine data cache when the state machine is closed.
     *
     * @throws IOException declared by the signature; not raised by this implementation's own code
     */
    public void close() throws IOException {
        evictStateMachineCache();
    }
}
