package com.netease.arctic.flink.read;

import com.netease.arctic.flink.read.internals.KafkaFetcher;
import com.netease.arctic.flink.shuffle.LogRecordV1;
import com.netease.arctic.flink.table.descriptors.ArcticValidator;
import com.netease.arctic.log.LogData;
import com.netease.arctic.log.LogDataJsonDeserialization;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Queue;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.internals.ClosableBlockingQueue;
import org.apache.flink.streaming.connectors.kafka.internals.Handover;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.iceberg.Schema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/netease/arctic/flink/read/LogKafkaFetcher.class */
/**
 * Kafka fetcher for Arctic log-queue topics.
 *
 * <p>Every consumed record is checked for the {@link LogData#MAGIC_NUMBER} prefix, deserialized
 * into a {@link LogData} of {@link RowData} and then either
 * <ul>
 *   <li>dropped because of the append-only row-kind filter,</li>
 *   <li>skipped (only its offset is recorded) while buffered records are being cleaned,</li>
 *   <li>handled as a flip message, or</li>
 *   <li>emitted downstream, optionally with its row kind turned while retracting.</li>
 * </ul>
 *
 * <p>All state updates and emissions that must be consistent with checkpoints happen while
 * holding the inherited {@code checkpointLock}.
 */
public class LogKafkaFetcher extends KafkaFetcher<RowData> {
    private static final Logger LOG = LoggerFactory.getLogger(LogKafkaFetcher.class);

    /** Turns the raw record bytes into {@link LogData} using the {@link LogRecordV1} factories. */
    private final LogDataJsonDeserialization<RowData> logDataJsonDeserialization;
    /** Holder of the retraction / epic bookkeeping queried and updated by this fetcher. */
    private final LogReadHelper logReadHelper;
    /** Whether flip messages are honoured and retraction handling is active. */
    private final boolean logRetractionEnable;
    /** Consumer thread that is asked to seek when a flip requires re-reading from an earlier offset. */
    LogKafkaConsumerThread<RowData> logKafkaConsumerThread;
    /** Only used in log output. */
    private final int subtaskId;
    /** True when the configured changelog mode equals the append-only mode (case-insensitive). */
    private final boolean logConsumerAppendOnly;

    /**
     * Creates the fetcher.
     *
     * <p>All parameters not listed below are forwarded unchanged to the {@link KafkaFetcher}
     * constructor; their meaning is defined there.
     *
     * @param schema schema handed to the {@link LogDataJsonDeserialization}
     * @param logRetractionEnable whether retraction (flip) handling is enabled
     * @param logReadHelper retraction / epic bookkeeping helper
     * @param logKafkaConsumerThread consumer thread, also passed to the super constructor
     * @param subtaskId subtask id used in log messages
     * @param logConsumerChangelogMode changelog mode; compared case-insensitively with
     *     {@link ArcticValidator#LOG_CONSUMER_CHANGELOG_MODE_APPEND_ONLY}
     * @throws Exception if the super constructor fails
     */
    public LogKafkaFetcher(SourceFunction.SourceContext<RowData> sourceContext, Map<KafkaTopicPartition, Long> map, SerializedValue<WatermarkStrategy<RowData>> serializedValue, ProcessingTimeService processingTimeService, long j, ClassLoader classLoader, String str, KafkaDeserializationSchemaWrapper<RowData> kafkaDeserializationSchemaWrapper, Properties properties, long j2, MetricGroup metricGroup, MetricGroup metricGroup2, boolean z, Schema schema, boolean logRetractionEnable, LogReadHelper logReadHelper, Handover handover, ClosableBlockingQueue<KafkaTopicPartitionState<RowData, TopicPartition>> closableBlockingQueue, LogKafkaConsumerThread<RowData> logKafkaConsumerThread, int subtaskId, String logConsumerChangelogMode) throws Exception {
        super(sourceContext, map, serializedValue, processingTimeService, j, classLoader, str, kafkaDeserializationSchemaWrapper, properties, j2, metricGroup, metricGroup2, z, handover, logKafkaConsumerThread, closableBlockingQueue);
        this.logDataJsonDeserialization = new LogDataJsonDeserialization<>(schema, LogRecordV1.factory, LogRecordV1.arrayFactory, LogRecordV1.mapFactory);
        this.logRetractionEnable = logRetractionEnable;
        this.logReadHelper = logReadHelper;
        this.logKafkaConsumerThread = logKafkaConsumerThread;
        this.subtaskId = subtaskId;
        this.logConsumerAppendOnly = ArcticValidator.LOG_CONSUMER_CHANGELOG_MODE_APPEND_ONLY.equalsIgnoreCase(logConsumerChangelogMode);
    }

