package com.alibaba.alink.operator.stream.sink;

import com.alibaba.alink.common.MLEnvironmentFactory;
import com.alibaba.alink.common.annotation.InputPorts;
import com.alibaba.alink.common.annotation.NameCn;
import com.alibaba.alink.common.annotation.OutputPorts;
import com.alibaba.alink.common.annotation.PortSpec;
import com.alibaba.alink.common.annotation.PortType;
import com.alibaba.alink.common.exceptions.AkIllegalOperationException;
import com.alibaba.alink.common.io.annotations.IOType;
import com.alibaba.alink.operator.stream.StreamOperator;
import com.alibaba.alink.operator.stream.sink.BaseSinkStreamOp;
import com.alibaba.alink.operator.stream.utils.MTableSerializeStreamOp;
import com.alibaba.alink.operator.stream.utils.TensorSerializeStreamOp;
import com.alibaba.alink.operator.stream.utils.VectorSerializeStreamOp;
import com.alibaba.alink.params.io.HasIoName;
import com.alibaba.alink.params.io.HasIoType;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.ml.api.misc.param.ParamInfo;
import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableException;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

@InputPorts(values = {@PortSpec(PortType.ANY)})
@OutputPorts
@NameCn("")
/* loaded from: input_file:com/alibaba/alink/operator/stream/sink/BaseSinkStreamOp.class */
public abstract class BaseSinkStreamOp<T extends BaseSinkStreamOp<T>> extends StreamOperator<T> {
    static final IOType IO_TYPE = IOType.SinkStream;
    private static final long serialVersionUID = -7359864593021986768L;

    /* JADX INFO: Access modifiers changed from: protected */
    public BaseSinkStreamOp(String str, Params params) {
        super(params);
        getParams().set((ParamInfo<ParamInfo<IOType>>) HasIoType.IO_TYPE, (ParamInfo<IOType>) IO_TYPE).set((ParamInfo<ParamInfo<String>>) HasIoName.IO_NAME, (ParamInfo<String>) str);
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // com.alibaba.alink.operator.stream.StreamOperator
    public T linkFrom(StreamOperator<?>... streamOperatorArr) {
        return sinkFrom(((MTableSerializeStreamOp) ((VectorSerializeStreamOp) checkAndGetFirst(streamOperatorArr).link((StreamOperator) new VectorSerializeStreamOp().setMLEnvironmentId(getMLEnvironmentId()))).link((StreamOperator) new MTableSerializeStreamOp().setMLEnvironmentId(getMLEnvironmentId()))).link((StreamOperator) new TensorSerializeStreamOp().setMLEnvironmentId(getMLEnvironmentId())));
    }

    protected abstract T sinkFrom(StreamOperator<?> streamOperator);

    @Override // com.alibaba.alink.operator.AlgoOperator
    public final Table getOutputTable() {
        throw new AkIllegalOperationException("Sink Operator has no output data.");
    }

    @Override // com.alibaba.alink.operator.stream.StreamOperator
    public final StreamOperator<?> getSideOutput(int i) {
        throw new AkIllegalOperationException("Sink Operator has no side-output data.");
    }

    @Override // com.alibaba.alink.operator.stream.StreamOperator
    public final int getSideOutputCount() {
        return 0;
    }

    public DataStream<Row> toRetractStream(Table table) {
        return MLEnvironmentFactory.get(getMLEnvironmentId()).getStreamTableEnvironment().toRetractStream(table, Row.class).flatMap(new FlatMapFunction<Tuple2<Boolean, Row>, Row>() { // from class: com.alibaba.alink.operator.stream.sink.BaseSinkStreamOp.1
            private static final long serialVersionUID = -335704052194502150L;

            public void flatMap(Tuple2<Boolean, Row> tuple2, Collector<Row> collector) {
                if (((Boolean) tuple2.f0).booleanValue()) {
                    collector.collect(tuple2.f1);
                }
            }

            public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
                flatMap((Tuple2<Boolean, Row>) obj, (Collector<Row>) collector);
            }
        });
    }

    public Boolean isAppendStream(Table table) {
        try {
            MLEnvironmentFactory.get(getMLEnvironmentId()).getStreamTableEnvironment().toAppendStream(table, Row.class);
            return true;
        } catch (TableException e) {
            return false;
        }
    }

    @Override // com.alibaba.alink.operator.stream.StreamOperator
    public /* bridge */ /* synthetic */ StreamOperator linkFrom(StreamOperator[] streamOperatorArr) {
        return linkFrom((StreamOperator<?>[]) streamOperatorArr);
    }
}
