package com.alibaba.alink.operator.stream.sink;

import com.alibaba.alink.common.annotation.InputPorts;
import com.alibaba.alink.common.annotation.NameCn;
import com.alibaba.alink.common.annotation.NameEn;
import com.alibaba.alink.common.annotation.PortSpec;
import com.alibaba.alink.common.annotation.PortType;
import com.alibaba.alink.common.exceptions.AkUnclassifiedErrorException;
import com.alibaba.alink.common.io.annotations.AnnotationUtils;
import com.alibaba.alink.common.io.annotations.IOType;
import com.alibaba.alink.common.io.annotations.IoOpAnnotation;
import com.alibaba.alink.common.io.filesystem.FilePath;
import com.alibaba.alink.common.utils.TableUtil;
import com.alibaba.alink.operator.common.io.dummy.DummyOutputFormat;
import com.alibaba.alink.operator.common.modelstream.FileModelStreamSink;
import com.alibaba.alink.operator.common.modelstream.ModelStreamUtils;
import com.alibaba.alink.operator.stream.StreamOperator;
import com.alibaba.alink.params.io.ModelStreamFileSinkParams;
import java.io.IOException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

@InputPorts(values = {@PortSpec(PortType.MODEL_STREAM)})
@IoOpAnnotation(name = "modelstream_file", ioType = IOType.SinkStream)
@NameCn("模型流导出")
@NameEn("Model Stream File Sink")
/* loaded from: input_file:com/alibaba/alink/operator/stream/sink/ModelStreamFileSinkStreamOp.class */
public final class ModelStreamFileSinkStreamOp extends BaseSinkStreamOp<ModelStreamFileSinkStreamOp> implements ModelStreamFileSinkParams<ModelStreamFileSinkStreamOp> {
    private static final long serialVersionUID = 4650091128460845189L;

    public ModelStreamFileSinkStreamOp() {
        super(AnnotationUtils.annotatedName(ModelStreamFileSinkStreamOp.class), new Params());
    }

