package com.alibaba.alink.operator.stream.source;

import com.alibaba.alink.common.MLEnvironmentFactory;
import com.alibaba.alink.common.annotation.InputPorts;
import com.alibaba.alink.common.annotation.NameCn;
import com.alibaba.alink.common.annotation.NameEn;
import com.alibaba.alink.common.annotation.OutputPorts;
import com.alibaba.alink.common.annotation.PortSpec;
import com.alibaba.alink.common.annotation.PortType;
import com.alibaba.alink.common.exceptions.AkUnclassifiedErrorException;
import com.alibaba.alink.common.io.annotations.AnnotationUtils;
import com.alibaba.alink.common.io.annotations.IOType;
import com.alibaba.alink.common.io.annotations.IoOpAnnotation;
import com.alibaba.alink.common.io.filesystem.AkUtils;
import com.alibaba.alink.common.io.filesystem.FilePath;
import com.alibaba.alink.operator.common.modelstream.ModelStreamFileScanner;
import com.alibaba.alink.operator.common.modelstream.ModelStreamUtils;
import com.alibaba.alink.operator.stream.utils.DataStreamConversionUtil;
import com.alibaba.alink.params.io.ModelStreamFileSourceParams;
import java.io.IOException;
import java.sql.Timestamp;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

@InputPorts
@OutputPorts(values = {@PortSpec(PortType.MODEL_STREAM)})
@IoOpAnnotation(name = "modelstream_file", ioType = IOType.SourceStream)
@NameCn("流式模型流输入")
@NameEn("Model Stream File Source")
/* loaded from: input_file:com/alibaba/alink/operator/stream/source/ModelStreamFileSourceStreamOp.class */
public final class ModelStreamFileSourceStreamOp extends BaseSourceStreamOp<ModelStreamFileSourceStreamOp> implements ModelStreamFileSourceParams<ModelStreamFileSourceStreamOp> {
    private static final long serialVersionUID = 6926655915805468249L;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/alibaba/alink/operator/stream/source/ModelStreamFileSourceStreamOp$FileModelStreamSourceInputSplit.class */
    public static class FileModelStreamSourceInputSplit implements InputSplit {
        private FileModelStreamSourceInputSplit() {
        }