    /**
     * Processes the records polled for one partition, in order.
     *
     * @param list records of a single partition
     * @param kafkaTopicPartitionState state of that partition; its offset is advanced here or in
     *     the emit methods
     * @throws UnsupportedOperationException if a record does not start with the magic number
     * @throws Exception if deserialization or message processing fails
     */
    @Override
    public void partitionConsumerRecordsHandler(List<ConsumerRecord<byte[], byte[]>> list, KafkaTopicPartitionState<RowData, TopicPartition> kafkaTopicPartitionState) throws Exception {
        for (ConsumerRecord<byte[], byte[]> consumerRecord : list) {
            byte[] value = consumerRecord.value();
            if (!checkMagicNum(value)) {
                throw new UnsupportedOperationException("Can't deserialize arctic log queue message due to it does not contain magic number.");
            }
            LogData<RowData> logData = this.logDataJsonDeserialization.deserialize(value);

            // Flip messages are never subject to the row-kind filter.
            if (!logData.getFlip() && filterByRowKind(logData.getActualValue())) {
                LOG.info("filter the rowData, because of logConsumerAppendOnly is true, and rowData={}.", logData.getActualValue());
                continue;
            }

            int partition = consumerRecord.partition();
            long offset = consumerRecord.offset();
            if (filterBuffer(logData, partition, offset)) {
                // The record is skipped, but its offset still has to be tracked.
                synchronized (this.checkpointLock) {
                    kafkaTopicPartitionState.setOffset(offset);
                }
            } else {
                processMsg(consumerRecord, logData, kafkaTopicPartitionState);
            }
        }
    }

    /**
     * Decides whether a row must be dropped by the append-only filter.
     *
     * @param rowData the row to inspect
     * @return {@code true} only when retraction is disabled, the consumer is append-only and the
     *     row kind is not {@link RowKind#INSERT}
     */
    boolean filterByRowKind(RowData rowData) {
        return !this.logRetractionEnable
                && this.logConsumerAppendOnly
                && !rowData.getRowKind().equals(RowKind.INSERT);
    }

    /**
     * Decides whether a record must be skipped while buffered records are being cleaned.
     *
     * <p>Skipping only applies while both the clean-buffer action is set and the job is retracting
     * for the record's upstream id and partition. Once a record at or below the partition's
     * retracting offset shows up, the clean-buffer action is switched off again.
     *
     * @param logData the deserialized record
     * @param i partition of the record
     * @param j offset of the record
     * @return {@code true} if the record must be skipped
     */
    private boolean filterBuffer(LogData<RowData> logData, int i, long j) {
        String upstreamId = logData.getUpstreamId();
        boolean cleaning = this.logReadHelper.getCleanBufferAction(upstreamId, i)
                && this.logReadHelper.isJobRetractingRightNow(upstreamId, i);
        if (!cleaning) {
            return false;
        }
        if (j > this.logReadHelper.queryPartitionRetractingOffset(upstreamId, i)) {
            return true;
        }
        LOG.info("The fetcher has finished to clean buffer records.");
        this.logReadHelper.cleanBufferAction(upstreamId, i, false);
        return false;
    }

    /** Dispatches a record to the flip or the non-flip handler. */
    private void processMsg(ConsumerRecord<byte[], byte[]> consumerRecord, LogData<RowData> logData, KafkaTopicPartitionState<RowData, TopicPartition> kafkaTopicPartitionState) throws IOException {
        if (logData.getFlip()) {
            LOG.info("subtaskId={}, fetch a flip msg, flip offset= {} with partition= {}, data={}.", this.subtaskId, consumerRecord.offset(), consumerRecord.partition(), logData);
            handleFlip(logData, consumerRecord, kafkaTopicPartitionState);
        } else {
            handleUnFlip(logData, consumerRecord, kafkaTopicPartitionState);
        }
    }

