package org.apache.shardingsphere.data.pipeline.core.task;

import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import lombok.Generated;
import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
import org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreatorFactory;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreatorFactory;
import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.Dumper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.class */
public final class IncrementalTask extends AbstractLifecycleExecutor implements PipelineTask, AutoCloseable {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(IncrementalTask.class);
    private final String taskId;
    private final ExecuteEngine incrementalDumperExecuteEngine;
    private final PipelineChannel channel;
    private final Dumper dumper;
    private final Collection<Importer> importers;
    private final IncrementalTaskProgress taskProgress;

    public IncrementalTask(int i, DumperConfiguration dumperConfiguration, ImporterConfiguration importerConfiguration, PipelineChannelCreator pipelineChannelCreator, PipelineDataSourceManager pipelineDataSourceManager, PipelineTableMetaDataLoader pipelineTableMetaDataLoader, ExecuteEngine executeEngine, PipelineJobProgressListener pipelineJobProgressListener) {
        this.incrementalDumperExecuteEngine = executeEngine;
        this.taskId = dumperConfiguration.getDataSourceName();
        IngestPosition<?> position = dumperConfiguration.getPosition();
        this.taskProgress = createIncrementalTaskProgress(position);
        this.channel = createChannel(i, pipelineChannelCreator, this.taskProgress);
        this.dumper = IncrementalDumperCreatorFactory.getInstance(dumperConfiguration.getDataSourceConfig().getDatabaseType().getType()).createIncrementalDumper(dumperConfiguration, position, this.channel, pipelineTableMetaDataLoader);
        this.importers = createImporters(i, importerConfiguration, pipelineDataSourceManager, this.channel, pipelineJobProgressListener);
    }

    private IncrementalTaskProgress createIncrementalTaskProgress(IngestPosition<?> ingestPosition) {
        IncrementalTaskProgress incrementalTaskProgress = new IncrementalTaskProgress();
        incrementalTaskProgress.setPosition(ingestPosition);
        return incrementalTaskProgress;
    }

    protected void doStart() {
        this.taskProgress.getIncrementalTaskDelay().setLatestActiveTimeMillis(System.currentTimeMillis());
        Future<?> submitAll = this.incrementalDumperExecuteEngine.submitAll(this.importers, getExecuteCallback());
        this.dumper.start();
        waitForResult(submitAll);
    }

    private Collection<Importer> createImporters(int i, ImporterConfiguration importerConfiguration, PipelineDataSourceManager pipelineDataSourceManager, PipelineChannel pipelineChannel, PipelineJobProgressListener pipelineJobProgressListener) {
        LinkedList linkedList = new LinkedList();
        for (int i2 = 0; i2 < i; i2++) {
            linkedList.add(ImporterCreatorFactory.getInstance(importerConfiguration.getDataSourceConfig().getDatabaseType().getType()).createImporter(importerConfiguration, pipelineDataSourceManager, pipelineChannel, pipelineJobProgressListener));
        }
        return linkedList;
    }

    private PipelineChannel createChannel(int i, PipelineChannelCreator pipelineChannelCreator, IncrementalTaskProgress incrementalTaskProgress) {
        return pipelineChannelCreator.createPipelineChannel(i, list -> {
            Record record = (Record) list.get(list.size() - 1);
            if (!(record.getPosition() instanceof PlaceholderPosition)) {
                incrementalTaskProgress.setPosition(record.getPosition());
                incrementalTaskProgress.getIncrementalTaskDelay().setLastEventTimestamps(record.getCommitTime());
            }
            incrementalTaskProgress.getIncrementalTaskDelay().setLatestActiveTimeMillis(System.currentTimeMillis());
        });
    }

    private ExecuteCallback getExecuteCallback() {
        return new ExecuteCallback() { // from class: org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask.1
            @Override // org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback
            public void onSuccess() {
                IncrementalTask.log.info("importer onSuccess, taskId={}", IncrementalTask.this.taskId);
            }

            @Override // org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback
            public void onFailure(Throwable th) {
                IncrementalTask.log.error("importer onFailure, taskId={}", IncrementalTask.this.taskId, th);
                IncrementalTask.this.stop();
            }
        };
    }

    private void waitForResult(Future<?> future) {
        try {
            future.get();
        } catch (InterruptedException e) {
        } catch (ExecutionException e2) {
            throw new PipelineJobExecutionException(String.format("Task %s execute failed ", this.taskId), e2.getCause());
        }
    }

    protected void doStop() {
        this.dumper.stop();
        Iterator<Importer> it = this.importers.iterator();
        while (it.hasNext()) {
            it.next().stop();
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        this.channel.close();
    }

    @Generated
    public String toString() {
        return "IncrementalTask(taskId=" + getTaskId() + ")";
    }

    @Override // org.apache.shardingsphere.data.pipeline.core.task.PipelineTask
    @Generated
    public String getTaskId() {
        return this.taskId;
    }

    @Override // org.apache.shardingsphere.data.pipeline.core.task.PipelineTask
    @Generated
    /* renamed from: getTaskProgress, reason: merged with bridge method [inline-methods] */
    public IncrementalTaskProgress mo50getTaskProgress() {
        return this.taskProgress;
    }
}
