package com.alibaba.alink.operator.stream.source;

import com.alibaba.alink.common.MLEnvironmentFactory;
import com.alibaba.alink.common.annotation.NameCn;
import com.alibaba.alink.common.annotation.NameEn;
import com.alibaba.alink.common.io.annotations.AnnotationUtils;
import com.alibaba.alink.common.io.filesystem.AkUtils;
import com.alibaba.alink.common.io.filesystem.AkUtils2;
import com.alibaba.alink.common.io.filesystem.FilePath;
import com.alibaba.alink.common.io.parquet.ParquetClassLoaderFactory;
import com.alibaba.alink.common.io.parquet.ParquetReaderFactory;
import com.alibaba.alink.common.io.plugin.wrapper.RichInputFormatGenericWithClassLoader;
import com.alibaba.alink.operator.stream.StreamOperator;
import com.alibaba.alink.operator.stream.utils.DataStreamConversionUtil;
import com.alibaba.alink.params.io.ParquetSourceParams;
import java.io.IOException;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

/**
 * Stream source operator that reads Parquet data.
 *
 * <p>Without partitions, the input format produced by the Parquet plugin is wrapped in a
 * {@link RichInputFormatGenericWithClassLoader} and read via
 * {@code StreamExecutionEnvironment#createInput}. With partitions, the selected partition values
 * are turned into {@code col=value} sub-directories of the configured path. Every file found under
 * each such directory is then read with a {@link ParquetReaderFactory}.
 */
@NameCn("parquet文件读入")
@NameEn("Parquet Source")
public class ParquetSourceStreamOp extends BaseSourceStreamOp<ParquetSourceStreamOp>
    implements ParquetSourceParams<ParquetSourceStreamOp> {

    /** Version string handed to {@link ParquetClassLoaderFactory} (presumably the plugin version). */
    private static final String PARQUET_PLUGIN_VERSION = "0.11.0";

    private final ParquetClassLoaderFactory factory;

    public ParquetSourceStreamOp() {
        this(new Params());
    }

    public ParquetSourceStreamOp(Params params) {
        super(AnnotationUtils.annotatedName(ParquetSourceStreamOp.class), params);
        this.factory = new ParquetClassLoaderFactory(PARQUET_PLUGIN_VERSION);
    }

    /**
     * Builds the source table.
     *
     * <p>Reads the whole path when no partitions are configured. Otherwise it reads only the
     * selected partition directories.
     *
     * @return the table backed by the Parquet data
     * @throws IllegalArgumentException wrapping any {@link IOException} raised while listing
     *     partitions or reading the Parquet schema
     */
    @Override
    protected Table initializeDataSource() {
        if (getPartitions() == null) {
            return initializeUnpartitionedDataSource();
        }
        return initializePartitionedDataSource();
    }

    /** Reads the configured path through the plugin's input format, wrapped with its class loader. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private Table initializeUnpartitionedDataSource() {
        Tuple2<RichInputFormat<Row, FileInputSplit>, TableSchema> sourceFunction =
            factory.createParquetSourceFactory().createParquetSourceFunction(getParams());
        RichInputFormat<Row, FileInputSplit> inputFormat = sourceFunction.f0;
        TableSchema schema = sourceFunction.f1;

        // The wrapper is built raw, as in the original code, so the FileInputSplit-typed format is
        // accepted regardless of the wrapper's declared split type.
        DataStream<Row> rows = (DataStream<Row>) MLEnvironmentFactory.get(getMLEnvironmentId())
            .getStreamExecutionEnvironment()
            .createInput(
                new RichInputFormatGenericWithClassLoader(factory, (RichInputFormat) inputFormat),
                new RowTypeInfo(schema.getFieldTypes()));
        return DataStreamConversionUtil.toTable(getMLEnvironmentId(), rows, schema);
    }

    /** Reads only the partition directories selected by {@code getPartitions()}. */
    private Table initializePartitionedDataSource() {
        final FilePath filePath = getFilePath();
        try {
            StreamOperator<?> selectedPartitions =
                AkUtils2.selectPartitionStreamOp(getMLEnvironmentId(), filePath, getPartitions());
            DataStream<Row> rows = selectedPartitions.getDataStream()
                .rebalance()
                .flatMap(new PartitionFileReader(filePath, selectedPartitions.getColNames(), factory));
            TableSchema schema =
                factory.createParquetReaderFactory().getTableSchemaFromParquetFile(filePath);
            return DataStreamConversionUtil.toTable(getMLEnvironmentId(), rows, schema);
        } catch (IOException e) {
            throw new IllegalArgumentException(e);
        }
    }

    /**
     * Reads the Parquet files of one partition.
     *
     * <p>Each input row holds one partition value per partition column. The row is mapped to the
     * directory {@code root/col0=v0/col1=v1/...}, and every record of every file under that
     * directory is emitted. This is a static nested class so that it does not capture the
     * enclosing operator.
     */
    private static final class PartitionFileReader implements FlatMapFunction<Row, Row> {
        private static final long serialVersionUID = 1L;

        private final FilePath rootPath;
        private final String[] partitionColNames;
        private final ParquetClassLoaderFactory factory;

        PartitionFileReader(FilePath rootPath, String[] partitionColNames,
                            ParquetClassLoaderFactory factory) {
            this.rootPath = rootPath;
            this.partitionColNames = partitionColNames;
            this.factory = factory;
        }

        @Override
        public void flatMap(Row partitionValues, final Collector<Row> collector) throws Exception {
            Path partitionDir = rootPath.getPath();
            for (int i = 0; i < partitionValues.getArity(); i++) {
                partitionDir = new Path(partitionDir,
                    String.format("%s=%s", partitionColNames[i], String.valueOf(partitionValues.getField(i))));
            }
            AkUtils.getFromFolderForEach(
                new FilePath(partitionDir, rootPath.getFileSystem()),
                new AkUtils.FileProcFunction<FilePath, Boolean>() {
                    @Override
                    public Boolean apply(FilePath file) throws IOException {
                        ParquetReaderFactory reader = factory.createParquetReaderFactory();
                        reader.open(file);
                        try {
                            while (!reader.reachedEnd()) {
                                collector.collect(reader.nextRecord());
                            }
                        } finally {
                            // Always release the reader, even if reading or collecting fails.
                            reader.close();
                        }
                        return true;
                    }
                });
        }
    }
}
