package com.netease.arctic.flink.write.hidden;

import com.netease.arctic.flink.shuffle.ShuffleHelper;
import com.netease.arctic.flink.write.hidden.LogMsgFactory;
import com.netease.arctic.log.LogData;
import com.netease.arctic.log.LogDataJsonSerialization;
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.NavigableMap;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArraySet;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/netease/arctic/flink/write/hidden/GlobalFlipCommitter.class */
public class GlobalFlipCommitter {
    private static final Logger LOG = LoggerFactory.getLogger(GlobalFlipCommitter.class);
    private static final String AGGREGATE_NAME = "flip-committer";
    private final GlobalAggregateManager aggregateManager;
    private final FlipCommitFunction flipCommitFunction;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/netease/arctic/flink/write/hidden/GlobalFlipCommitter$CommitRequest.class */
    public static class CommitRequest implements Serializable {
        private static final long serialVersionUID = 5469815741394678192L;
        private final Integer subtaskId;
        private final LogData<RowData> logRecord;
        private final boolean checkCommitted;

        private CommitRequest(Integer num, LogData<RowData> logData) {
            this.subtaskId = num;
            this.logRecord = logData;
            this.checkCommitted = false;
        }

        private CommitRequest(Integer num, LogData<RowData> logData, Boolean bool) {
            this.subtaskId = num;
            this.logRecord = logData;
            this.checkCommitted = bool.booleanValue();
        }

        public String toString() {
            return "CommitRequest{subtaskId=" + this.subtaskId + ", flip message=" + this.logRecord.toString() + "}";
        }
    }

    /* loaded from: input_file:com/netease/arctic/flink/write/hidden/GlobalFlipCommitter$FlipCommitFunction.class */
    static class FlipCommitFunction implements AggregateFunction<CommitRequest, LogGlobalState, Long> {
        private static final long serialVersionUID = 6399278898504357412L;
        private final int numberOfTasks;
        private final LogDataJsonSerialization<RowData> logDataJsonSerialization;
        private final LogMsgFactory<RowData> factory;
        private final Properties producerConfig;
        private final String topic;
        private final ShuffleHelper helper;
        private transient LogMsgFactory.Producer<RowData> producer;

        public FlipCommitFunction(int i, Schema schema, LogData.FieldGetterFactory<RowData> fieldGetterFactory, LogMsgFactory<RowData> logMsgFactory, Properties properties, String str, ShuffleHelper shuffleHelper) {
            this.numberOfTasks = i;
            this.factory = (LogMsgFactory) Preconditions.checkNotNull(logMsgFactory);
            this.logDataJsonSerialization = new LogDataJsonSerialization<>((Schema) Preconditions.checkNotNull(schema), (LogData.FieldGetterFactory) Preconditions.checkNotNull(fieldGetterFactory));
            this.producerConfig = properties;
            this.topic = str;
            this.helper = shuffleHelper;
        }

        /* renamed from: createAccumulator, reason: merged with bridge method [inline-methods] */
        public LogGlobalState m74createAccumulator() {
            return new LogGlobalState();
        }

        public LogGlobalState add(CommitRequest commitRequest, LogGlobalState logGlobalState) {
            if (commitRequest.checkCommitted) {
                return logGlobalState;
            }
            GlobalFlipCommitter.LOG.info("receive CommitRequest={}.", commitRequest);
            NavigableMap navigableMap = logGlobalState.accumulators;
            Long valueOf = Long.valueOf(commitRequest.logRecord.getEpicNo());
            navigableMap.compute(valueOf, (l, subAccumulator) -> {
                SubAccumulator subAccumulator = subAccumulator == null ? new SubAccumulator() : subAccumulator;
                if (!subAccumulator.hasCommittedFlip) {
                    subAccumulator.add(commitRequest.subtaskId.intValue(), commitRequest);
                }
                return subAccumulator;
            });
            SubAccumulator subAccumulator2 = (SubAccumulator) logGlobalState.accumulators.get(valueOf);
            if (subAccumulator2.taskIds.size() == this.numberOfTasks) {
                try {
                    GlobalFlipCommitter.LOG.info("already receive {} commit requests. The last subtask received is {}.", Integer.valueOf(this.numberOfTasks), commitRequest.subtaskId);
                    sendFlip(subAccumulator2, commitRequest);
                    GlobalFlipCommitter.LOG.info("sent flip messages success, cost {}ms.", Long.valueOf(subAccumulator2.cost.time()));
                } catch (Exception e) {
                    GlobalFlipCommitter.LOG.error("sending flip messages to topic failed, subAccumulator:{}.", subAccumulator2, e);
                    throw new RuntimeException(e);
                }
            } else {
                GlobalFlipCommitter.LOG.info("As of now, global state has received a total of {} commit requests which are {}.", Integer.valueOf(subAccumulator2.taskIds.size()), Arrays.toString(subAccumulator2.taskIds.toArray(new Integer[0])));
            }
            return logGlobalState;
        }