    public ModelStreamFileSinkStreamOp(Params params) {
        super(AnnotationUtils.annotatedName(ModelStreamFileSinkStreamOp.class), params);
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // com.alibaba.alink.operator.stream.sink.BaseSinkStreamOp
    public ModelStreamFileSinkStreamOp sinkFrom(StreamOperator<?> streamOperator) {
        TableSchema schema = streamOperator.getSchema();
        final int findTimestampColIndexWithAssertAndHint = ModelStreamUtils.findTimestampColIndexWithAssertAndHint(schema);
        final int findCountColIndexWithAssertAndHint = ModelStreamUtils.findCountColIndexWithAssertAndHint(schema);
        final String schema2SchemaStr = TableUtil.schema2SchemaStr(new TableSchema((String[]) ArrayUtils.removeAll(schema.getFieldNames(), new int[]{findTimestampColIndexWithAssertAndHint, findCountColIndexWithAssertAndHint}), (TypeInformation[]) ArrayUtils.removeAll(schema.getFieldTypes(), new int[]{findTimestampColIndexWithAssertAndHint, findCountColIndexWithAssertAndHint})));
        final FilePath filePath = getFilePath();
        final int numKeepModel = getNumKeepModel();
        try {
            new FileModelStreamSink(filePath, schema2SchemaStr).initializeGlobal();
            DataStream<Row> dataStream = streamOperator.getDataStream();
            SingleOutputStreamOperator flatMap = dataStream.map(new RichMapFunction<Row, Tuple4<Timestamp, Integer, Long, Long>>() { // from class: com.alibaba.alink.operator.stream.sink.ModelStreamFileSinkStreamOp.3
                public Tuple4<Timestamp, Integer, Long, Long> map(Row row) {
                    return Tuple4.of((Timestamp) row.getField(findTimestampColIndexWithAssertAndHint), Integer.valueOf(getRuntimeContext().getIndexOfThisSubtask()), (Long) row.getField(findCountColIndexWithAssertAndHint), 1L);
                }
            }).keyBy(new int[]{0, 1, 2}).reduce(new ReduceFunction<Tuple4<Timestamp, Integer, Long, Long>>() { // from class: com.alibaba.alink.operator.stream.sink.ModelStreamFileSinkStreamOp.2
                public Tuple4<Timestamp, Integer, Long, Long> reduce(Tuple4<Timestamp, Integer, Long, Long> tuple4, Tuple4<Timestamp, Integer, Long, Long> tuple42) {
                    return Tuple4.of(tuple4.f0, tuple4.f1, tuple4.f2, Long.valueOf(((Long) tuple4.f3).longValue() + ((Long) tuple42.f3).longValue()));
                }
            }).keyBy(new int[]{0, 2}).flatMap(new RichFlatMapFunction<Tuple4<Timestamp, Integer, Long, Long>, Tuple4<Timestamp, Integer, Long, Long>>() { // from class: com.alibaba.alink.operator.stream.sink.ModelStreamFileSinkStreamOp.1
                private transient MapState<Integer, Tuple4<Timestamp, Integer, Long, Long>> latest;

                public void open(Configuration configuration) throws Exception {
                    super.open(configuration);
                    this.latest = getRuntimeContext().getMapState(new MapStateDescriptor("latest", Types.INT, new TupleTypeInfo(new TypeInformation[]{Types.SQL_TIMESTAMP, Types.INT, Types.LONG, Types.LONG})));
                }

                public void flatMap(Tuple4<Timestamp, Integer, Long, Long> tuple4, Collector<Tuple4<Timestamp, Integer, Long, Long>> collector) throws Exception {
                    this.latest.put(tuple4.f1, tuple4);
                    long j = 0;
                    long j2 = -1;
                    for (Map.Entry entry : this.latest.entries()) {
                        j2 = ((Long) ((Tuple4) entry.getValue()).f2).longValue();
                        j += ((Long) ((Tuple4) entry.getValue()).f3).longValue();
                    }
                    if (j2 == j) {
                        Iterator it = this.latest.entries().iterator();
                        while (it.hasNext()) {
                            collector.collect(((Map.Entry) it.next()).getValue());
                        }
                    }
                }

                public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
                    flatMap((Tuple4<Timestamp, Integer, Long, Long>) obj, (Collector<Tuple4<Timestamp, Integer, Long, Long>>) collector);
                }
            });
            dataStream.map(new RichMapFunction<Row, Tuple3<Timestamp, Integer, Row>>() { // from class: com.alibaba.alink.operator.stream.sink.ModelStreamFileSinkStreamOp.6
                public Tuple3<Timestamp, Integer, Row> map(Row row) {
                    return Tuple3.of((Timestamp) row.getField(findTimestampColIndexWithAssertAndHint), Integer.valueOf(getRuntimeContext().getIndexOfThisSubtask()), ModelStreamUtils.genRowWithoutIdentifier(row, findTimestampColIndexWithAssertAndHint, findCountColIndexWithAssertAndHint));
                }
            }).keyBy(new int[]{0, 1}).connect(flatMap.keyBy(new int[]{0, 1})).flatMap(new RichCoFlatMapFunction<Tuple3<Timestamp, Integer, Row>, Tuple4<Timestamp, Integer, Long, Long>, Tuple1<Timestamp>>() { // from class: com.alibaba.alink.operator.stream.sink.ModelStreamFileSinkStreamOp.5
                private final Map<Tuple2<Timestamp, Integer>, Tuple3<FileModelStreamSink, Long, Long>> writerContainer = new HashMap();

                public void open(Configuration configuration) throws Exception {
                    super.open(configuration);
                }

                public void flatMap1(Tuple3<Timestamp, Integer, Row> tuple3, Collector<Tuple1<Timestamp>> collector) {
                    Map<Tuple2<Timestamp, Integer>, Tuple3<FileModelStreamSink, Long, Long>> map = this.writerContainer;
                    Tuple2<Timestamp, Integer> of = Tuple2.of(tuple3.f0, tuple3.f1);
                    FilePath filePath2 = filePath;
                    String str = schema2SchemaStr;
                    map.compute(of, (tuple2, tuple32) -> {
                        if (tuple32 == null) {
                            FileModelStreamSink fileModelStreamSink = new FileModelStreamSink(filePath2, str);
                            try {
                                fileModelStreamSink.open((Timestamp) tuple3.f0, ((Integer) tuple3.f1).intValue());
                                fileModelStreamSink.collect((Row) tuple3.f2);
                                return Tuple3.of(fileModelStreamSink, 1L, (Object) null);
                            } catch (IOException e) {
                                throw new AkUnclassifiedErrorException("Error. ", e);
                            }
                        }
                        if (tuple32.f0 != null) {
                            ((FileModelStreamSink) tuple32.f0).collect((Row) tuple3.f2);
                            tuple32.f1 = Long.valueOf(((Long) tuple32.f1).longValue() + 1);
                            if (tuple32.f2 == null || !((Long) tuple32.f2).equals(tuple32.f1)) {
                                return Tuple3.of(tuple32.f0, tuple32.f1, tuple32.f2);
                            }
                            ((FileModelStreamSink) tuple32.f0).close();
                            collector.collect(Tuple1.of(tuple2.f0));
                            return null;
                        }
                        FileModelStreamSink fileModelStreamSink2 = new FileModelStreamSink(filePath2, str);
                        try {
                            fileModelStreamSink2.open((Timestamp) tuple3.f0, ((Integer) tuple3.f1).intValue());
                            fileModelStreamSink2.collect((Row) tuple3.f2);
                            if (tuple32.f2 == null || !((Long) tuple32.f2).equals(1L)) {
                                return Tuple3.of(fileModelStreamSink2, 1L, (Object) null);
                            }
                            fileModelStreamSink2.close();
                            collector.collect(Tuple1.of(tuple2.f0));
                            return null;
                        } catch (IOException e2) {
                            throw new AkUnclassifiedErrorException("Error. ", e2);
                        }
                    });
                }

                public void flatMap2(Tuple4<Timestamp, Integer, Long, Long> tuple4, Collector<Tuple1<Timestamp>> collector) {
                    this.writerContainer.compute(Tuple2.of(tuple4.f0, tuple4.f1), (tuple2, tuple3) -> {
                        if (tuple3 == null) {
                            return Tuple3.of((Object) null, (Object) null, tuple4.f3);
                        }
                        if (!((Long) tuple4.f3).equals(tuple3.f1)) {
                            return Tuple3.of(tuple3.f0, tuple3.f1, tuple4.f3);
                        }
                        ((FileModelStreamSink) tuple3.f0).close();
                        collector.collect(Tuple1.of(tuple2.f0));
                        return null;
                    });
                }

                public /* bridge */ /* synthetic */ void flatMap2(Object obj, Collector collector) throws Exception {
                    flatMap2((Tuple4<Timestamp, Integer, Long, Long>) obj, (Collector<Tuple1<Timestamp>>) collector);
                }

                public /* bridge */ /* synthetic */ void flatMap1(Object obj, Collector collector) throws Exception {
                    flatMap1((Tuple3<Timestamp, Integer, Row>) obj, (Collector<Tuple1<Timestamp>>) collector);
                }
            }).keyBy(new int[]{0}).connect(flatMap.keyBy(new int[]{0}).flatMap(new RichFlatMapFunction<Tuple4<Timestamp, Integer, Long, Long>, Tuple4<Timestamp, Integer, Integer, Long>>() { // from class: com.alibaba.alink.operator.stream.sink.ModelStreamFileSinkStreamOp.7
                private transient ListState<Tuple2<Integer, Long>> filesCounter;

                public void open(Configuration configuration) throws Exception {
                    super.open(configuration);
                    this.filesCounter = getRuntimeContext().getListState(new ListStateDescriptor("filesCounter", new TupleTypeInfo(new TypeInformation[]{Types.INT, Types.LONG})));
                }

                public void flatMap(Tuple4<Timestamp, Integer, Long, Long> tuple4, Collector<Tuple4<Timestamp, Integer, Integer, Long>> collector) throws Exception {
                    Long l = (Long) tuple4.f3;
                    Integer num = 1;
                    ArrayList arrayList = new ArrayList();
                    arrayList.add(Tuple2.of(tuple4.f1, tuple4.f3));
                    for (Tuple2 tuple2 : (Iterable) this.filesCounter.get()) {
                        l = Long.valueOf(l.longValue() + ((Long) tuple2.f1).longValue());
                        num = Integer.valueOf(num.intValue() + 1);
                        arrayList.add(tuple2);
                    }
                    if (((Long) tuple4.f2).equals(l)) {
                        Iterator it = arrayList.iterator();
                        while (it.hasNext()) {
                            collector.collect(Tuple4.of(tuple4.f0, ((Tuple2) it.next()).f0, num, tuple4.f2));
                        }
                    }
                    this.filesCounter.add(Tuple2.of(tuple4.f1, tuple4.f3));
                }

                public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
                    flatMap((Tuple4<Timestamp, Integer, Long, Long>) obj, (Collector<Tuple4<Timestamp, Integer, Integer, Long>>) collector);
                }
            }).keyBy(new int[]{0})).flatMap(new RichCoFlatMapFunction<Tuple1<Timestamp>, Tuple4<Timestamp, Integer, Integer, Long>, byte[]>() { // from class: com.alibaba.alink.operator.stream.sink.ModelStreamFileSinkStreamOp.4
                private transient ValueState<Integer> filesCounter;
                private transient ListState<Tuple3<Integer, Integer, Long>> total;

                public void open(Configuration configuration) throws Exception {
                    super.open(configuration);
                    this.filesCounter = getRuntimeContext().getState(new ValueStateDescriptor("filesCounter", Types.INT));
                    this.total = getRuntimeContext().getListState(new ListStateDescriptor("total", new TupleTypeInfo(new TypeInformation[]{Types.INT, Types.LONG})));
                }

                public void flatMap1(Tuple1<Timestamp> tuple1, Collector<byte[]> collector) throws Exception {
                    Integer num = (Integer) this.filesCounter.value();
                    Integer valueOf = num == null ? 1 : Integer.valueOf(num.intValue() + 1);
                    ArrayList arrayList = new ArrayList();
                    Iterator it = ((Iterable) this.total.get()).iterator();
                    while (it.hasNext()) {
                        arrayList.add((Tuple3) it.next());
                    }
                    if (!arrayList.isEmpty() && ((Integer) ((Tuple3) arrayList.get(0)).f1).equals(Integer.valueOf(arrayList.size())) && ((Integer) ((Tuple3) arrayList.get(0)).f1).equals(valueOf)) {
                        ArrayList arrayList2 = new ArrayList();
                        Iterator it2 = arrayList.iterator();
                        while (it2.hasNext()) {
                            arrayList2.add(((Tuple3) it2.next()).f0);
                        }
                        new FileModelStreamSink(filePath, schema2SchemaStr).finalizeGlobal((Timestamp) tuple1.f0, ((Long) ((Tuple3) arrayList.get(0)).f2).longValue(), arrayList2, numKeepModel);
                    }
                    this.filesCounter.update(valueOf);
                }

                public void flatMap2(Tuple4<Timestamp, Integer, Integer, Long> tuple4, Collector<byte[]> collector) throws Exception {
                    ArrayList arrayList = new ArrayList();
                    arrayList.add(Tuple3.of(tuple4.f1, tuple4.f2, tuple4.f3));
                    Iterator it = ((Iterable) this.total.get()).iterator();
                    while (it.hasNext()) {
                        arrayList.add((Tuple3) it.next());
                    }
                    if (((Integer) ((Tuple3) arrayList.get(0)).f1).equals(Integer.valueOf(arrayList.size())) && ((Integer) ((Tuple3) arrayList.get(0)).f1).equals(this.filesCounter.value())) {
                        ArrayList arrayList2 = new ArrayList();
                        Iterator it2 = arrayList.iterator();
                        while (it2.hasNext()) {
                            arrayList2.add(((Tuple3) it2.next()).f0);
                        }
                        new FileModelStreamSink(filePath, schema2SchemaStr).finalizeGlobal((Timestamp) tuple4.f0, ((Long) ((Tuple3) arrayList.get(0)).f2).longValue(), arrayList2, numKeepModel);
                    }
                    this.total.add(Tuple3.of(tuple4.f1, tuple4.f2, tuple4.f3));
                }

                public /* bridge */ /* synthetic */ void flatMap2(Object obj, Collector collector) throws Exception {
                    flatMap2((Tuple4<Timestamp, Integer, Integer, Long>) obj, (Collector<byte[]>) collector);
                }

                public /* bridge */ /* synthetic */ void flatMap1(Object obj, Collector collector) throws Exception {
                    flatMap1((Tuple1<Timestamp>) obj, (Collector<byte[]>) collector);
                }
            }).writeUsingOutputFormat(new DummyOutputFormat());
            return this;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    @Override // com.alibaba.alink.operator.stream.sink.BaseSinkStreamOp
    public /* bridge */ /* synthetic */ ModelStreamFileSinkStreamOp sinkFrom(StreamOperator streamOperator) {
        return sinkFrom((StreamOperator<?>) streamOperator);
    }
}
