package com.alibaba.alink.operator.stream.source;

import com.alibaba.alink.common.MLEnvironmentFactory;
import com.alibaba.alink.common.annotation.NameCn;
import com.alibaba.alink.common.annotation.NameEn;
import com.alibaba.alink.common.io.annotations.AnnotationUtils;
import com.alibaba.alink.common.io.annotations.IOType;
import com.alibaba.alink.common.io.annotations.IoOpAnnotation;
import com.alibaba.alink.common.io.plugin.wrapper.RichInputFormatGenericWithClassLoader;
import com.alibaba.alink.common.io.xls.XlsReaderClassLoader;
import com.alibaba.alink.operator.stream.utils.DataStreamConversionUtil;
import com.alibaba.alink.params.io.XlsSourceParams;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;

/**
 * Stream source operator registered under the IO name {@code "xls_source"}
 * ("Xls and Xlsx File Source").
 *
 * <p>The input format and table schema are obtained from a reader created through
 * {@link XlsReaderClassLoader}; the format is then wrapped together with that same
 * loader object before being handed to the stream execution environment.
 */
@IoOpAnnotation(name = "xls_source", ioType = IOType.SourceStream)
@NameCn("Xls和Xlsx表格读入")
@NameEn("Xls and Xlsx File Source")
public class XlsSourceStreamOp extends BaseSourceStreamOp<XlsSourceStreamOp>
        implements XlsSourceParams<XlsSourceStreamOp> {

    /** Loader used both to create the reader and to wrap the resulting input format. */
    private final XlsReaderClassLoader factory;

    /** Creates the operator with an empty parameter set. */
    public XlsSourceStreamOp() {
        this(new Params());
    }

    /**
     * Creates the operator with the given parameters.
     *
     * @param params operator parameters, forwarded to the base class and later passed
     *               to the reader when the input format is created
     */
    public XlsSourceStreamOp(Params params) {
        super(AnnotationUtils.annotatedName(XlsSourceStreamOp.class), params);
        // NOTE(review): "0.11" is presumably a reader/plugin version - confirm against XlsReaderClassLoader.
        this.factory = new XlsReaderClassLoader("0.11");
    }

    /**
     * Builds the source table: creates the input format, registers it as a stream
     * input named {@code "xls-file-source"}, rebalances the stream, and converts it
     * to a {@link Table} using the schema returned alongside the format.
     *
     * @return the table backed by the rebalanced row stream
     */
    @Override
    protected Table initializeDataSource() {
        final Tuple2<RichInputFormat<Row, FileInputSplit>, TableSchema> formatAndSchema =
                XlsReaderClassLoader.create(this.factory).createInputFormat(getParams());
        final RichInputFormat<Row, FileInputSplit> xlsFormat = formatAndSchema.f0;
        final TableSchema schema = formatAndSchema.f1;

        // The wrapper is constructed raw, as in the original; hence the unchecked cast on the result.
        final DataStream<Row> rows = (DataStream<Row>) MLEnvironmentFactory
                .get(getMLEnvironmentId())
                .getStreamExecutionEnvironment()
                .createInput(
                        new RichInputFormatGenericWithClassLoader(this.factory, (RichInputFormat) xlsFormat),
                        new RowTypeInfo(schema.getFieldTypes()))
                .name("xls-file-source")
                .rebalance();

        return DataStreamConversionUtil.toTable(getMLEnvironmentId(), rows, schema);
    }
}