        private void sendFlip(SubAccumulator subAccumulator, CommitRequest commitRequest) throws Exception {
            if (null == this.producer) {
                this.producer = this.factory.createProducer(this.producerConfig, this.topic, this.logDataJsonSerialization, this.helper);
                this.producer.open();
            }
            this.producer.sendToAllPartitions(commitRequest.logRecord);
            subAccumulator.committed();
        }

        public Long getResult(LogGlobalState logGlobalState) {
            return (Long) logGlobalState.accumulators.descendingMap().entrySet().stream().filter(entry -> {
                return ((SubAccumulator) entry.getValue()).hasCommittedFlip;
            }).findFirst().map((v0) -> {
                return v0.getKey();
            }).orElse(null);
        }

        public LogGlobalState merge(LogGlobalState logGlobalState, LogGlobalState logGlobalState2) {
            logGlobalState2.accumulators.forEach((l, subAccumulator) -> {
            });
            return logGlobalState;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/netease/arctic/flink/write/hidden/GlobalFlipCommitter$LogGlobalState.class */
    public static class LogGlobalState implements Serializable {
        private static final long serialVersionUID = 9132207718335661833L;
        private final NavigableMap<Long, SubAccumulator> accumulators = new ConcurrentSkipListMap();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/netease/arctic/flink/write/hidden/GlobalFlipCommitter$SubAccumulator.class */
    public static class SubAccumulator implements Serializable {
        private static final long serialVersionUID = 1252547231163598559L;
        private final Set<Integer> taskIds;
        private CommitRequest commitRequest;
        private volatile boolean hasCommittedFlip;
        private Cost cost;

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:com/netease/arctic/flink/write/hidden/GlobalFlipCommitter$SubAccumulator$Cost.class */
        public static class Cost implements Serializable {
            private static final long serialVersionUID = 1;
            Long start;
            Long end;

            Cost() {
            }

            long time() {
                return this.end.longValue() - this.start.longValue();
            }

            void markStart() {
                if (this.start == null) {
                    this.start = Long.valueOf(System.currentTimeMillis());
                }
            }

            void markEnd() {
                if (this.end == null) {
                    this.end = Long.valueOf(System.currentTimeMillis());
                }
            }
        }

        private SubAccumulator() {
            this.taskIds = new CopyOnWriteArraySet();
            this.commitRequest = null;
            this.hasCommittedFlip = false;
            this.cost = new Cost();
        }

        void add(int i, CommitRequest commitRequest) {
            this.taskIds.add(Integer.valueOf(i));
            if (null == this.commitRequest && null != commitRequest) {
                this.commitRequest = commitRequest;
            }
            this.cost.markStart();
        }

        void committed() {
            this.hasCommittedFlip = true;
            this.cost.markEnd();
        }

        void merge(SubAccumulator subAccumulator) {
            this.taskIds.addAll(subAccumulator.taskIds);
            this.commitRequest = subAccumulator.commitRequest;
        }
    }

    public GlobalFlipCommitter(GlobalAggregateManager globalAggregateManager, FlipCommitFunction flipCommitFunction) {
        this.aggregateManager = globalAggregateManager;
        this.flipCommitFunction = flipCommitFunction;
    }

    public boolean commit(int i, LogData<RowData> logData) throws IOException {
        Long l = (Long) this.aggregateManager.updateGlobalAggregate(AGGREGATE_NAME, new CommitRequest(Integer.valueOf(i), logData), this.flipCommitFunction);
        return l != null && l.longValue() == logData.getEpicNo();
    }

    public boolean hasCommittedFlip(LogData<RowData> logData) throws IOException {
        Long l = (Long) this.aggregateManager.updateGlobalAggregate(AGGREGATE_NAME, new CommitRequest(null, logData, true), this.flipCommitFunction);
        return l != null && l.longValue() == logData.getEpicNo();
    }
}
