package org.apache.ratis.grpc.client;

import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ratis.client.impl.ClientProtoUtils;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.grpc.GrpcUtil;
import org.apache.ratis.grpc.client.GrpcClientProtocolProxy;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.NotLeaderException;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.CollectionUtils;
import org.apache.ratis.util.Daemon;
import org.apache.ratis.util.PeerProxyMap;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/ratis/grpc/client/GrpcClientStreamer.class */
public class GrpcClientStreamer implements Closeable {
    public static final Logger LOG = LoggerFactory.getLogger(GrpcClientStreamer.class);
    private final Deque<RaftProtos.RaftClientRequestProto> dataQueue;
    private final Deque<RaftProtos.RaftClientRequestProto> ackQueue;
    private final int maxPendingNum;
    private final SizeInBytes maxMessageSize;
    private final PeerProxyMap<GrpcClientProtocolProxy> proxyMap;
    private final Map<RaftPeerId, RaftPeer> peers;
    private RaftPeerId leaderId;
    private volatile GrpcClientProtocolProxy leaderProxy;
    private final ClientId clientId;
    private volatile RunningState running = RunningState.RUNNING;
    private final ExceptionAndRetry exceptionAndRetry;
    private final Sender senderThread;
    private final RaftGroupId groupId;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/ratis/grpc/client/GrpcClientStreamer$ExceptionAndRetry.class */
    public static class ExceptionAndRetry {
        private final Map<RaftPeerId, IOException> exceptionMap = new HashMap();
        private final AtomicInteger retryTimes = new AtomicInteger(0);
        private final int maxRetryTimes;
        private final TimeDuration retryInterval;

        ExceptionAndRetry(RaftProperties raftProperties) {
            this.maxRetryTimes = GrpcConfigKeys.OutputStream.retryTimes(raftProperties);
            this.retryInterval = GrpcConfigKeys.OutputStream.retryInterval(raftProperties);
        }

        void addException(RaftPeerId raftPeerId, IOException iOException) {
            this.exceptionMap.put(raftPeerId, iOException);
            this.retryTimes.incrementAndGet();
        }

        IOException getCombinedException() {
            return new IOException("Exceptions: " + this.exceptionMap);
        }

