package com.alibaba.alink.common.mapper;

import com.alibaba.alink.common.exceptions.AkUnclassifiedErrorException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExecutorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/alink/common/mapper/FlatMapperMTWrapper.class */
public final class FlatMapperMTWrapper extends RichFlatMapFunction<Row, Row> {
    private static final long serialVersionUID = -1568535043779354034L;
    private static final Logger LOG = LoggerFactory.getLogger(FlatMapperMTWrapper.class);
    private static final int DEFAULT_BUFFER_SIZE = 32;
    private final int numThreads;
    private final int bufferSize;
    private final SupplierWithException<FlatMapper> supplier;
    private transient List<BlockingQueue<Row>> inputQueues;
    private transient List<BlockingQueue<Row>> outputQueues;
    private transient ExecutorService executorService;
    private transient long inputBlockId;
    private transient Collector<Row> collector;
    private transient AtomicReference<Throwable> threadException;
    private transient List<Row> buffer;

    /* loaded from: input_file:com/alibaba/alink/common/mapper/FlatMapperMTWrapper$NeedContext.class */
    public interface NeedContext {
        void setContext(RuntimeContext runtimeContext);
    }

    /* loaded from: input_file:com/alibaba/alink/common/mapper/FlatMapperMTWrapper$SupplierWithException.class */
    public interface SupplierWithException<T> extends Serializable {
        T create() throws Exception;
    }

    public FlatMapperMTWrapper(int i, SupplierWithException<FlatMapper> supplierWithException) {
        this(i, DEFAULT_BUFFER_SIZE, supplierWithException);
    }

    public FlatMapperMTWrapper(int i, int i2, SupplierWithException<FlatMapper> supplierWithException) {
        this.inputBlockId = 0L;
        this.numThreads = i;
        this.bufferSize = i2;
        this.supplier = supplierWithException;
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        this.threadException = new AtomicReference<>();
        this.inputQueues = new ArrayList(this.numThreads);
        this.outputQueues = new ArrayList(this.numThreads);
        this.buffer = new ArrayList(this.bufferSize);
        for (int i = 0; i < this.numThreads; i++) {
            this.inputQueues.add(new LinkedBlockingQueue(this.bufferSize));
            this.outputQueues.add(new LinkedBlockingQueue(this.bufferSize));
        }
        this.executorService = new ThreadPoolExecutor(this.numThreads, this.numThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(this.numThreads), new BasicThreadFactory.Builder().namingPattern("mapper-mt-wrapper-%d").daemon(true).build(), new ThreadPoolExecutor.AbortPolicy());
        for (int i2 = 0; i2 < this.numThreads; i2++) {
            int i3 = i2;
            FlatMapper create = this.supplier.create();
            if (create instanceof NeedContext) {
                ((NeedContext) create).setContext(getRuntimeContext());
            }
            CompletableFuture.supplyAsync(() -> {
                BlockingQueue blockingQueue = this.inputQueues.get(i3);
                final BlockingQueue<Row> blockingQueue2 = this.outputQueues.get(i3);
                ArrayList<Row> arrayList = new ArrayList(this.bufferSize);
                boolean z = false;
                while (!z) {
                    arrayList.clear();
                    blockingQueue.drainTo(arrayList);
                    for (Row row : arrayList) {
                        try {
                            if (row.getArity() == 0) {
                                z = true;
                                blockingQueue2.put(new Row(0));
                            } else {
                                create.flatMap(row, new Collector<Row>() { // from class: com.alibaba.alink.common.mapper.FlatMapperMTWrapper.1
                                    public void collect(Row row2) {
                                        try {
                                            blockingQueue2.put(row2);
                                        } catch (InterruptedException e) {
                                            throw new AkUnclassifiedErrorException("Error. ", e);
                                        }
                                    }

                                    public void close() {
                                    }
                                });
                            }
                        } catch (Exception e) {
                            throw new AkUnclassifiedErrorException(e.getMessage(), e);
                        }
                    }
                }
                return null;
            }, this.executorService).exceptionally(th -> {
                this.threadException.compareAndSet(null, th);
                return th;
            });
        }
    }

    public void flatMap(Row row, Collector<Row> collector) throws Exception {
        boolean offer;
        this.collector = collector;
        int i = (int) (this.inputBlockId % this.numThreads);
        do {
            collect(this.outputQueues.get(i), collector);
            if (!this.threadException.compareAndSet(null, null)) {
                throw new AkUnclassifiedErrorException(this.threadException.get().getMessage());
            }
            offer = this.inputQueues.get(i).offer(Row.copy(row));
            if (offer) {
                this.inputBlockId++;
            } else {
                i = (i + 1) % this.numThreads;
            }
            if (!this.threadException.compareAndSet(null, null)) {
                throw new AkUnclassifiedErrorException(this.threadException.get().getMessage());
            }
        } while (!offer);
    }

    public void close() throws Exception {
        super.close();
        try {
            if (!this.threadException.compareAndSet(null, null)) {
                throw new AkUnclassifiedErrorException(this.threadException.get().getMessage());
            }
            boolean[] zArr = new boolean[this.numThreads];
            boolean[] zArr2 = new boolean[this.numThreads];
            Arrays.fill(zArr, false);
            Arrays.fill(zArr2, false);
            int i = 0;
            while (this.threadException.compareAndSet(null, null) && i < this.numThreads) {
                for (int i2 = 0; i2 < this.numThreads; i2++) {
                    if (!zArr2[i2]) {
                        if (collect(this.outputQueues.get(i2), this.collector)) {
                            i++;
                            zArr2[i2] = true;
                        }
                        if (!this.threadException.compareAndSet(null, null)) {
                            break;
                        } else if (!zArr[i2]) {
                            zArr[i2] = this.inputQueues.get(i2).offer(new Row(0));
                        }
                    }
                }
            }
            if (!this.threadException.compareAndSet(null, null)) {
                throw new AkUnclassifiedErrorException(this.threadException.get().getMessage());
            }
            if (this.executorService != null) {
                ExecutorUtils.gracefulShutdown(5L, TimeUnit.SECONDS, new ExecutorService[]{this.executorService});
            }
        } catch (Throwable th) {
            if (this.executorService != null) {
                ExecutorUtils.gracefulShutdown(5L, TimeUnit.SECONDS, new ExecutorService[]{this.executorService});
            }
            throw th;
        }
    }

    private boolean collect(BlockingQueue<Row> blockingQueue, Collector<Row> collector) {
        boolean z = false;
        this.buffer.clear();
        long drainTo = blockingQueue.drainTo(this.buffer, this.bufferSize);
        for (int i = 0; i < drainTo; i++) {
            Row row = this.buffer.get(i);
            if (row.getArity() > 0) {
                collector.collect(row);
            } else {
                z = true;
            }
        }
        return z;
    }

    public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
        flatMap((Row) obj, (Collector<Row>) collector);
    }
}
