package org.apache.shardingsphere.data.pipeline.core.importer;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import javax.sql.DataSource;
import lombok.Generated;
import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException;
import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
import org.apache.shardingsphere.data.pipeline.core.record.RecordUtil;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.class */
public final class DefaultImporter extends AbstractLifecycleExecutor implements Importer {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(DefaultImporter.class);
    private static final DataRecordMerger MERGER = new DataRecordMerger();
    private final ImporterConfiguration importerConfig;
    private final PipelineDataSourceManager dataSourceManager;
    private final PipelineSQLBuilder pipelineSqlBuilder;
    private final PipelineChannel channel;
    private final PipelineJobProgressListener jobProgressListener;
    private final JobRateLimitAlgorithm rateLimitAlgorithm;

    public DefaultImporter(ImporterConfiguration importerConfiguration, PipelineDataSourceManager pipelineDataSourceManager, PipelineChannel pipelineChannel, PipelineJobProgressListener pipelineJobProgressListener) {
        this.importerConfig = importerConfiguration;
        this.rateLimitAlgorithm = importerConfiguration.getRateLimitAlgorithm();
        this.dataSourceManager = pipelineDataSourceManager;
        this.channel = pipelineChannel;
        this.pipelineSqlBuilder = PipelineSQLBuilderFactory.getInstance(importerConfiguration.getDataSourceConfig().getDatabaseType().getType());
        this.jobProgressListener = pipelineJobProgressListener;
    }

    protected void doStart() {
        write();
    }

    private void write() {
        log.info("importer write");
        int i = 1;
        int i2 = 0;
        boolean z = false;
        int batchSize = this.importerConfig.getBatchSize() * 2;
        while (true) {
            if (!isRunning()) {
                break;
            }
            List<Record> fetchRecords = this.channel.fetchRecords(batchSize, 3);
            if (null != fetchRecords && !fetchRecords.isEmpty()) {
                i++;
                i2 += fetchRecords.size();
                flush(this.dataSourceManager.getDataSource(this.importerConfig.getDataSourceConfig()), fetchRecords);
                this.channel.ack(fetchRecords);
                this.jobProgressListener.onProgressUpdated();
                if (0 == i % 50) {
                    log.info("importer write, round={}, rowCount={}", Integer.valueOf(i), Integer.valueOf(i2));
                }
                if (FinishedRecord.class.equals(fetchRecords.get(fetchRecords.size() - 1).getClass())) {
                    log.info("write, get FinishedRecord, break");
                    z = true;
                    break;
                }
            }
        }
        log.info("importer write done, rowCount={}, finishedByBreak={}", Integer.valueOf(i2), Boolean.valueOf(z));
    }

    private void flush(DataSource dataSource, List<Record> list) {
        MERGER.group((List) list.stream().filter(record -> {
            return record instanceof DataRecord;
        }).map(record2 -> {
            return (DataRecord) record2;
        }).collect(Collectors.toList())).forEach(groupedDataRecord -> {
            flushInternal(dataSource, groupedDataRecord.getDeleteDataRecords());
            flushInternal(dataSource, groupedDataRecord.getInsertDataRecords());
            flushInternal(dataSource, groupedDataRecord.getUpdateDataRecords());
        });
    }

    private void flushInternal(DataSource dataSource, List<DataRecord> list) {
        if (null == list || list.isEmpty()) {
            return;
        }
        boolean tryFlush = tryFlush(dataSource, list);
        if (isRunning() && !tryFlush) {
            throw new PipelineJobExecutionException("write failed.");
        }
    }

    private boolean tryFlush(DataSource dataSource, List<DataRecord> list) {
        for (int i = 0; isRunning() && i <= this.importerConfig.getRetryTimes(); i++) {
            try {
                doFlush(dataSource, list);
                return true;
            } catch (SQLException e) {
                log.error("flush failed {}/{} times.", new Object[]{Integer.valueOf(i), Integer.valueOf(this.importerConfig.getRetryTimes()), e});
                ThreadUtil.sleep(Math.min(300000L, 1000 << i));
            }
        }
        return false;
    }