        public int getSplitNumber() {
            return 1;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/alibaba/alink/operator/stream/source/ModelStreamFileSourceStreamOp$FileModelStreamSourceMonitorInputFormat.class */
    public static class FileModelStreamSourceMonitorInputFormat implements InputFormat<Timestamp, FileModelStreamSourceInputSplit> {
        private final FilePath filePath;
        private final Timestamp startTime;
        private final long scanInterval;
        private transient ModelStreamFileScanner fileScanner;
        private transient Iterator<Timestamp> streamSourceIterator;

        public FileModelStreamSourceMonitorInputFormat(FilePath filePath, Timestamp timestamp, long j) {
            this.filePath = filePath;
            this.startTime = timestamp;
            this.scanInterval = j;
        }

        public void configure(Configuration configuration) {
        }

        public BaseStatistics getStatistics(BaseStatistics baseStatistics) throws IOException {
            return null;
        }

        /* renamed from: createInputSplits, reason: merged with bridge method [inline-methods] */
        public FileModelStreamSourceInputSplit[] m707createInputSplits(int i) throws IOException {
            return new FileModelStreamSourceInputSplit[]{new FileModelStreamSourceInputSplit()};
        }

        public InputSplitAssigner getInputSplitAssigner(final FileModelStreamSourceInputSplit[] fileModelStreamSourceInputSplitArr) {
            return new InputSplitAssigner() { // from class: com.alibaba.alink.operator.stream.source.ModelStreamFileSourceStreamOp.FileModelStreamSourceMonitorInputFormat.1
                public InputSplit getNextInputSplit(String str, int i) {
                    return fileModelStreamSourceInputSplitArr[0];
                }

                public void returnInputSplit(List<InputSplit> list, int i) {
                    list.add(fileModelStreamSourceInputSplitArr[0]);
                }
            };
        }

        public void open(FileModelStreamSourceInputSplit fileModelStreamSourceInputSplit) throws IOException {
            this.fileScanner = new ModelStreamFileScanner(1, 2);
            this.fileScanner.open();
            this.streamSourceIterator = this.fileScanner.scanToFile(new ModelStreamFileScanner.ScanTask(this.filePath, this.startTime), Time.of(this.scanInterval, TimeUnit.MILLISECONDS));
        }

        public boolean reachedEnd() throws IOException {
            return !this.streamSourceIterator.hasNext();
        }

        public Timestamp nextRecord(Timestamp timestamp) throws IOException {
            return this.streamSourceIterator.next();
        }

        public void close() throws IOException {
            if (this.fileScanner != null) {
                this.fileScanner.close();
            }
        }
    }

    public ModelStreamFileSourceStreamOp() {
        this(new Params());
    }

    public ModelStreamFileSourceStreamOp(Params params) {
        super(AnnotationUtils.annotatedName(ModelStreamFileSourceStreamOp.class), params);
    }

    @Override // com.alibaba.alink.operator.stream.source.BaseSourceStreamOp
    protected Table initializeDataSource() {
        final FilePath filePath = getFilePath();
        return DataStreamConversionUtil.toTable(getMLEnvironmentId(), (DataStream<Row>) createModelFileIdSource(getMLEnvironmentId(), filePath, ModelStreamUtils.createStartTime(getStartTime()), ModelStreamUtils.createScanIntervalMillis(getScanInterval().intValue())).rebalance().flatMap(new RichFlatMapFunction<Timestamp, Tuple3<Timestamp, Long, FilePath>>() { // from class: com.alibaba.alink.operator.stream.source.ModelStreamFileSourceStreamOp.2
            public void flatMap(Timestamp timestamp, Collector<Tuple3<Timestamp, Long, FilePath>> collector) {
                Tuple3<Timestamp, Long, FilePath> descModel = ModelStreamUtils.descModel(filePath, timestamp);
                try {
                    Iterator<FilePath> it = ModelStreamUtils.listModelFiles((FilePath) descModel.f2).iterator();
                    while (it.hasNext()) {
                        collector.collect(Tuple3.of(descModel.f0, descModel.f1, it.next()));
                    }
                } catch (IOException e) {
                    throw new AkUnclassifiedErrorException("Error. ", e);
                }
            }

            public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
                flatMap((Timestamp) obj, (Collector<Tuple3<Timestamp, Long, FilePath>>) collector);
            }
        }).rebalance().flatMap(new RichFlatMapFunction<Tuple3<Timestamp, Long, FilePath>, Row>() { // from class: com.alibaba.alink.operator.stream.source.ModelStreamFileSourceStreamOp.1
            public void flatMap(Tuple3<Timestamp, Long, FilePath> tuple3, Collector<Row> collector) throws Exception {
                Iterator it = ((List) AkUtils.readFromPath((FilePath) tuple3.f2).f1).iterator();
                while (it.hasNext()) {
                    collector.collect(ModelStreamUtils.genRowWithIdentifierInternal((Row) it.next(), (Timestamp) tuple3.f0, (Long) tuple3.f1));
                }
            }

            public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
                flatMap((Tuple3<Timestamp, Long, FilePath>) obj, (Collector<Row>) collector);
            }
        }), ModelStreamUtils.createSchemaWithModelStreamPrefix(ModelStreamUtils.createSchemaFromFilePath(filePath, getSchemaStr())));
    }

    private static DataStream<Timestamp> createModelFileIdSource(Long l, FilePath filePath, Timestamp timestamp, long j) {
        return MLEnvironmentFactory.get(l).getStreamExecutionEnvironment().createInput(new FileModelStreamSourceMonitorInputFormat(filePath, timestamp, j)).setParallelism(1);
    }
}
