package com.alibaba.alink.common.io.catalog;

import com.alibaba.alink.common.MLEnvironmentFactory;
import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.operator.batch.utils.DataSetConversionUtil;
import com.alibaba.alink.operator.stream.utils.DataStreamConversionUtil;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.ml.api.misc.param.ParamInfo;
import org.apache.flink.ml.api.misc.param.ParamInfoFactory;
import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.types.Row;

/* loaded from: input_file:com/alibaba/alink/common/io/catalog/InputOutputFormatCatalog.class */
public abstract class InputOutputFormatCatalog extends BaseCatalog {
    public static final ParamInfo<Integer> PARALLELISM = ParamInfoFactory.createParamInfo("parallelism", Integer.class).build();

    public InputOutputFormatCatalog(Params params) {
        super(params);
    }

    @Override // com.alibaba.alink.common.io.catalog.BaseCatalog
    public Table sourceStream(ObjectPath objectPath, Params params, Long l) {
        int parallelism = MLEnvironmentFactory.get(l).getStreamExecutionEnvironment().getParallelism();
        try {
            TableSchema schema = getTable(objectPath).getSchema();
            return DataStreamConversionUtil.toTable(l, (DataStream<Row>) MLEnvironmentFactory.get(l).getStreamExecutionEnvironment().createInput(createInputFormat(objectPath, schema, getParams().merge(params).set((ParamInfo<ParamInfo<Integer>>) PARALLELISM, (ParamInfo<Integer>) Integer.valueOf(parallelism))), new RowTypeInfo(schema.getFieldTypes())), schema);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override // com.alibaba.alink.common.io.catalog.BaseCatalog
    public void sinkStream(ObjectPath objectPath, Table table, Params params, Long l) {
        MLEnvironmentFactory.get(l).getStreamTableEnvironment().toAppendStream(table, new RowTypeInfo(table.getSchema().getFieldTypes())).writeUsingOutputFormat(createOutputFormat(objectPath, table.getSchema(), getParams().merge(params)));
    }

    @Override // com.alibaba.alink.common.io.catalog.BaseCatalog
    public Table sourceBatch(ObjectPath objectPath, Params params, Long l) {
        int parallelism = MLEnvironmentFactory.get(l).getStreamExecutionEnvironment().getParallelism();
        try {
            TableSchema schema = getTable(objectPath).getSchema();
            return DataSetConversionUtil.toTable(l, (DataSet<Row>) MLEnvironmentFactory.get(l).getExecutionEnvironment().createInput(createInputFormat(objectPath, schema, getParams().merge(params).set((ParamInfo<ParamInfo<Integer>>) PARALLELISM, (ParamInfo<Integer>) Integer.valueOf(parallelism))), new RowTypeInfo(schema.getFieldTypes())), schema);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // com.alibaba.alink.common.io.catalog.BaseCatalog
    public void sinkBatch(ObjectPath objectPath, Table table, Params params, Long l) {
        ((BatchOperator) BatchOperator.fromTable(table).setMLEnvironmentId(l)).getDataSet().output(createOutputFormat(objectPath, table.getSchema(), getParams().merge(params)));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public abstract RichInputFormat<Row, InputSplit> createInputFormat(ObjectPath objectPath, TableSchema tableSchema, Params params) throws Exception;

    /* JADX INFO: Access modifiers changed from: protected */
    public abstract OutputFormat<Row> createOutputFormat(ObjectPath objectPath, TableSchema tableSchema, Params params);
}
