package com.ververica.cdc.connectors.base.source.reader;

import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
import com.ververica.cdc.connectors.base.source.meta.offset.OffsetFactory;
import com.ververica.cdc.connectors.base.source.meta.split.SourceRecords;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitState;
import com.ververica.cdc.connectors.base.source.meta.wartermark.WatermarkEvent;
import com.ververica.cdc.connectors.base.source.metrics.SourceReaderMetrics;
import com.ververica.cdc.connectors.base.utils.SourceRecordUtils;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.history.FlinkJsonTableChangeSerializer;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.TableChanges;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ververica/cdc/connectors/base/source/reader/IncrementalSourceRecordEmitter.class */
public class IncrementalSourceRecordEmitter<T> implements RecordEmitter<SourceRecords, T, SourceSplitState> {
    private static final Logger LOG = LoggerFactory.getLogger(IncrementalSourceRecordEmitter.class);
    private static final FlinkJsonTableChangeSerializer TABLE_CHANGE_SERIALIZER = new FlinkJsonTableChangeSerializer();
    protected final DebeziumDeserializationSchema<T> debeziumDeserializationSchema;
    protected final SourceReaderMetrics sourceReaderMetrics;
    protected final boolean includeSchemaChanges;
    protected final OutputCollector<T> outputCollector = new OutputCollector<>();
    protected final OffsetFactory offsetFactory;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/ververica/cdc/connectors/base/source/reader/IncrementalSourceRecordEmitter$OutputCollector.class */
    public static class OutputCollector<T> implements Collector<T> {
        private SourceOutput<T> output;

        private OutputCollector() {
        }

        public void collect(T t) {
            this.output.collect(t);
        }

        public void close() {
        }
    }

    public IncrementalSourceRecordEmitter(DebeziumDeserializationSchema<T> debeziumDeserializationSchema, SourceReaderMetrics sourceReaderMetrics, boolean z, OffsetFactory offsetFactory) {
        this.debeziumDeserializationSchema = debeziumDeserializationSchema;
        this.sourceReaderMetrics = sourceReaderMetrics;
        this.includeSchemaChanges = z;
        this.offsetFactory = offsetFactory;
    }

    public void emitRecord(SourceRecords sourceRecords, SourceOutput<T> sourceOutput, SourceSplitState sourceSplitState) throws Exception {
        Iterator<SourceRecord> it = sourceRecords.iterator();
        while (it.hasNext()) {
            processElement(it.next(), sourceOutput, sourceSplitState);
        }
    }

    protected void processElement(SourceRecord sourceRecord, SourceOutput<T> sourceOutput, SourceSplitState sourceSplitState) throws Exception {
        if (WatermarkEvent.isWatermarkEvent(sourceRecord)) {
            LOG.trace("Process WatermarkEvent: {}; splitState = {}", sourceRecord, sourceSplitState);
            Offset watermark = getWatermark(sourceRecord);
            if (WatermarkEvent.isHighWatermarkEvent(sourceRecord) && sourceSplitState.isSnapshotSplitState()) {
                LOG.trace("Set HighWatermark {} for {}", watermark, sourceSplitState);
                sourceSplitState.asSnapshotSplitState().setHighWatermark(watermark);
                return;
            }
            return;
        }
        if (SourceRecordUtils.isSchemaChangeEvent(sourceRecord) && sourceSplitState.isStreamSplitState()) {
            LOG.trace("Process SchemaChangeEvent: {}; splitState = {}", sourceRecord, sourceSplitState);
            Iterator<TableChanges.TableChange> it = TABLE_CHANGE_SERIALIZER.deserialize(SourceRecordUtils.getHistoryRecord(sourceRecord).document().getArray(HistoryRecord.Fields.TABLE_CHANGES), true).iterator();
            while (it.hasNext()) {
                TableChanges.TableChange next = it.next();
                sourceSplitState.asStreamSplitState().recordSchema(next.getId(), next);
            }
            if (this.includeSchemaChanges) {
                emitElement(sourceRecord, sourceOutput);
                return;
            }
            return;
        }
        if (SourceRecordUtils.isDataChangeRecord(sourceRecord)) {
            LOG.trace("Process DataChangeRecord: {}; splitState = {}", sourceRecord, sourceSplitState);
            updateStreamSplitState(sourceSplitState, sourceRecord);
            reportMetrics(sourceRecord);
            emitElement(sourceRecord, sourceOutput);
            return;
        }
        if (SourceRecordUtils.isHeartbeatEvent(sourceRecord)) {
            LOG.trace("Process Heartbeat: {}; splitState = {}", sourceRecord, sourceSplitState);
            updateStreamSplitState(sourceSplitState, sourceRecord);
        } else {
            LOG.info("Meet unknown element {} for splitState = {}, just skip.", sourceRecord, sourceSplitState);
            this.sourceReaderMetrics.addNumRecordsInErrors(1L);
        }
    }

    private void updateStreamSplitState(SourceSplitState sourceSplitState, SourceRecord sourceRecord) {
        if (sourceSplitState.isStreamSplitState()) {
            sourceSplitState.asStreamSplitState().setStartingOffset(getOffsetPosition(sourceRecord));
        }
    }

    private Offset getWatermark(SourceRecord sourceRecord) {
        return getOffsetPosition(sourceRecord.sourceOffset());
    }

    public Offset getOffsetPosition(SourceRecord sourceRecord) {
        return getOffsetPosition(sourceRecord.sourceOffset());
    }

    public Offset getOffsetPosition(Map<String, ?> map) {
        HashMap hashMap = new HashMap();
        for (Map.Entry<String, ?> entry : map.entrySet()) {
            hashMap.put(entry.getKey(), entry.getValue() == null ? null : entry.getValue().toString());
        }
        return this.offsetFactory.newOffset(hashMap);
    }

    protected void emitElement(SourceRecord sourceRecord, SourceOutput<T> sourceOutput) throws Exception {
        ((OutputCollector) this.outputCollector).output = sourceOutput;
        this.debeziumDeserializationSchema.deserialize(sourceRecord, this.outputCollector);
    }

    protected void reportMetrics(SourceRecord sourceRecord) {
        Long fetchTimestamp;
        Long messageTimestamp = SourceRecordUtils.getMessageTimestamp(sourceRecord);
        if (messageTimestamp == null || messageTimestamp.longValue() <= 0 || (fetchTimestamp = SourceRecordUtils.getFetchTimestamp(sourceRecord)) == null) {
            return;
        }
        this.sourceReaderMetrics.recordFetchDelay(fetchTimestamp.longValue() - messageTimestamp.longValue());
    }
}
