package com.alibaba.alink.operator.stream.source;

import com.alibaba.alink.common.MLEnvironmentFactory;
import com.alibaba.alink.common.annotation.NameCn;
import com.alibaba.alink.common.annotation.NameEn;
import com.alibaba.alink.common.io.annotations.AnnotationUtils;
import com.alibaba.alink.common.io.annotations.IOType;
import com.alibaba.alink.common.io.annotations.IoOpAnnotation;
import com.alibaba.alink.common.io.kafka.plugin.KafkaClassLoaderFactory;
import com.alibaba.alink.operator.stream.utils.DataStreamConversionUtil;
import com.alibaba.alink.params.io.KafkaSourceParams;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;

@IoOpAnnotation(name = KafkaClassLoaderFactory.KAFKA_NAME, ioType = IOType.SourceStream)
@NameCn("流式Kafka输入")
@NameEn("Kafka Source")
/* loaded from: input_file:com/alibaba/alink/operator/stream/source/KafkaSourceStreamOp.class */
public class KafkaSourceStreamOp extends BaseSourceStreamOp<KafkaSourceStreamOp> implements KafkaSourceParams<KafkaSourceStreamOp> {
    private final KafkaClassLoaderFactory factory;

    public KafkaSourceStreamOp() {
        this(new Params());
    }

    public KafkaSourceStreamOp(Params params) {
        super(AnnotationUtils.annotatedName(KafkaSourceStreamOp.class), params);
        this.factory = new KafkaClassLoaderFactory("0.11");
    }

    @Override // com.alibaba.alink.operator.stream.source.BaseSourceStreamOp
    protected Table initializeDataSource() {
        Tuple2<RichParallelSourceFunction<Row>, TableSchema> createKafkaSourceFunction = KafkaClassLoaderFactory.create(this.factory).createKafkaSourceFunction(getParams());
        return DataStreamConversionUtil.toTable(getMLEnvironmentId(), (DataStream<Row>) MLEnvironmentFactory.get(getMLEnvironmentId()).getStreamExecutionEnvironment().addSource(new RichParallelSourceFunctionWithClassLoader(this.factory, (RichParallelSourceFunction) createKafkaSourceFunction.f0)), (TableSchema) createKafkaSourceFunction.f1);
    }
}
