package com.alibaba.alink.common.mapper;

import com.alibaba.alink.common.exceptions.AkUnclassifiedErrorException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExecutorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/alink/common/mapper/MapperMTWrapper.class */
public final class MapperMTWrapper extends RichFlatMapFunction<Row, Row> {
    private static final long serialVersionUID = -1568535043779354034L;
    private static final Logger LOG = LoggerFactory.getLogger(MapperMTWrapper.class);
    private static final int QUEUE_CAPACITY = 32;
    private final int numThreads;
    private final SupplierWithException<FunctionWithException<Row, Row>> supplier;
    private transient List<BlockingQueue<Row>> inputQueues;
    private transient List<BlockingQueue<Row>> outputQueues;
    private transient ExecutorService executorService;
    private transient long numInputRecords = 0;
    private transient long numOutputRecords = 0;
    private transient Collector<Row> collector;
    private transient AtomicReference<Throwable> threadException;

    /* loaded from: input_file:com/alibaba/alink/common/mapper/MapperMTWrapper$FunctionWithException.class */
    public interface FunctionWithException<T, R> {
        R apply(T t) throws Exception;
    }

    /* loaded from: input_file:com/alibaba/alink/common/mapper/MapperMTWrapper$SupplierWithException.class */
    public interface SupplierWithException<T> {
        T create() throws Exception;
    }

    public MapperMTWrapper(int i, SupplierWithException<FunctionWithException<Row, Row>> supplierWithException) {
        this.numThreads = i;
        this.supplier = supplierWithException;
    }

    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        this.threadException = new AtomicReference<>();
        this.inputQueues = new ArrayList(this.numThreads);
        this.outputQueues = new ArrayList(this.numThreads);
        for (int i = 0; i < this.numThreads; i++) {
            this.inputQueues.add(new ArrayBlockingQueue(QUEUE_CAPACITY));
            this.outputQueues.add(new ArrayBlockingQueue(QUEUE_CAPACITY));
        }
        this.executorService = new ThreadPoolExecutor(this.numThreads, this.numThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(this.numThreads), new BasicThreadFactory.Builder().namingPattern("model-mapper-%d").daemon(true).build(), new ThreadPoolExecutor.AbortPolicy());
        this.numInputRecords = 0L;
        this.numOutputRecords = 0L;
        for (int i2 = 0; i2 < this.numThreads; i2++) {
            int i3 = i2;
            FunctionWithException<Row, Row> create = this.supplier.create();
            CompletableFuture.supplyAsync(() -> {
                BlockingQueue blockingQueue = this.inputQueues.get(i3);
                BlockingQueue<Row> blockingQueue2 = this.outputQueues.get(i3);
                boolean z = false;
                while (!z) {
                    ArrayList<Row> arrayList = new ArrayList();
                    blockingQueue.drainTo(arrayList);
                    for (Row row : arrayList) {
                        try {
                            if (row.getArity() == 0) {
                                z = true;
                                blockingQueue2.put(new Row(0));
                            } else {
                                blockingQueue2.put((Row) create.apply(row));
                            }
                        } catch (Exception e) {
                            throw new AkUnclassifiedErrorException("Error. ", e);
                        }
                    }
                }
                return null;
            }, this.executorService).exceptionally(th -> {
                this.threadException.compareAndSet(null, th);
                return th;
            });
        }
    }

    public void flatMap(Row row, Collector<Row> collector) throws Exception {
        boolean offer;
        this.collector = collector;
        int i = (int) (this.numInputRecords % this.numThreads);
        do {
            this.numOutputRecords += drainRead(this.outputQueues.get(i), false, null, collector);
            offer = this.inputQueues.get(i).offer(Row.copy(row));
            if (offer) {
                this.numInputRecords++;
            } else {
                if (!this.threadException.compareAndSet(null, null)) {
                    throw new AkUnclassifiedErrorException(this.threadException.get().getMessage());
                }
                i = (i + 1) % this.numThreads;
                Thread.yield();
            }
        } while (!offer);
    }

    public void close() throws Exception {
        boolean offer;
        super.close();
        for (int i = 0; i < this.numThreads; i++) {
            do {
                this.numOutputRecords += drainRead(this.outputQueues.get(i), false, null, this.collector);
                offer = this.inputQueues.get(i).offer(new Row(0));
                if (!offer) {
                    if (!this.threadException.compareAndSet(null, null)) {
                        throw new AkUnclassifiedErrorException(this.threadException.get().getMessage());
                    }
                    Thread.yield();
                }
            } while (!offer);
        }
        for (int i2 = 0; i2 < this.numThreads; i2++) {
            this.numOutputRecords += drainRead(this.outputQueues.get(i2), true, this.threadException, this.collector);
        }
        if (this.executorService != null) {
            ExecutorUtils.gracefulShutdown(5L, TimeUnit.SECONDS, new ExecutorService[]{this.executorService});
        }
        if (!this.threadException.compareAndSet(null, null)) {
            throw new AkUnclassifiedErrorException(this.threadException.get().getMessage());
        }
    }

    private long drainRead(BlockingQueue<Row> blockingQueue, boolean z, AtomicReference<Throwable> atomicReference, Collector<Row> collector) throws Exception {
        boolean z2;
        long j = 0;
        if (z) {
            boolean z3 = false;
            do {
                Row poll = blockingQueue.poll();
                if (poll == null) {
                    if (!atomicReference.compareAndSet(null, null)) {
                        return j;
                    }
                    Thread.yield();
                } else if (poll.getArity() > 0) {
                    collector.collect(poll);
                    j++;
                } else {
                    z3 = true;
                }
            } while (!z3);
            return j;
        }
        do {
            Row poll2 = blockingQueue.poll();
            z2 = poll2 != null;
            if (z2) {
                collector.collect(poll2);
                j++;
            }
        } while (z2);
        return j;
    }

    public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
        flatMap((Row) obj, (Collector<Row>) collector);
    }
}
