package com.alibaba.alink.common.io.catalog;

import com.alibaba.alink.common.MLEnvironmentFactory;
import com.alibaba.alink.operator.stream.utils.DataStreamConversionUtil;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.types.Row;

/* loaded from: input_file:com/alibaba/alink/common/io/catalog/SourceSinkFunctionCatalog.class */
public abstract class SourceSinkFunctionCatalog extends BaseCatalog {
    public SourceSinkFunctionCatalog(Params params) {
        super(params);
    }

    @Override // com.alibaba.alink.common.io.catalog.BaseCatalog
    public Table sourceStream(ObjectPath objectPath, Params params, Long l) {
        try {
            TableSchema schema = getTable(objectPath).getSchema();
            return DataStreamConversionUtil.toTable(l, (DataStream<Row>) MLEnvironmentFactory.get(l).getStreamExecutionEnvironment().addSource(createSourceFunction(objectPath, schema, getParams().merge(params)), new RowTypeInfo(schema.getFieldTypes())), schema);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override // com.alibaba.alink.common.io.catalog.BaseCatalog
    public void sinkStream(ObjectPath objectPath, Table table, Params params, Long l) {
        MLEnvironmentFactory.get(l).getStreamTableEnvironment().toAppendStream(table, new RowTypeInfo(table.getSchema().getFieldTypes())).addSink(createSinkFunction(objectPath, table.getSchema(), getParams().merge(params)));
    }

    @Override // com.alibaba.alink.common.io.catalog.BaseCatalog
    public Table sourceBatch(ObjectPath objectPath, Params params, Long l) {
        throw new UnsupportedOperationException();
    }

    @Override // com.alibaba.alink.common.io.catalog.BaseCatalog
    public void sinkBatch(ObjectPath objectPath, Table table, Params params, Long l) {
        throw new UnsupportedOperationException();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public abstract RichSinkFunction<Row> createSinkFunction(ObjectPath objectPath, TableSchema tableSchema, Params params);

    /* JADX INFO: Access modifiers changed from: protected */
    public abstract RichParallelSourceFunction<Row> createSourceFunction(ObjectPath objectPath, TableSchema tableSchema, Params params) throws Exception;
}
