package com.alibaba.alink.operator.stream.utils;

import com.alibaba.alink.common.annotation.DescCn;
import com.alibaba.alink.common.annotation.InputPorts;
import com.alibaba.alink.common.annotation.NameCn;
import com.alibaba.alink.common.annotation.NameEn;
import com.alibaba.alink.common.annotation.OutputPorts;
import com.alibaba.alink.common.annotation.PortSpec;
import com.alibaba.alink.common.annotation.PortType;
import com.alibaba.alink.common.exceptions.AkUnclassifiedErrorException;
import com.alibaba.alink.common.io.annotations.IOType;
import com.alibaba.alink.common.io.annotations.IoOpAnnotation;
import com.alibaba.alink.common.utils.TableUtil;
import com.alibaba.alink.operator.stream.StreamOperator;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.ml.api.misc.param.ParamInfo;
import org.apache.flink.ml.api.misc.param.ParamInfoFactory;
import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

@InputPorts(values = {@PortSpec(PortType.ANY)})
@OutputPorts
@IoOpAnnotation(name = "print", ioType = IOType.SinkStream)
@NameCn("流式数据打印")
@NameEn("Print")
/* loaded from: input_file:com/alibaba/alink/operator/stream/utils/PrintStreamOp.class */
public class PrintStreamOp extends StreamOperator<PrintStreamOp> {

    @DescCn("输出的刷新间隔")
    @NameCn("刷新间隔")
    public static final ParamInfo<Integer> REFRESH_INTERVAL = ParamInfoFactory.createParamInfo("refreshInterval", Integer.class).setDescription("refresh interval").setHasDefaultValue(-1).build();

    @DescCn("每个窗口内的最大输出条数")
    @NameCn("每个窗口内的最大输出条数")
    public static final ParamInfo<Integer> MAX_LIMIT = ParamInfoFactory.createParamInfo("maxLimit", Integer.class).setDescription("max limit").setHasDefaultValue(100).build();
    private static final long serialVersionUID = -7482957550550215050L;

    /* loaded from: input_file:com/alibaba/alink/operator/stream/utils/PrintStreamOp$StreamPrintListRowSinkFunction.class */
    public static class StreamPrintListRowSinkFunction extends RichSinkFunction<List<Row>> {
        private static final long serialVersionUID = 1;
        private transient PrintStream stream;

        public void open(Configuration configuration) throws Exception {
            super.open(configuration);
            this.stream = System.err;
        }

        public void invoke(List<Row> list) {
            Iterator<Row> it = list.iterator();
            while (it.hasNext()) {
                this.stream.println(TableUtil.formatRows(it.next()));
            }
        }

        public void close() {
            this.stream = null;
        }

        public String toString() {
            return "Print to " + this.stream.toString();
        }
    }

    /* loaded from: input_file:com/alibaba/alink/operator/stream/utils/PrintStreamOp$StreamPrintSinkFunction.class */
    public static class StreamPrintSinkFunction extends RichSinkFunction<Row> {
        private static final long serialVersionUID = 1;
        private transient PrintStream stream;

        public void open(Configuration configuration) throws Exception {
            super.open(configuration);
            this.stream = System.err;
        }

        public void invoke(Row row) {
            this.stream.println(TableUtil.formatRows(row));
        }

        public void close() {
            this.stream = null;
        }

        public String toString() {
            return "Print to " + this.stream.toString();
        }
    }

    public PrintStreamOp() {
        this(null);
    }

    public PrintStreamOp(Params params) {
        super(params);
    }

    public static void setStreamPrintStream(PrintStream printStream) {
        System.setErr(printStream);
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // com.alibaba.alink.operator.stream.StreamOperator
    public PrintStreamOp linkFrom(StreamOperator<?>... streamOperatorArr) {
        StreamOperator<?> checkAndGetFirst = checkAndGetFirst(streamOperatorArr);
        try {
            System.err.println(TableUtil.formatTitle(checkAndGetFirst.getColNames()));
            int intValue = ((Integer) getParams().get(REFRESH_INTERVAL)).intValue();
            if (intValue <= 0) {
                DataStreamConversionUtil.fromTable(getMLEnvironmentId(), checkAndGetFirst.getOutputTable()).addSink(new StreamPrintSinkFunction());
            } else {
                final int intValue2 = ((Integer) getParams().get(MAX_LIMIT)).intValue();
                DataStreamConversionUtil.fromTable(getMLEnvironmentId(), checkAndGetFirst.getOutputTable()).timeWindowAll(Time.of(intValue, TimeUnit.SECONDS)).apply(new AllWindowFunction<Row, List<Row>, TimeWindow>() { // from class: com.alibaba.alink.operator.stream.utils.PrintStreamOp.1
                    private static final long serialVersionUID = -5002192700679782400L;

                    public void apply(TimeWindow timeWindow, Iterable<Row> iterable, Collector<List<Row>> collector) {
                        ArrayList arrayList = new ArrayList();
                        for (Row row : iterable) {
                            if (arrayList.size() >= intValue2) {
                                break;
                            } else {
                                arrayList.add(row);
                            }
                        }
                        collector.collect(arrayList);
                    }

                    public /* bridge */ /* synthetic */ void apply(Window window, Iterable iterable, Collector collector) throws Exception {
                        apply((TimeWindow) window, (Iterable<Row>) iterable, (Collector<List<Row>>) collector);
                    }
                }).addSink(new StreamPrintListRowSinkFunction());
            }
            setOutputTable(checkAndGetFirst.getOutputTable());
            return this;
        } catch (Exception e) {
            throw new AkUnclassifiedErrorException(e.getMessage(), e);
        }
    }

    @Override // com.alibaba.alink.operator.stream.StreamOperator
    public /* bridge */ /* synthetic */ PrintStreamOp linkFrom(StreamOperator[] streamOperatorArr) {
        return linkFrom((StreamOperator<?>[]) streamOperatorArr);
    }
}
