package com.alibaba.alink.operator.local.source;

import com.alibaba.alink.common.MTable;
import com.alibaba.alink.common.annotation.InputPorts;
import com.alibaba.alink.common.annotation.OutputPorts;
import com.alibaba.alink.common.annotation.PortSpec;
import com.alibaba.alink.common.annotation.PortType;
import com.alibaba.alink.common.exceptions.AkIllegalDataException;
import com.alibaba.alink.common.exceptions.AkUnsupportedOperationException;
import com.alibaba.alink.operator.local.AlinkLocalSession;
import com.alibaba.alink.operator.local.LocalOperator;
import com.alibaba.alink.operator.local.source.BaseSourceLocalOp;
import com.alibaba.alink.params.shared.HasNumThreads;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.types.Row;

@InputPorts
@OutputPorts(values = {@PortSpec(PortType.ANY)})
/* loaded from: input_file:com/alibaba/alink/operator/local/source/BaseSourceLocalOp.class */
public abstract class BaseSourceLocalOp<T extends BaseSourceLocalOp<T>> extends LocalOperator<T> implements HasNumThreads<T> {
    /* JADX INFO: Access modifiers changed from: protected */
    public BaseSourceLocalOp(Params params) {
        super(params);
    }

    @Override // com.alibaba.alink.operator.local.LocalOperator
    public final T linkFrom(LocalOperator<?>... localOperatorArr) {
        throw new AkUnsupportedOperationException("Source operator does not support linkFrom()");
    }

    @Override // com.alibaba.alink.operator.local.LocalOperator
    public MTable getOutputTable() {
        if (isNullOutputTable()) {
            super.setOutputTable(initializeDataSource());
        }
        return super.getOutputTable();
    }

    protected abstract MTable initializeDataSource();

    public static <T extends InputSplit> List<Row> createInput(InputFormat<Row, T> inputFormat, Params params) {
        if (inputFormat == null) {
            throw new IllegalArgumentException("InputFormat must not be null.");
        }
        return createInput(inputFormat, TypeExtractor.getInputFormatTypes(inputFormat), params);
    }

    public static <T extends InputSplit> List<Row> createInput(InputFormat<Row, T> inputFormat, TypeInformation<Row> typeInformation, Params params) {
        if (inputFormat == null) {
            throw new IllegalArgumentException("InputFormat must not be null.");
        }
        if (typeInformation == null) {
            throw new IllegalArgumentException("Produced type information must not be null.");
        }
        int defaultNumThreads = LocalOperator.getDefaultNumThreads();
        if (params.contains(HasNumThreads.NUM_THREADS)) {
            defaultNumThreads = ((Integer) params.get(HasNumThreads.NUM_THREADS)).intValue();
        }
        int totalFields = typeInformation.getTotalFields();
        AlinkLocalSession.IOTaskRunner iOTaskRunner = new AlinkLocalSession.IOTaskRunner();
        List[] listArr = new List[defaultNumThreads];
        try {
            InputSplit[] createInputSplits = inputFormat.createInputSplits(defaultNumThreads);
            byte[] serialize = SerializationUtils.serialize(inputFormat);
            for (int i = 0; i < defaultNumThreads; i++) {
                int startPos = (int) AlinkLocalSession.DISTRIBUTOR.startPos(i, defaultNumThreads, createInputSplits.length);
                int localRowCnt = (int) AlinkLocalSession.DISTRIBUTOR.localRowCnt(i, defaultNumThreads, createInputSplits.length);
                int i2 = i;
                if (localRowCnt > 0) {
                    iOTaskRunner.submit(() -> {
                        ArrayList arrayList = new ArrayList();
                        InputFormat inputFormat2 = (InputFormat) SerializationUtils.deserialize(serialize);
                        for (int i3 = startPos; i3 < startPos + localRowCnt; i3++) {
                            try {
                                try {
                                    if (inputFormat2 instanceof RichInputFormat) {
                                        ((RichInputFormat) inputFormat2).openInputFormat();
                                    }
                                } catch (IOException e) {
                                    e.printStackTrace();
                                    if (inputFormat2 instanceof RichInputFormat) {
                                        try {
                                            ((RichInputFormat) inputFormat2).closeInputFormat();
                                        } catch (IOException e2) {
                                            e2.printStackTrace();
                                        }
                                    }
                                }
                                try {
                                    try {
                                        inputFormat2.open(createInputSplits[i3]);
                                        while (!inputFormat2.reachedEnd()) {
                                            Row row = (Row) inputFormat2.nextRecord(new Row(totalFields));
                                            if (row != null) {
                                                arrayList.add(row);
                                            }
                                        }
                                        try {
                                            inputFormat2.close();
                                        } catch (Exception e3) {
                                            e3.printStackTrace();
                                        }
                                    } catch (IOException e4) {
                                        e4.printStackTrace();
                                        try {
                                            inputFormat2.close();
                                        } catch (Exception e5) {
                                            e5.printStackTrace();
                                        }
                                    }
                                    if (inputFormat2 instanceof RichInputFormat) {
                                        try {
                                            ((RichInputFormat) inputFormat2).closeInputFormat();
                                        } catch (IOException e6) {
                                            e6.printStackTrace();
                                        }
                                    }
                                } finally {
                                    break;
                                }
                            } catch (Throwable th) {
                                if (inputFormat2 instanceof RichInputFormat) {
                                    try {
                                        ((RichInputFormat) inputFormat2).closeInputFormat();
                                    } catch (IOException e7) {
                                        e7.printStackTrace();
                                    }
                                }
                                throw th;
                            }
                        }
                        listArr[i2] = arrayList;
                    });
                }
            }
            iOTaskRunner.join();
            ArrayList arrayList = new ArrayList();
            for (int i3 = 0; i3 < defaultNumThreads; i3++) {
                if (listArr[i3] != null) {
                    arrayList.addAll(listArr[i3]);
                }
            }
            return arrayList;
        } catch (IOException e) {
            throw new AkIllegalDataException("Error in data input.", e);
        }
    }

    @Override // com.alibaba.alink.operator.local.LocalOperator
    public /* bridge */ /* synthetic */ LocalOperator linkFrom(LocalOperator[] localOperatorArr) {
        return linkFrom((LocalOperator<?>[]) localOperatorArr);
    }
}
