package com.netease.arctic.flink.read;

import com.netease.arctic.flink.read.internals.AbstractFetcher;
import com.netease.arctic.flink.table.descriptors.ArcticValidator;
import com.netease.arctic.flink.util.CompatibleFlinkPropertyUtil;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
import org.apache.flink.streaming.connectors.kafka.internals.ClosableBlockingQueue;
import org.apache.flink.streaming.connectors.kafka.internals.Handover;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.SerializedValue;
import org.apache.iceberg.Schema;

/* loaded from: input_file:com/netease/arctic/flink/read/LogKafkaConsumer.class */
public class LogKafkaConsumer extends FlinkKafkaConsumer<RowData> {
    private static final long serialVersionUID = 7855676094345921722L;
    private KafkaDeserializationSchemaWrapper<RowData> logRecordDeserializationSchemaWrapper;
    private final Schema schema;
    private final boolean logRetractionEnable;
    private final LogReadHelper logReadHelper;
    private int subtaskId;
    private final String logConsumerChangelogMode;

    public LogKafkaConsumer(List<String> list, KafkaDeserializationSchemaWrapper<RowData> kafkaDeserializationSchemaWrapper, Properties properties, Schema schema, ReadableConfig readableConfig) {
        super(list, (KafkaDeserializationSchema) kafkaDeserializationSchemaWrapper, properties);
        this.logRecordDeserializationSchemaWrapper = kafkaDeserializationSchemaWrapper;
        this.schema = schema;
        this.logRetractionEnable = CompatibleFlinkPropertyUtil.propertyAsBoolean(readableConfig, ArcticValidator.ARCTIC_LOG_CONSISTENCY_GUARANTEE_ENABLE);
        this.logConsumerChangelogMode = (String) readableConfig.get(ArcticValidator.ARCTIC_LOG_CONSUMER_CHANGELOG_MODE);
        this.logReadHelper = new LogReadHelper();
    }

    public LogKafkaConsumer(Pattern pattern, KafkaDeserializationSchemaWrapper<RowData> kafkaDeserializationSchemaWrapper, Properties properties, Schema schema, ReadableConfig readableConfig) {
        super(pattern, (KafkaDeserializationSchema) kafkaDeserializationSchemaWrapper, properties);
        this.schema = schema;
        this.logRetractionEnable = CompatibleFlinkPropertyUtil.propertyAsBoolean(readableConfig, ArcticValidator.ARCTIC_LOG_CONSISTENCY_GUARANTEE_ENABLE);
        this.logConsumerChangelogMode = (String) readableConfig.get(ArcticValidator.ARCTIC_LOG_CONSUMER_CHANGELOG_MODE);
        this.logReadHelper = new LogReadHelper();
    }

    @Override // com.netease.arctic.flink.read.FlinkKafkaConsumerBase
    public final void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        super.initializeState(functionInitializationContext);
        this.logReadHelper.initializeState(functionInitializationContext, getRuntimeContext());
        LOG.info("initialize KafkaConsumer, subtaskId={}, {}={}", new Object[]{Integer.valueOf(getRuntimeContext().getIndexOfThisSubtask()), ArcticValidator.ARCTIC_LOG_CONSISTENCY_GUARANTEE_ENABLE.key(), Boolean.valueOf(this.logRetractionEnable)});
        this.subtaskId = getRuntimeContext().getIndexOfThisSubtask();
    }

    @Override // com.netease.arctic.flink.read.FlinkKafkaConsumerBase
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        super.snapshotState(functionSnapshotContext);
        this.logReadHelper.snapshotState();
    }

    @Override // com.netease.arctic.flink.read.FlinkKafkaConsumer, com.netease.arctic.flink.read.FlinkKafkaConsumerBase
    protected AbstractFetcher<RowData, ?> createFetcher(SourceFunction.SourceContext<RowData> sourceContext, Map<KafkaTopicPartition, Long> map, SerializedValue<WatermarkStrategy<RowData>> serializedValue, StreamingRuntimeContext streamingRuntimeContext, OffsetCommitMode offsetCommitMode, MetricGroup metricGroup, boolean z) throws Exception {
        adjustAutoCommitConfig(this.properties, offsetCommitMode);
        String taskNameWithSubtasks = streamingRuntimeContext.getTaskNameWithSubtasks();
        MetricGroup metricGroup2 = streamingRuntimeContext.getMetricGroup();
        Handover handover = new Handover();
        ClosableBlockingQueue closableBlockingQueue = new ClosableBlockingQueue();
        return new LogKafkaFetcher(sourceContext, map, serializedValue, streamingRuntimeContext.getProcessingTimeService(), streamingRuntimeContext.getExecutionConfig().getAutoWatermarkInterval(), streamingRuntimeContext.getUserCodeClassLoader(), taskNameWithSubtasks, this.logRecordDeserializationSchemaWrapper, this.properties, this.pollTimeout, streamingRuntimeContext.getMetricGroup(), metricGroup, z, this.schema, this.logRetractionEnable, this.logReadHelper, handover, closableBlockingQueue, new LogKafkaConsumerThread(LOG, handover, this.properties, closableBlockingQueue, "Kafka Fetcher for " + taskNameWithSubtasks, this.pollTimeout, z, metricGroup, metricGroup2), this.subtaskId, this.logConsumerChangelogMode);
    }
}