    private void doFlush(DataSource dataSource, List<DataRecord> list) throws SQLException {
        Connection connection = dataSource.getConnection();
        Throwable th = null;
        try {
            try {
                connection.setAutoCommit(false);
                String type = list.get(0).getType();
                boolean z = -1;
                switch (type.hashCode()) {
                    case -2130463047:
                        if (type.equals(IngestDataChangeType.INSERT)) {
                            z = false;
                            break;
                        }
                        break;
                    case -1785516855:
                        if (type.equals(IngestDataChangeType.UPDATE)) {
                            z = true;
                            break;
                        }
                        break;
                    case 2012838315:
                        if (type.equals(IngestDataChangeType.DELETE)) {
                            z = 2;
                            break;
                        }
                        break;
                }
                switch (z) {
                    case false:
                        if (null != this.rateLimitAlgorithm) {
                            this.rateLimitAlgorithm.intercept(JobOperationType.INSERT, 1);
                        }
                        executeBatchInsert(connection, list);
                        break;
                    case true:
                        if (null != this.rateLimitAlgorithm) {
                            this.rateLimitAlgorithm.intercept(JobOperationType.UPDATE, 1);
                        }
                        executeUpdate(connection, list);
                        break;
                    case true:
                        if (null != this.rateLimitAlgorithm) {
                            this.rateLimitAlgorithm.intercept(JobOperationType.DELETE, 1);
                        }
                        executeBatchDelete(connection, list);
                        break;
                }
                connection.commit();
                if (connection != null) {
                    if (0 == 0) {
                        connection.close();
                        return;
                    }
                    try {
                        connection.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (connection != null) {
                if (th != null) {
                    try {
                        connection.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    connection.close();
                }
            }
            throw th4;
        }
    }

    private void executeBatchInsert(Connection connection, List<DataRecord> list) throws SQLException {
        DataRecord dataRecord = list.get(0);
        PreparedStatement prepareStatement = connection.prepareStatement(this.pipelineSqlBuilder.buildInsertSQL(getSchemaName(dataRecord.getTableName()), dataRecord, this.importerConfig.getShardingColumnsMap()));
        Throwable th = null;
        try {
            try {
                prepareStatement.setQueryTimeout(30);
                for (DataRecord dataRecord2 : list) {
                    for (int i = 0; i < dataRecord2.getColumnCount(); i++) {
                        prepareStatement.setObject(i + 1, dataRecord2.getColumn(i).getValue());
                    }
                    prepareStatement.addBatch();
                }
                prepareStatement.executeBatch();
                if (prepareStatement != null) {
                    if (0 == 0) {
                        prepareStatement.close();
                        return;
                    }
                    try {
                        prepareStatement.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (prepareStatement != null) {
                if (th != null) {
                    try {
                        prepareStatement.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    prepareStatement.close();
                }
            }
            throw th4;
        }
    }

    private String getSchemaName(String str) {
        return getImporterConfig().getSchemaName(new LogicTableName(str));
    }

    private void executeUpdate(Connection connection, List<DataRecord> list) throws SQLException {
        Iterator<DataRecord> it = list.iterator();
        while (it.hasNext()) {
            executeUpdate(connection, it.next());
        }
    }

    private void executeUpdate(Connection connection, DataRecord dataRecord) throws SQLException {
        Set shardingColumns = this.importerConfig.getShardingColumns(dataRecord.getTableName());
        if (null == shardingColumns) {
            log.error("executeUpdate, could not get shardingColumns, tableName={}, logicTableNames={}", dataRecord.getTableName(), this.importerConfig.getLogicTableNames());
        }
        List<Column> extractConditionColumns = RecordUtil.extractConditionColumns(dataRecord, shardingColumns);
        List extractUpdatedColumns = this.pipelineSqlBuilder.extractUpdatedColumns(dataRecord, this.importerConfig.getShardingColumnsMap());
        String buildUpdateSQL = this.pipelineSqlBuilder.buildUpdateSQL(getSchemaName(dataRecord.getTableName()), dataRecord, extractConditionColumns, this.importerConfig.getShardingColumnsMap());
        PreparedStatement prepareStatement = connection.prepareStatement(buildUpdateSQL);
        Throwable th = null;
        for (int i = 0; i < extractUpdatedColumns.size(); i++) {
            try {
                try {
                    prepareStatement.setObject(i + 1, ((Column) extractUpdatedColumns.get(i)).getValue());
                } catch (Throwable th2) {
                    th = th2;
                    throw th2;
                }
            } catch (Throwable th3) {
                if (prepareStatement != null) {
                    if (th != null) {
                        try {
                            prepareStatement.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        prepareStatement.close();
                    }
                }
                throw th3;
            }
        }
        for (int i2 = 0; i2 < extractConditionColumns.size(); i2++) {
            Column column = extractConditionColumns.get(i2);
            prepareStatement.setObject(extractUpdatedColumns.size() + i2 + 1, (column.isUniqueKey() && column.isUpdated()) ? column.getOldValue() : column.getValue());
        }
        int executeUpdate = prepareStatement.executeUpdate();
        if (1 != executeUpdate) {
            log.warn("executeUpdate failed, updateCount={}, updateSql={}, updatedColumns={}, conditionColumns={}", new Object[]{Integer.valueOf(executeUpdate), buildUpdateSQL, extractUpdatedColumns, extractConditionColumns});
        }
        if (prepareStatement != null) {
            if (0 == 0) {
                prepareStatement.close();
                return;
            }
            try {
                prepareStatement.close();
            } catch (Throwable th5) {
                th.addSuppressed(th5);
            }
        }
    }

    private void executeBatchDelete(Connection connection, List<DataRecord> list) throws SQLException {
        DataRecord dataRecord = list.get(0);
        PreparedStatement prepareStatement = connection.prepareStatement(this.pipelineSqlBuilder.buildDeleteSQL(getSchemaName(dataRecord.getTableName()), dataRecord, RecordUtil.extractConditionColumns(dataRecord, this.importerConfig.getShardingColumns(dataRecord.getTableName()))));
        Throwable th = null;
        try {
            try {
                prepareStatement.setQueryTimeout(30);
                for (DataRecord dataRecord2 : list) {
                    List<Column> extractConditionColumns = RecordUtil.extractConditionColumns(dataRecord2, this.importerConfig.getShardingColumns(dataRecord2.getTableName()));
                    for (int i = 0; i < extractConditionColumns.size(); i++) {
                        prepareStatement.setObject(i + 1, extractConditionColumns.get(i).getValue());
                    }
                    prepareStatement.addBatch();
                }
                prepareStatement.executeBatch();
                if (prepareStatement != null) {
                    if (0 == 0) {
                        prepareStatement.close();
                        return;
                    }
                    try {
                        prepareStatement.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (prepareStatement != null) {
                if (th != null) {
                    try {
                        prepareStatement.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    prepareStatement.close();
                }
            }
            throw th4;
        }
    }

    protected void doStop() {
    }

    @Generated
    protected ImporterConfiguration getImporterConfig() {
        return this.importerConfig;
    }
}
