package io.debezium.connector.oracle.logminer.processor.infinispan;

import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OraclePartition;
import io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.logminer.LogMinerQueryBuilder;
import io.debezium.connector.oracle.logminer.events.LogMinerEvent;
import io.debezium.connector.oracle.logminer.events.LogMinerEventRow;
import io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.relational.TableId;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import oracle.xml.xslt.XSLConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/oracle/logminer/processor/infinispan/AbstractInfinispanLogMinerEventProcessor.class */
public abstract class AbstractInfinispanLogMinerEventProcessor extends AbstractLogMinerEventProcessor<InfinispanTransaction> implements CacheProvider {
    /** Class-wide SLF4J logger; also used by the anonymous iterator created in this class. */
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractInfinispanLogMinerEventProcessor.class);
    /** Connection from which the mining query statement is prepared. */
    private final OracleConnection jdbcConnection;
    /** Streaming metrics; receives lag, active-transaction count and oldest/offset SCN updates. */
    private final OracleStreamingChangeEventSourceMetrics metrics;
    /** Partition handed to the dispatcher when heartbeat events are emitted. */
    private final OraclePartition partition;
    /** Offset context whose SCN is updated when a new start SCN is calculated. */
    private final OracleOffsetContext offsetContext;
    /** Dispatcher used to emit heartbeat events after the offset SCN has been updated. */
    private final EventDispatcher<OraclePartition, TableId> dispatcher;

    /**
     * Creates the processor. Most collaborators are forwarded to the base class; the connection,
     * dispatcher, partition, offset context and metrics are also kept locally because this class
     * uses them directly.
     *
     * @param changeEventSourceContext context forwarded to the base class
     * @param oracleConnectorConfig connector configuration forwarded to the base class
     * @param oracleConnection connection used to prepare the mining query statement
     * @param eventDispatcher dispatcher used for heartbeat events
     * @param oraclePartition partition passed along with heartbeat events
     * @param oracleOffsetContext offset context whose SCN this class updates
     * @param oracleDatabaseSchema schema forwarded to the base class
     * @param oracleStreamingChangeEventSourceMetrics metrics updated by this class
     */
    public AbstractInfinispanLogMinerEventProcessor(
            ChangeEventSource.ChangeEventSourceContext changeEventSourceContext,
            OracleConnectorConfig oracleConnectorConfig,
            OracleConnection oracleConnection,
            EventDispatcher<OraclePartition, TableId> eventDispatcher,
            OraclePartition oraclePartition,
            OracleOffsetContext oracleOffsetContext,
            OracleDatabaseSchema oracleDatabaseSchema,
            OracleStreamingChangeEventSourceMetrics oracleStreamingChangeEventSourceMetrics) {
        super(
                changeEventSourceContext,
                oracleConnectorConfig,
                oracleDatabaseSchema,
                oraclePartition,
                oracleOffsetContext,
                eventDispatcher,
                oracleStreamingChangeEventSourceMetrics);
        this.dispatcher = eventDispatcher;
        this.offsetContext = oracleOffsetContext;
        this.partition = oraclePartition;
        this.metrics = oracleStreamingChangeEventSourceMetrics;
        this.jdbcConnection = oracleConnection;
    }

    /**
     * Logs the sizes of the transaction, processed-transaction, schema-change and event caches at
     * INFO level, followed by every event cache key at DEBUG level.
     *
     * <p>The key listing is skipped entirely when DEBUG logging is disabled, so the event cache key
     * set is not traversed just to produce log statements that would be discarded.
     */
    @Override // io.debezium.connector.oracle.logminer.processor.infinispan.CacheProvider
    public void displayCacheStatistics() {
        LOGGER.info("Overall Cache Statistics:");
        LOGGER.info("\tTransactions        : {}", Integer.valueOf(mo1546getTransactionCache().size()));
        LOGGER.info("\tRecent Transactions : {}", Integer.valueOf(getProcessedTransactionsCache().size()));
        LOGGER.info("\tSchema Changes      : {}", Integer.valueOf(getSchemaChangesCache().size()));
        LOGGER.info("\tEvents              : {}", Integer.valueOf(getEventCache().size()));
        // Only walk the key set when the per-key output will actually be emitted.
        if (LOGGER.isDebugEnabled() && !getEventCache().isEmpty()) {
            for (String eventKey : getEventCache().keySet()) {
                LOGGER.debug("\t\tFound Key: {}", eventKey);
            }
        }
    }

    /**
     * Checks whether the processed-transactions cache holds an entry for the given key.
     *
     * @param str the transaction identifier to look up
     * @return {@code true} if the processed-transactions cache contains {@code str} as a key
     */
    @Override // io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor
    protected boolean isRecentlyProcessed(String str) {
        final boolean alreadyProcessed = getProcessedTransactionsCache().containsKey(str);
        return alreadyProcessed;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Can't rename method to resolve collision */
    /**
     * Builds a new {@link InfinispanTransaction} from the transaction id, SCN, change time and user
     * name carried by the given row. The transaction is only created here, not cached.
     *
     * @param logMinerEventRow the row supplying the transaction attributes
     * @return the newly created transaction
     */
    @Override // io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor
    public InfinispanTransaction createTransaction(LogMinerEventRow logMinerEventRow) {
        final String transactionId = logMinerEventRow.getTransactionId();
        return new InfinispanTransaction(
                transactionId,
                logMinerEventRow.getScn(),
                logMinerEventRow.getChangeTime(),
                logMinerEventRow.getUserName());
    }

    /**
     * Undoes a previously cached event: removes from the event cache the first event whose row id
     * equals the row id of the given undo row.
     *
     * <p>Candidate event keys are those starting with {@code "<transactionId>-"}. If none exist and
     * {@code isTransactionIdWithNoSequence} reports {@code true} for the transaction id, the search
     * falls back to all keys starting with the value returned by {@code getTransactionIdPrefix}.
     * In the fallback case, when no keys are found at all, a warning is logged unless LOB support
     * is enabled.
     *
     * @param logMinerEventRow the undo row identifying the transaction and row id to remove
     */
    @Override // io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor
    protected void removeEventWithRowId(LogMinerEventRow logMinerEventRow) {
        final String transactionId = logMinerEventRow.getTransactionId();
        // Plain "-" separator; the unrelated XSLT constant previously used here is no longer referenced.
        List<String> eventKeys = getTransactionKeysWithPrefix(transactionId + "-");

        // Fallback applies only when nothing matched the full id and the id has no explicit sequence.
        final boolean withoutSequence = eventKeys.isEmpty() && isTransactionIdWithNoSequence(transactionId);
        if (withoutSequence) {
            final String transactionIdPrefix = getTransactionIdPrefix(transactionId);
            LOGGER.debug("Undo change refers to a transaction that has no explicit sequence, '{}'", transactionId);
            LOGGER.debug("Checking all transactions with prefix '{}'", transactionIdPrefix);
            eventKeys = getTransactionKeysWithPrefix(transactionIdPrefix);
            if (eventKeys.isEmpty()) {
                if (!getConfig().isLobEnabled()) {
                    LOGGER.warn("Cannot undo change '{}' since transaction was not found.", logMinerEventRow);
                }
                return;
            }
        }

        for (String eventKey : eventKeys) {
            final LogMinerEvent event = (LogMinerEvent) getEventCache().get(eventKey);
            if (event != null && event.getRowId().equals(logMinerEventRow.getRowId())) {
                if (withoutSequence) {
                    LOGGER.debug("Undo change '{}' applied to transaction '{}'", logMinerEventRow, eventKey);
                } else {
                    LOGGER.trace("Undo applied for event {}.", event);
                }
                // Only the first matching event is removed.
                getEventCache().remove(eventKey);
                return;
            }
        }
        LOGGER.warn("Cannot undo change '{}' since event with row-id {} was not found.", logMinerEventRow, logMinerEventRow.getRowId());
    }

    /**
     * Collects every key of the event cache that starts with the given prefix.
     *
     * @param str the key prefix to match
     * @return a list of the matching event cache keys; empty when nothing matches
     */
    private List<String> getTransactionKeysWithPrefix(String str) {
        // No raw List cast: the collector already yields a List<String>.
        return getEventCache().keySet().stream()
                .filter(eventKey -> eventKey.startsWith(str))
                .collect(Collectors.toList());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Processes a row unless its transaction is already recorded as processed, in which case the
     * row is skipped with a trace message and the base class is not invoked.
     *
     * @param oraclePartition the partition forwarded to the base class
     * @param logMinerEventRow the row to process
     * @throws SQLException propagated from the base class implementation
     * @throws InterruptedException propagated from the base class implementation
     */
    @Override // io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor
    public void processRow(OraclePartition oraclePartition, LogMinerEventRow logMinerEventRow) throws SQLException, InterruptedException {
        final String rowTransactionId = logMinerEventRow.getTransactionId();
        if (!isRecentlyProcessed(rowTransactionId)) {
            super.processRow(oraclePartition, logMinerEventRow);
            return;
        }
        LOGGER.trace("Transaction {} has been seen by connector, skipped.", rowTransactionId);
    }

    /**
     * No-op in this class: the body is empty, so nothing is abandoned regardless of the duration.
     *
     * @param duration ignored by this implementation
     * @throws InterruptedException declared by the overridden method; never thrown here
     */
    @Override // io.debezium.connector.oracle.logminer.processor.LogMinerEventProcessor
    public void abandonTransactions(Duration duration) throws InterruptedException {
        // Deliberately left without statements.
    }

    /**
     * Checks whether the schema-changes cache has an entry keyed by the string form of the row's SCN.
     *
     * @param logMinerEventRow the row whose SCN is looked up
     * @return {@code true} if the schema-changes cache contains that SCN as a key
     */
    @Override // io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor
    protected boolean hasSchemaChangeBeenSeen(LogMinerEventRow logMinerEventRow) {
        final String scnKey = logMinerEventRow.getScn().toString();
        return getSchemaChangesCache().containsKey(scnKey);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Can't rename method to resolve collision */
    /**
     * Looks up a transaction in the transaction cache and, when present, removes it.
     *
     * @param str the transaction identifier used as cache key
     * @return the transaction that was cached under {@code str}, or {@code null} if there was none
     */
    @Override // io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor
    public InfinispanTransaction getAndRemoveTransactionFromCache(String str) {
        final InfinispanTransaction cached = (InfinispanTransaction) mo1546getTransactionCache().get(str);
        if (cached == null) {
            return null;
        }
        // The value is read first and removed in a separate call, rather than relying on remove()'s return value.
        mo1546getTransactionCache().remove(str);
        return cached;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Removes every event id of the transaction from the event cache, then removes the transaction
     * itself from the transaction cache.
     *
     * @param infinispanTransaction the transaction to purge
     */
    @Override // io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor
    public void removeTransactionAndEventsFromCache(InfinispanTransaction infinispanTransaction) {
        // Events first, using the ids the transaction hands out for indexes 0..numberOfEvents-1.
        for (int eventIndex = 0; eventIndex < infinispanTransaction.getNumberOfEvents(); eventIndex++) {
            getEventCache().remove(infinispanTransaction.getEventId(eventIndex));
        }
        mo1546getTransactionCache().remove(infinispanTransaction.getTransactionId());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns an iterator over the events of the given transaction that are still present in the
     * event cache. Event ids whose cache entry is missing are skipped with a trace message.
     *
     * <p>The number of events is read once, when the iterator is created. The iterator follows the
     * {@link Iterator} contract: {@code hasNext()} may be called repeatedly without refetching an
     * already located event, and {@code next()} may be called without a preceding
     * {@code hasNext()}; it throws {@link NoSuchElementException} once the events are exhausted.
     *
     * @param infinispanTransaction the transaction whose events are iterated
     * @return an iterator over the transaction's cached events
     */
    @Override // io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor
    public Iterator<LogMinerEvent> getTransactionEventIterator(final InfinispanTransaction infinispanTransaction) {
        return new Iterator<LogMinerEvent>() { // from class: io.debezium.connector.oracle.logminer.processor.infinispan.AbstractInfinispanLogMinerEventProcessor.1
            /** Number of event ids to visit, captured at creation time. */
            private final int count = infinispanTransaction.getNumberOfEvents();
            /** Event located by {@code hasNext()} and not yet handed out; {@code null} if none. */
            private LogMinerEvent nextEvent;
            /** Index of the event id currently being examined. */
            private int index = 0;

            @Override // java.util.Iterator
            public boolean hasNext() {
                // Advance only while no event is pending, so repeated calls do not refetch.
                while (this.nextEvent == null && this.index < this.count) {
                    this.nextEvent = (LogMinerEvent) AbstractInfinispanLogMinerEventProcessor.this.getEventCache().get(infinispanTransaction.getEventId(this.index));
                    if (this.nextEvent == null) {
                        AbstractInfinispanLogMinerEventProcessor.LOGGER.trace("Event {} must have been undone, skipped.", Integer.valueOf(this.index));
                        this.index++;
                    }
                }
                return this.nextEvent != null;
            }

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.Iterator
            public LogMinerEvent next() {
                if (!hasNext()) {
                    throw new NoSuchElementException("No more events for transaction " + infinispanTransaction.getTransactionId());
                }
                final LogMinerEvent event = this.nextEvent;
                // Consume the pending event and move past its index.
                this.nextEvent = null;
                this.index++;
                return event;
            }
        };
    }

    /**
     * Records the transaction in the processed-transactions cache, keyed by transaction id with the
     * string form of the given SCN as value.
     *
     * @param str the transaction identifier
     * @param scn the SCN stored for the transaction
     */
    @Override // io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor
    protected void finalizeTransactionCommit(String str, Scn scn) {
        final String scnValue = scn.toString();
        getProcessedTransactionsCache().put(str, scnValue);
    }

    /**
     * Discards a rolled-back transaction: if it is cached, its events and the transaction entry are
     * removed. In every case the transaction id is then recorded in the processed-transactions
     * cache with the string form of the given SCN.
     *
     * @param str the transaction identifier
     * @param scn the SCN stored for the transaction
     */
    @Override // io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor
    protected void finalizeTransactionRollback(String str, Scn scn) {
        final InfinispanTransaction rolledBack = (InfinispanTransaction) mo1546getTransactionCache().get(str);
        final boolean cached = rolledBack != null;
        if (cached) {
            removeEventsWithTransaction(rolledBack);
            mo1546getTransactionCache().remove(str);
        }
        final String scnValue = scn.toString();
        getProcessedTransactionsCache().put(str, scnValue);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Delegates schema change handling to the base class and then, when the row carries a table
     * name, records the change in the schema-changes cache: key is the string form of the row's
     * SCN, value is the table id's identifier.
     *
     * @param logMinerEventRow the schema change row
     * @throws InterruptedException propagated from the base class implementation
     */
    @Override // io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor
    public void handleSchemaChange(LogMinerEventRow logMinerEventRow) throws InterruptedException {
        super.handleSchemaChange(logMinerEventRow);
        if (logMinerEventRow.getTableName() == null) {
            // Rows without a table name are not recorded.
            return;
        }
        final String scnKey = logMinerEventRow.getScn().toString();
        getSchemaChangesCache().put(scnKey, logMinerEventRow.getTableId().identifier());
    }

    /**
     * Adds an event to a transaction, creating the transaction when it is not cached yet.
     *
     * <p>Nothing happens, apart from a warning, when the transaction is already recorded as
     * processed. Otherwise the next event id is requested from the transaction; the event is
     * created through the supplier and cached only if no entry exists under that id, in which case
     * the lag metrics are updated too. The transaction is always written back to the transaction
     * cache and the active-transaction metric is refreshed.
     *
     * @param str the transaction identifier
     * @param logMinerEventRow the row the event originates from
     * @param supplier creates the event; invoked only when the event is actually cached
     */
    @Override // io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor
    protected void addToTransaction(String str, LogMinerEventRow logMinerEventRow, Supplier<LogMinerEvent> supplier) {
        if (isRecentlyProcessed(str)) {
            LOGGER.warn("Event for transaction {} skipped as transaction has been processed.", str);
            return;
        }

        InfinispanTransaction transaction = (InfinispanTransaction) mo1546getTransactionCache().get(str);
        if (transaction == null) {
            LOGGER.trace("Transaction {} is not in cache, creating.", str);
            transaction = createTransaction(logMinerEventRow);
        }

        final String eventKey = transaction.getEventId(transaction.getNextEventId());
        final boolean alreadyCached = getEventCache().containsKey(eventKey);
        if (!alreadyCached) {
            LOGGER.trace("Transaction {}, adding event reference at key {}", str, eventKey);
            getEventCache().put(eventKey, supplier.get());
            this.metrics.calculateLagMetrics(logMinerEventRow.getChangeTime());
        }

        // Written back unconditionally, after the next event id was requested above.
        mo1546getTransactionCache().put(str, transaction);
        this.metrics.setActiveTransactions(mo1546getTransactionCache().size());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Counts the event cache keys that start with {@code "<transactionId>-"} for the given
     * transaction.
     *
     * @param infinispanTransaction the transaction whose cached events are counted
     * @return the number of matching keys, narrowed to {@code int}
     */
    @Override // io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor
    public int getTransactionEventCount(InfinispanTransaction infinispanTransaction) {
        // Build the prefix once instead of once per key; plain "-" separator rather than an unrelated XSLT constant.
        final String keyPrefix = infinispanTransaction.getTransactionId() + "-";
        return (int) getEventCache().keySet().parallelStream()
                .filter(eventKey -> eventKey.startsWith(keyPrefix))
                .count();
    }

    /**
     * Prepares the mining query statement on the JDBC connection. The SQL text is produced by
     * {@link LogMinerQueryBuilder#build} from the connector configuration and schema.
     *
     * <p>The statement is forward-only, read-only and holds its cursors over commit; the named
     * {@link ResultSet} constants carry the same values (1003, 1007, 1) as the numeric literals
     * they replace.
     *
     * @return the prepared statement
     * @throws SQLException if the connection cannot be obtained or the statement cannot be prepared
     */
    @Override // io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor
    protected PreparedStatement createQueryStatement() throws SQLException {
        return this.jdbcConnection.connection().prepareStatement(
                LogMinerQueryBuilder.build(getConfig(), getSchema()),
                ResultSet.TYPE_FORWARD_ONLY,
                ResultSet.CONCUR_READ_ONLY,
                ResultSet.HOLD_CURSORS_OVER_COMMIT);
    }

    /**
     * Prunes the processed-transaction and schema-change caches, updates the offset SCN, emits a
     * heartbeat and returns the SCN now recorded as the start position.
     *
     * <p>Pruning: when the transaction cache reports no minimum SCN both caches are cleared;
     * otherwise entries whose SCN (value of the processed-transactions cache, key of the
     * schema-changes cache) is below that minimum are removed.
     *
     * <p>With LOB support enabled the offset SCN becomes {@code scn2} when the transaction cache is
     * empty and {@code scn2} is not null, or the minimum cached SCN minus one when a minimum exists;
     * if neither applies the offset is left untouched and no heartbeat is sent. Without LOB
     * support the offset becomes {@code scn}, or the last processed SCN when that is not null and
     * lower than {@code scn}, and the oldest/offset SCN metrics are updated as well.
     *
     * @param scn the candidate SCN used when LOB support is disabled
     * @param scn2 the SCN used with LOB support when no transactions are cached
     * @return the SCN held by the offset context after the update
     * @throws InterruptedException propagated from the heartbeat dispatch
     */
    @Override // io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor
    protected Scn calculateNewStartScn(Scn scn, Scn scn2) throws InterruptedException {
        final Scn minCacheScn = getTransactionCacheMinimumScn();
        if (!minCacheScn.isNull()) {
            getProcessedTransactionsCache().entrySet().removeIf(
                    processed -> Scn.valueOf((String) processed.getValue()).compareTo(minCacheScn) < 0);
            getSchemaChangesCache().entrySet().removeIf(
                    schemaChange -> Scn.valueOf((String) schemaChange.getKey()).compareTo(minCacheScn) < 0);
        } else {
            getProcessedTransactionsCache().clear();
            getSchemaChangesCache().clear();
        }

        if (!getConfig().isLobEnabled()) {
            Scn newStartScn = scn;
            if (!getLastProcessedScn().isNull() && getLastProcessedScn().compareTo(scn) < 0) {
                newStartScn = getLastProcessedScn();
            }
            this.offsetContext.setScn(newStartScn);
            this.metrics.setOldestScn(newStartScn);
            this.metrics.setOffsetScn(newStartScn);
            this.dispatcher.dispatchHeartbeatEvent(this.partition, this.offsetContext);
            return newStartScn;
        }

        if (mo1546getTransactionCache().isEmpty() && !scn2.isNull()) {
            this.offsetContext.setScn(scn2);
            this.dispatcher.dispatchHeartbeatEvent(this.partition, this.offsetContext);
        } else if (!minCacheScn.isNull()) {
            // Offset is placed one below the minimum SCN still held in the transaction cache.
            this.offsetContext.setScn(minCacheScn.subtract(Scn.valueOf(1)));
            this.dispatcher.dispatchHeartbeatEvent(this.partition, this.offsetContext);
        }
        return this.offsetContext.getScn();
    }

    /**
     * Removes from the event cache the entry for each event id of the transaction, for indexes
     * {@code 0} up to (excluding) the transaction's number of events.
     *
     * @param infinispanTransaction the transaction whose events are removed
     */
    private void removeEventsWithTransaction(InfinispanTransaction infinispanTransaction) {
        int eventIndex = 0;
        while (eventIndex < infinispanTransaction.getNumberOfEvents()) {
            final String eventKey = infinispanTransaction.getEventId(eventIndex);
            getEventCache().remove(eventKey);
            eventIndex++;
        }
    }
}