    /** Emits a non-flip record, going through the retraction-aware path when retraction is enabled. */
    private void handleUnFlip(LogData<RowData> logData, ConsumerRecord<byte[], byte[]> consumerRecord, KafkaTopicPartitionState<RowData, TopicPartition> kafkaTopicPartitionState) {
        if (this.logRetractionEnable) {
            handleUnFlipWithRetraction(logData, consumerRecord, kafkaTopicPartitionState);
        } else {
            emitRecordWithTimestampsWithoutConsistency(logData.getActualValue(), kafkaTopicPartitionState, consumerRecord.offset(), consumerRecord.timestamp());
        }
    }

    /**
     * Emits a non-flip record while retraction is enabled.
     *
     * <p>While the job is retracting, records whose epic number is not greater than the retracting
     * epic number are dropped without emitting or advancing the offset; all other records get
     * their row kind turned by the helper before being emitted.
     */
    private void handleUnFlipWithRetraction(LogData<RowData> logData, ConsumerRecord<byte[], byte[]> consumerRecord, KafkaTopicPartitionState<RowData, TopicPartition> kafkaTopicPartitionState) {
        RowData rowData = logData.getActualValue();
        String upstreamId = logData.getUpstreamId();
        int partition = consumerRecord.partition();
        boolean retracting = this.logReadHelper.isJobRetractingRightNow(upstreamId, partition);
        if (retracting) {
            if (logData.getEpicNo() <= this.logReadHelper.queryRetractingEpicNo(upstreamId, partition)) {
                LOG.info("although the consumer is retracting status, however the LogData is not required to turn ChangeAction, {}.", rowData);
                return;
            }
            rowData = this.logReadHelper.turnRowKind(rowData);
        }
        emitRecordWithTimestampsWithConsistency(logData, partition, rowData, kafkaTopicPartitionState, consumerRecord.offset(), consumerRecord.timestamp(), retracting);
    }

    /**
     * Handles a flip message.
     *
     * <ul>
     *   <li>Retraction disabled: the flip is logged and ignored.</li>
     *   <li>Already retracting: the offset is recorded and retracting is suspended.</li>
     *   <li>Otherwise: the epic's offset is looked up; if present, buffer cleaning is switched on,
     *       the partition is marked as retracting and the consumer thread is told to continue
     *       from that offset. If absent, a warning is logged and nothing else happens.</li>
     * </ul>
     */
    private void handleFlip(LogData<RowData> logData, ConsumerRecord<byte[], byte[]> consumerRecord, KafkaTopicPartitionState<RowData, TopicPartition> kafkaTopicPartitionState) {
        String upstreamId = logData.getUpstreamId();
        long epicNo = logData.getEpicNo();
        int partition = consumerRecord.partition();

        if (!this.logRetractionEnable) {
            LOG.info("subtaskId={}, receive a flip msg, while {} is false, so ignore this flip.", this.subtaskId, ArcticValidator.ARCTIC_LOG_CONSISTENCY_GUARANTEE_ENABLE.key());
            return;
        }

        if (this.logReadHelper.isJobRetractingRightNow(upstreamId, partition)) {
            long flipOffset = consumerRecord.offset();
            synchronized (this.checkpointLock) {
                kafkaTopicPartitionState.setOffset(flipOffset);
                this.logReadHelper.suspendRetracting(upstreamId, epicNo, partition, flipOffset);
            }
            return;
        }

        Optional<Long> epicOffset = this.logReadHelper.getEpicOffset(upstreamId, epicNo, partition);
        if (!epicOffset.isPresent()) {
            LOG.warn("could not find out seek offset by upstreamId={}, epicNo={}, partition={}.", upstreamId, epicNo, partition);
            return;
        }

        long seekOffset = epicOffset.get();
        this.logReadHelper.cleanBufferAction(upstreamId, partition, true);
        synchronized (this.checkpointLock) {
            kafkaTopicPartitionState.setOffset(seekOffset);
            this.logReadHelper.markEpicPartitionRetracting(upstreamId, epicNo, partition, seekOffset);
        }

        // Hand a fresh partition state carrying the seek offset to the consumer thread.
        String topic = consumerRecord.topic();
        KafkaTopicPartitionState<RowData, TopicPartition> seekState = new KafkaTopicPartitionState<>(new KafkaTopicPartition(topic, partition), new TopicPartition(topic, partition));
        seekState.setOffset(seekOffset);
        this.logKafkaConsumerThread.setTopicPartitionOffset(seekState);
    }