        boolean shouldRetry() {
            return this.retryTimes.get() <= this.maxRetryTimes;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/ratis/grpc/client/GrpcClientStreamer$ResponseHandler.class */
    public class ResponseHandler implements GrpcClientProtocolProxy.CloseableStreamObserver {
        private final RaftPeerId targetId;
        private volatile boolean active = true;

        ResponseHandler(RaftPeer raftPeer) {
            this.targetId = raftPeer.getId();
        }

        public String toString() {
            return GrpcClientStreamer.this + "-ResponseHandler-" + this.targetId;
        }

        public void onNext(RaftProtos.RaftClientReplyProto raftClientReplyProto) {
            if (this.active) {
                synchronized (GrpcClientStreamer.this) {
                    RaftProtos.RaftClientRequestProto raftClientRequestProto = (RaftProtos.RaftClientRequestProto) Objects.requireNonNull(GrpcClientStreamer.this.ackQueue.peek());
                    if (raftClientReplyProto.getRpcReply().getSuccess()) {
                        Preconditions.assertTrue(raftClientRequestProto.getRpcRequest().getCallId() == raftClientReplyProto.getRpcReply().getCallId(), () -> {
                            return "pending=" + ClientProtoUtils.toString(raftClientRequestProto) + " but reply=" + ClientProtoUtils.toString(raftClientReplyProto);
                        });
                        GrpcClientStreamer.this.ackQueue.poll();
                        if (GrpcClientStreamer.LOG.isTraceEnabled()) {
                            GrpcClientStreamer.LOG.trace("{} received success ack for {}", this, ClientProtoUtils.toString(raftClientRequestProto));
                        }
                        if (GrpcClientStreamer.this.running == RunningState.LOOK_FOR_LEADER) {
                            GrpcClientStreamer.this.running = RunningState.RUNNING;
                        }
                    } else {
                        RaftClientReply raftClientReply = ClientProtoUtils.toRaftClientReply(raftClientReplyProto);
                        NotLeaderException notLeaderException = raftClientReply.getNotLeaderException();
                        if (notLeaderException != null) {
                            GrpcClientStreamer.LOG.debug("{} received a NotLeaderException from {}", this, raftClientReply.getServerId());
                            GrpcClientStreamer.this.handleNotLeader(notLeaderException, this.targetId);
                        }
                    }
                    GrpcClientStreamer.this.notifyAll();
                }
            }
        }

        public void onError(Throwable th) {
            GrpcClientStreamer.LOG.warn(this + " onError", th);
            if (this.active) {
                synchronized (GrpcClientStreamer.this) {
                    GrpcClientStreamer.this.handleError(th, this);
                    GrpcClientStreamer.this.notifyAll();
                }
            }
        }

        public void onCompleted() {
            GrpcClientStreamer.LOG.info("{} onCompleted, pending requests #: {}", this, Integer.valueOf(GrpcClientStreamer.this.ackQueue.size()));
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            this.active = false;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/ratis/grpc/client/GrpcClientStreamer$RunningState.class */
    public enum RunningState {
        RUNNING,
        LOOK_FOR_LEADER,
        CLOSED,
        ERROR
    }

    /* loaded from: input_file:org/apache/ratis/grpc/client/GrpcClientStreamer$Sender.class */
    private class Sender extends Daemon {
        private Sender() {
        }

        public void run() {
            while (GrpcClientStreamer.this.isRunning()) {
                synchronized (GrpcClientStreamer.this) {
                    while (GrpcClientStreamer.this.isRunning() && shouldWait()) {
                        try {
                            GrpcClientStreamer.this.wait();
                        } catch (InterruptedException e) {
                        }
                    }
                    if (GrpcClientStreamer.this.running == RunningState.RUNNING) {
                        Preconditions.assertTrue(!GrpcClientStreamer.this.dataQueue.isEmpty(), "dataQueue is empty");
                        RaftProtos.RaftClientRequestProto raftClientRequestProto = (RaftProtos.RaftClientRequestProto) GrpcClientStreamer.this.dataQueue.poll();
                        GrpcClientStreamer.this.leaderProxy.onNext(raftClientRequestProto);
                        GrpcClientStreamer.this.ackQueue.offer(raftClientRequestProto);
                    }
                }
            }
        }

        private boolean shouldWait() {
            return GrpcClientStreamer.this.dataQueue.isEmpty() || GrpcClientStreamer.this.ackQueue.size() >= GrpcClientStreamer.this.maxPendingNum || GrpcClientStreamer.this.running == RunningState.LOOK_FOR_LEADER;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public GrpcClientStreamer(RaftProperties raftProperties, RaftGroup raftGroup, RaftPeerId raftPeerId, ClientId clientId, GrpcTlsConfig grpcTlsConfig) {
        this.clientId = clientId;
        this.maxPendingNum = GrpcConfigKeys.OutputStream.outstandingAppendsMax(raftProperties);
        Logger logger = LOG;
        logger.getClass();
        this.maxMessageSize = GrpcConfigKeys.messageSizeMax(raftProperties, logger::debug);
        this.dataQueue = new ConcurrentLinkedDeque();
        this.ackQueue = new ConcurrentLinkedDeque();
        this.exceptionAndRetry = new ExceptionAndRetry(raftProperties);
        this.groupId = raftGroup.getGroupId();
        this.peers = (Map) raftGroup.getPeers().stream().collect(Collectors.toMap((v0) -> {
            return v0.getId();
        }, Function.identity()));
        this.proxyMap = new PeerProxyMap<>(clientId.toString(), raftPeer -> {
            return new GrpcClientProtocolProxy(clientId, raftPeer, raftPeer -> {
                return new ResponseHandler(raftPeer);
            }, raftProperties, grpcTlsConfig);
        });
        this.proxyMap.addPeers(raftGroup.getPeers());
        refreshLeaderProxy(raftPeerId, null);
        this.senderThread = new Sender();
        this.senderThread.setName(toString() + "-sender");
        this.senderThread.start();
    }

    private synchronized void refreshLeaderProxy(RaftPeerId raftPeerId, RaftPeerId raftPeerId2) {
        if (raftPeerId != null) {
            this.leaderId = raftPeerId;
        } else if (raftPeerId2 == null) {
            this.leaderId = this.peers.keySet().iterator().next();
        } else {
            this.leaderId = (RaftPeerId) CollectionUtils.random(raftPeerId2, this.peers.keySet());
            if (this.leaderId == null) {
                this.leaderId = raftPeerId2;
            }
        }
        LOG.debug("{} switches leader from {} to {}. suggested leader: {}", new Object[]{this, raftPeerId2, this.leaderId, raftPeerId});
        if (this.leaderProxy != null) {
            this.leaderProxy.closeCurrentSession();
        }
        try {
            this.leaderProxy = (GrpcClientProtocolProxy) this.proxyMap.getProxy(this.leaderId);
        } catch (IOException e) {
            LOG.error("Should not hit IOException here", e);
            refreshLeader(null, this.leaderId);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean isRunning() {
        return this.running == RunningState.RUNNING || this.running == RunningState.LOOK_FOR_LEADER;
    }

    private void checkState() throws IOException {
        if (isRunning()) {
            return;
        }
        throwException("The GrpcClientStreamer has been closed");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void write(ByteString byteString, long j) throws IOException {
        checkState();
        while (isRunning() && this.dataQueue.size() >= this.maxPendingNum) {
            try {
                wait();
            } catch (InterruptedException e) {
            }
        }
        if (!isRunning()) {
            throwException(this + " got closed.");
            return;
        }
        RaftProtos.RaftClientRequestProto raftClientRequestProto = ClientProtoUtils.toRaftClientRequestProto(this.clientId, this.leaderId, this.groupId, j, j, byteString);
        if (raftClientRequestProto.getSerializedSize() > this.maxMessageSize.getSizeInt()) {
            throw new IOException("msg size:" + raftClientRequestProto.getSerializedSize() + " exceeds maximum:" + this.maxMessageSize.getSizeInt());
        }
        this.dataQueue.offer(raftClientRequestProto);
        notifyAll();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void flush() throws IOException {
        checkState();
        if (this.dataQueue.isEmpty() && this.ackQueue.isEmpty()) {
            return;
        }
        while (isRunning() && (!this.dataQueue.isEmpty() || !this.ackQueue.isEmpty())) {
            try {
                wait();
            } catch (InterruptedException e) {
            }
        }
        if (isRunning()) {
            return;
        }
        if (this.dataQueue.isEmpty() && this.ackQueue.isEmpty()) {
            return;
        }
        throwException(this + " got closed before finishing flush");
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (isRunning()) {
            flush();
            this.running = RunningState.CLOSED;
            this.senderThread.interrupt();
            try {
                this.senderThread.join();
            } catch (InterruptedException e) {
            }
            this.proxyMap.close();
        }
    }

    public String toString() {
        return getClass().getSimpleName() + "-" + this.clientId;
    }

    private void throwException(String str) throws IOException {
        if (this.running != RunningState.ERROR) {
            throw new IOException(str);
        }
        throw this.exceptionAndRetry.getCombinedException();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleNotLeader(NotLeaderException notLeaderException, RaftPeerId raftPeerId) {
        Preconditions.assertTrue(Thread.holdsLock(this));
        refreshPeers(notLeaderException.getPeers());
        refreshLeader(notLeaderException.getSuggestedLeader().getId(), raftPeerId);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleError(Throwable th, ResponseHandler responseHandler) {
        Preconditions.assertTrue(Thread.holdsLock(this));
        IOException unwrapIOException = GrpcUtil.unwrapIOException(th);
        this.exceptionAndRetry.addException(responseHandler.targetId, unwrapIOException);
        LOG.debug("{} got error: {}. Total retry times {}, max retry times {}.", new Object[]{responseHandler, unwrapIOException, Integer.valueOf(this.exceptionAndRetry.retryTimes.get()), Integer.valueOf(this.exceptionAndRetry.maxRetryTimes)});
        this.leaderProxy.onError();
        if (this.exceptionAndRetry.shouldRetry()) {
            refreshLeader(null, this.leaderId);
        } else {
            this.running = RunningState.ERROR;
        }
    }

    private void refreshLeader(RaftPeerId raftPeerId, RaftPeerId raftPeerId2) {
        this.running = RunningState.LOOK_FOR_LEADER;
        refreshLeaderProxy(raftPeerId, raftPeerId2);
        reQueuePendingRequests(this.leaderId);
        RaftProtos.RaftClientRequestProto raftClientRequestProto = (RaftProtos.RaftClientRequestProto) Objects.requireNonNull(this.dataQueue.poll());
        this.ackQueue.offer(raftClientRequestProto);
        try {
            this.exceptionAndRetry.retryInterval.sleep();
        } catch (InterruptedException e) {
        }
        this.leaderProxy.onNext(raftClientRequestProto);
    }

    private void reQueuePendingRequests(RaftPeerId raftPeerId) {
        if (isRunning()) {
            while (!this.ackQueue.isEmpty()) {
                RaftProtos.RaftClientRequestProto pollLast = this.ackQueue.pollLast();
                this.dataQueue.offerFirst(RaftProtos.RaftClientRequestProto.newBuilder(pollLast).setRpcRequest(RaftProtos.RaftRpcRequestProto.newBuilder(pollLast.getRpcRequest()).setReplyId(raftPeerId.toByteString())).build());
            }
        }
    }

    private void refreshPeers(RaftPeer[] raftPeerArr) {
        if (raftPeerArr == null || raftPeerArr.length <= 0) {
            return;
        }
        Arrays.stream(raftPeerArr).forEach(raftPeer -> {
            this.peers.putIfAbsent(raftPeer.getId(), raftPeer);
            this.proxyMap.computeIfAbsent(raftPeer);
        });
        LOG.debug("refreshed peers: {}", this.peers);
    }
}
