package com.ververica.cdc.connectors.oracle.source.reader.fetch;

import com.ververica.cdc.connectors.base.config.JdbcSourceConfig;
import com.ververica.cdc.connectors.base.dialect.JdbcDataSourceDialect;
import com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher;
import com.ververica.cdc.connectors.base.source.EmbeddedFlinkDatabaseHistory;
import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
import com.ververica.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext;
import com.ververica.cdc.connectors.base.utils.SourceRecordUtils;
import com.ververica.cdc.connectors.oracle.source.config.OracleSourceConfig;
import com.ververica.cdc.connectors.oracle.source.meta.offset.RedoLogOffset;
import com.ververica.cdc.connectors.oracle.source.utils.OracleConnectionUtils;
import com.ververica.cdc.connectors.oracle.source.utils.OracleUtils;
import com.ververica.cdc.connectors.oracle.util.ChunkUtils;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.header.ConnectHeaders;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.oracle.OracleChangeEventSourceMetricsFactory;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleErrorHandler;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OraclePartition;
import io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics;
import io.debezium.connector.oracle.OracleTaskContext;
import io.debezium.connector.oracle.OracleTopicSelector;
import io.debezium.connector.oracle.SourceInfo;
import io.debezium.connector.oracle.logminer.LogMinerOracleOffsetContextLoader;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.metrics.SnapshotChangeEventSourceMetrics;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.TopicSelector;
import io.debezium.util.Collect;
import java.sql.SQLException;
import java.time.Instant;
import java.util.Map;
import oracle.sql.ROWID;
import org.apache.flink.table.types.logical.RowType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleSourceFetchTaskContext.class */
public class OracleSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
    private static final Logger LOG = LoggerFactory.getLogger(OracleSourceFetchTaskContext.class);
    private final OracleConnection connection;
    private final OracleEventMetadataProvider metadataProvider;
    private OracleDatabaseSchema databaseSchema;
    private OracleTaskContext taskContext;
    private OracleOffsetContext offsetContext;
    private OraclePartition partition;
    private SnapshotChangeEventSourceMetrics<OraclePartition> snapshotChangeEventSourceMetrics;
    private OracleStreamingChangeEventSourceMetrics streamingChangeEventSourceMetrics;
    private TopicSelector<TableId> topicSelector;
    private JdbcSourceEventDispatcher<OraclePartition> dispatcher;
    private ChangeEventQueue<DataChangeEvent> queue;
    private OracleErrorHandler errorHandler;

    /* loaded from: input_file:com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleSourceFetchTaskContext$OracleEventMetadataProvider.class */
    public static class OracleEventMetadataProvider implements EventMetadataProvider {
        @Override // io.debezium.pipeline.source.spi.EventMetadataProvider
        public Instant getEventTimestamp(DataCollectionId dataCollectionId, OffsetContext offsetContext, Object obj, Struct struct) {
            Long int64;
            if (struct == null) {
                return null;
            }
            Struct struct2 = struct.getStruct("source");
            if (dataCollectionId == null || (int64 = struct2.getInt64("ts_ms")) == null) {
                return null;
            }
            return Instant.ofEpochMilli(int64.longValue());
        }

        @Override // io.debezium.pipeline.source.spi.EventMetadataProvider
        public Map<String, String> getEventSourcePosition(DataCollectionId dataCollectionId, OffsetContext offsetContext, Object obj, Struct struct) {
            if (struct == null) {
                return null;
            }
            Struct struct2 = struct.getStruct("source");
            if (dataCollectionId == null) {
                return null;
            }
            String string = struct2.getString("scn");
            return Collect.hashMapOf("scn", string == null ? "null" : string);
        }

        @Override // io.debezium.pipeline.source.spi.EventMetadataProvider
        public String getTransactionId(DataCollectionId dataCollectionId, OffsetContext offsetContext, Object obj, Struct struct) {
            if (struct == null) {
                return null;
            }
            Struct struct2 = struct.getStruct("source");
            if (dataCollectionId == null) {
                return null;
            }
            return struct2.getString(SourceInfo.TXID_KEY);
        }
    }

    public OracleSourceFetchTaskContext(JdbcSourceConfig jdbcSourceConfig, JdbcDataSourceDialect jdbcDataSourceDialect) {
        super(jdbcSourceConfig, jdbcDataSourceDialect);
        this.connection = OracleConnectionUtils.createOracleConnection(jdbcSourceConfig.getDbzConfiguration());
        this.metadataProvider = new OracleEventMetadataProvider();
    }

    @Override // com.ververica.cdc.connectors.base.source.reader.external.FetchTask.Context
    public void configure(SourceSplitBase sourceSplitBase) {
        OracleConnectorConfig dbzConnectorConfig = getDbzConnectorConfig();
        this.topicSelector = OracleTopicSelector.defaultSelector(dbzConnectorConfig);
        EmbeddedFlinkDatabaseHistory.registerHistory(this.sourceConfig.getDbzConfiguration().getString("database.history.instance.name"), sourceSplitBase.getTableSchemas().values());
        this.databaseSchema = OracleUtils.createOracleDatabaseSchema(dbzConnectorConfig, this.connection);
        this.offsetContext = loadStartingOffsetState(new LogMinerOracleOffsetContextLoader(dbzConnectorConfig), sourceSplitBase);
        this.partition = new OraclePartition(dbzConnectorConfig.getLogicalName());
        validateAndLoadDatabaseHistory(this.offsetContext, this.databaseSchema);
        this.taskContext = new OracleTaskContext(dbzConnectorConfig, this.databaseSchema);
        this.queue = new ChangeEventQueue.Builder().pollInterval(dbzConnectorConfig.getPollInterval()).maxBatchSize(dbzConnectorConfig.getMaxBatchSize()).maxQueueSize(sourceSplitBase.isSnapshotSplit() ? getSourceConfig().getSplitSize() : getSourceConfig().getDbzConnectorConfig().getMaxQueueSize()).maxQueueSizeInBytes(dbzConnectorConfig.getMaxQueueSizeInBytes()).loggingContextSupplier(() -> {
            return this.taskContext.configureLoggingContext("oracle-cdc-connector-task");
        }).build();
        this.dispatcher = new JdbcSourceEventDispatcher<>(dbzConnectorConfig, this.topicSelector, this.databaseSchema, this.queue, dbzConnectorConfig.getTableFilters().dataCollectionFilter(), DataChangeEvent::new, this.metadataProvider, this.schemaNameAdjuster);
        OracleChangeEventSourceMetricsFactory oracleChangeEventSourceMetricsFactory = new OracleChangeEventSourceMetricsFactory(new OracleStreamingChangeEventSourceMetrics(this.taskContext, this.queue, this.metadataProvider, dbzConnectorConfig));
        this.snapshotChangeEventSourceMetrics = oracleChangeEventSourceMetricsFactory.getSnapshotMetrics(this.taskContext, this.queue, this.metadataProvider);
        this.streamingChangeEventSourceMetrics = (OracleStreamingChangeEventSourceMetrics) oracleChangeEventSourceMetricsFactory.getStreamingMetrics(this.taskContext, this.queue, this.metadataProvider);
        this.errorHandler = new OracleErrorHandler(dbzConnectorConfig, this.queue);
    }

    @Override // com.ververica.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext, com.ververica.cdc.connectors.base.source.reader.external.FetchTask.Context
    public OracleSourceConfig getSourceConfig() {
        return (OracleSourceConfig) this.sourceConfig;
    }

    public OracleConnection getConnection() {
        return this.connection;
    }

    @Override // com.ververica.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext
    public OracleConnectorConfig getDbzConnectorConfig() {
        return (OracleConnectorConfig) super.getDbzConnectorConfig();
    }

    @Override // com.ververica.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext
    public OracleOffsetContext getOffsetContext() {
        return this.offsetContext;
    }

    public SnapshotChangeEventSourceMetrics<OraclePartition> getSnapshotChangeEventSourceMetrics() {
        return this.snapshotChangeEventSourceMetrics;
    }

    public OracleStreamingChangeEventSourceMetrics getStreamingChangeEventSourceMetrics() {
        return this.streamingChangeEventSourceMetrics;
    }

    @Override // com.ververica.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext
    public ErrorHandler getErrorHandler() {
        return this.errorHandler;
    }

    @Override // com.ververica.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext
    public OracleDatabaseSchema getDatabaseSchema() {
        return this.databaseSchema;
    }

    @Override // com.ververica.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext
    public RowType getSplitType(Table table) {
        return ChunkUtils.getSplitType(ChunkUtils.getChunkKeyColumn(table, getSourceConfig().getChunkKeyColumn()));
    }

    @Override // com.ververica.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext, com.ververica.cdc.connectors.base.source.reader.external.FetchTask.Context
    public boolean isRecordBetween(SourceRecord sourceRecord, Object[] objArr, Object[] objArr2) {
        RowType splitType = getSplitType(getDatabaseSchema().tableFor(SourceRecordUtils.getTableId(sourceRecord)));
        if (!splitType.getFieldNames().contains(ROWID.class.getSimpleName())) {
            return SourceRecordUtils.splitKeyRangeContains(SourceRecordUtils.getSplitKey(splitType, sourceRecord, getSchemaNameAdjuster()), objArr, objArr2);
        }
        ROWID rowid = null;
        try {
            rowid = new ROWID(((ConnectHeaders) sourceRecord.headers()).iterator().next().value().toString());
        } catch (SQLException e) {
            LOG.error("{} can not convert to RowId", sourceRecord);
        }
        return SourceRecordUtils.splitKeyRangeContains(new ROWID[]{rowid}, objArr, objArr2);
    }

    @Override // com.ververica.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext
    public JdbcSourceEventDispatcher<OraclePartition> getDispatcher() {
        return this.dispatcher;
    }

    @Override // com.ververica.cdc.connectors.base.source.reader.external.FetchTask.Context
    public ChangeEventQueue<DataChangeEvent> getQueue() {
        return this.queue;
    }

    @Override // com.ververica.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext
    public OraclePartition getPartition() {
        return this.partition;
    }

    @Override // com.ververica.cdc.connectors.base.source.reader.external.FetchTask.Context
    public Tables.TableFilter getTableFilter() {
        return getDbzConnectorConfig().getTableFilters().dataCollectionFilter();
    }

    @Override // com.ververica.cdc.connectors.base.source.reader.external.FetchTask.Context
    public Offset getStreamOffset(SourceRecord sourceRecord) {
        return OracleUtils.getRedoLogPosition(sourceRecord);
    }

    @Override // com.ververica.cdc.connectors.base.source.reader.external.FetchTask.Context
    public void close() throws Exception {
        this.connection.close();
    }

    private OracleOffsetContext loadStartingOffsetState(OffsetContext.Loader<OracleOffsetContext> loader, SourceSplitBase sourceSplitBase) {
        return loader.load((sourceSplitBase.isSnapshotSplit() ? RedoLogOffset.INITIAL_OFFSET : sourceSplitBase.asStreamSplit().getStartingOffset()).getOffset());
    }

    private void validateAndLoadDatabaseHistory(OracleOffsetContext oracleOffsetContext, OracleDatabaseSchema oracleDatabaseSchema) {
        oracleDatabaseSchema.initializeStorage();
        oracleDatabaseSchema.recover(Offsets.of(this.partition, oracleOffsetContext));
    }
}