    /**
     * Checks whether the given bytes start with the three-byte {@link LogData#MAGIC_NUMBER} prefix.
     *
     * @param bArr raw record value; must be non-null and at least three bytes long
     * @return {@code true} if the first three bytes equal the magic number
     * @throws NullPointerException if {@code bArr} is null
     * @throws IllegalArgumentException if {@code bArr} is shorter than three bytes
     */
    public static boolean checkMagicNum(byte[] bArr) {
        Preconditions.checkNotNull(bArr);
        Preconditions.checkArgument(bArr.length >= 3);
        return bArr[0] == LogData.MAGIC_NUMBER[0] && bArr[1] == LogData.MAGIC_NUMBER[1] && bArr[2] == LogData.MAGIC_NUMBER[2];
    }

    /**
     * Updates the epic bookkeeping and emits the record, both under the checkpoint lock.
     *
     * @param logData source of the upstream id and epic number
     * @param i partition of the record
     * @param rowData row to emit (possibly with a turned row kind)
     * @param kafkaTopicPartitionState partition state to advance
     * @param offset offset of the record
     * @param kafkaEventTimestamp timestamp of the Kafka record
     * @param z whether the job is currently retracting; selects which helper offset is updated
     */
    protected void emitRecordWithTimestampsWithConsistency(LogData<RowData> logData, int i, RowData rowData, KafkaTopicPartitionState<RowData, TopicPartition> kafkaTopicPartitionState, long offset, long kafkaEventTimestamp, boolean z) {
        String upstreamId = logData.getUpstreamId();
        long epicNo = logData.getEpicNo();
        synchronized (this.checkpointLock) {
            if (z) {
                this.logReadHelper.updateRetractingEpicOffset(upstreamId, epicNo, i, offset);
            } else {
                this.logReadHelper.updateEpicStartOffsetIfEmpty(upstreamId, epicNo, i, offset);
            }
            emitRecordWithTimestamps(rowData, kafkaTopicPartitionState, offset, kafkaEventTimestamp);
        }
    }

    /** Emits the record under the checkpoint lock without touching any epic bookkeeping. */
    protected void emitRecordWithTimestampsWithoutConsistency(RowData rowData, KafkaTopicPartitionState<RowData, TopicPartition> kafkaTopicPartitionState, long offset, long kafkaEventTimestamp) {
        synchronized (this.checkpointLock) {
            emitRecordWithTimestamps(rowData, kafkaTopicPartitionState, offset, kafkaEventTimestamp);
        }
    }

    /**
     * Emits a single record and advances the partition offset.
     *
     * <p>This method does not take the checkpoint lock itself; the two callers in this class
     * invoke it while holding the lock.
     */
    protected void emitRecordWithTimestamps(RowData rowData, KafkaTopicPartitionState<RowData, TopicPartition> kafkaTopicPartitionState, long offset, long kafkaEventTimestamp) {
        setKafkaLatency(kafkaEventTimestamp);
        long timestamp = kafkaTopicPartitionState.extractTimestamp(rowData, kafkaEventTimestamp);
        this.sourceContext.collectWithTimestamp(rowData, timestamp);
        kafkaTopicPartitionState.onEvent(rowData, timestamp);
        kafkaTopicPartitionState.setOffset(offset);
    }

    /**
     * Drains the queue, emits every record with its extracted timestamp and finally advances the
     * partition offset once, all under the checkpoint lock.
     *
     * @param queue records to emit; emptied by this call
     * @param kafkaTopicPartitionState partition state to advance
     * @param offset offset to store after all records were emitted
     * @param kafkaEventTimestamp timestamp of the Kafka record the queue entries originate from
     */
    @Override
    public void emitRecordsWithTimestamps(Queue<RowData> queue, KafkaTopicPartitionState<RowData, TopicPartition> kafkaTopicPartitionState, long offset, long kafkaEventTimestamp) {
        synchronized (this.checkpointLock) {
            setKafkaLatency(kafkaEventTimestamp);
            RowData record;
            // Stop as soon as the queue is empty; looping on would spin forever while holding the lock.
            while ((record = queue.poll()) != null) {
                long timestamp = kafkaTopicPartitionState.extractTimestamp(record, kafkaEventTimestamp);
                this.sourceContext.collectWithTimestamp(record, timestamp);
                kafkaTopicPartitionState.onEvent(record, timestamp);
            }
            kafkaTopicPartitionState.setOffset(offset);
        }
    }
}
