package com.netease.arctic.flink.write.hidden.kafka;

import com.netease.arctic.flink.write.hidden.ArcticLogPartitioner;
import com.netease.arctic.flink.write.hidden.LogMsgFactory;
import com.netease.arctic.log.LogData;
import com.netease.arctic.log.LogDataJsonSerialization;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaErrorCode;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaException;
import org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/netease/arctic/flink/write/hidden/kafka/HiddenKafkaProducer.class */
/**
 * Kafka-backed {@link LogMsgFactory.Producer} that serializes {@link LogData} with a
 * {@link LogDataJsonSerialization} and writes the resulting bytes to a single topic.
 *
 * <p>Two underlying Kafka producers are used:
 * <ul>
 *   <li>a plain producer for {@link #send(LogData)}, which writes one record to the partition
 *       chosen by the {@link ArcticLogPartitioner};</li>
 *   <li>a transactional producer for {@link #sendToAllPartitions(LogData)}, which writes the same
 *       payload to every known partition of the topic inside one Kafka transaction.</li>
 * </ul>
 *
 * <p>Failures reported through the send {@link Callback} are stored in {@link #asyncException}
 * and rethrown by the next call to {@link #checkErroneous()}.
 *
 * @param <T> type of the row carried by the {@link LogData} messages
 */
public class HiddenKafkaProducer<T> implements LogMsgFactory.Producer<T> {
    private static final Logger LOG = LoggerFactory.getLogger(HiddenKafkaProducer.class);

    /** Kafka client configuration shared by both producers. */
    protected final Properties producerConfig;
    private final String topic;
    private final LogDataJsonSerialization<T> logDataJsonSerialization;

    /** Send callback; created in {@link #open()}. */
    @Nullable
    protected transient Callback callback;

    /** First failure reported by the send callback or by {@link #close()}; cleared once rethrown. */
    @Nullable
    protected volatile transient Exception asyncException;
    private transient FlinkKafkaInternalProducer<byte[], byte[]> producer;
    private transient FlinkKafkaInternalProducer<byte[], byte[]> transactionalProducer;
    private final ArcticLogPartitioner<T> arcticLogPartitioner;

    /** Partition ids of {@link #topic} in ascending order; populated in {@link #open()}. */
    private int[] partitions;

    /**
     * @param properties Kafka producer configuration
     * @param str name of the topic to write to
     * @param logDataJsonSerialization serializer turning a {@link LogData} into the record value
     * @param arcticLogPartitioner chooses the target partition for {@link #send(LogData)}
     */
    public HiddenKafkaProducer(Properties properties, String str, LogDataJsonSerialization<T> logDataJsonSerialization, ArcticLogPartitioner<T> arcticLogPartitioner) {
        this.producerConfig = properties;
        this.topic = str;
        this.logDataJsonSerialization = logDataJsonSerialization;
        this.arcticLogPartitioner = arcticLogPartitioner;
    }

    /**
     * Creates the callback and both Kafka producers, initializes transactions on the
     * transactional producer and caches the topic's partition ids.
     *
     * @throws Exception if a producer cannot be created or the topic metadata cannot be fetched
     */
    @Override // com.netease.arctic.flink.write.hidden.LogMsgFactory.Producer
    public void open() throws Exception {
        this.callback = (recordMetadata, exc) -> {
            // Keep only the first failure; later ones are dropped until it has been rethrown.
            if (exc != null && this.asyncException == null) {
                this.asyncException = exc;
            }
            acknowledgeMessage();
        };
        this.producer = createProducer();
        this.transactionalProducer = createTransactionalProducer();
        this.transactionalProducer.initTransactions();
        this.partitions = getPartitionsByTopic(this.topic, this.producer);
        LOG.info("HiddenKafkaPartition topic:{}, partitions:{}.", this.topic, this.partitions);
    }

    /**
     * Serializes {@code logData} and sends it asynchronously, without key or timestamp, to the
     * partition selected by the partitioner.
     *
     * @param logData message to send
     * @throws Exception if an earlier asynchronous send failed, or serialization fails
     */
    @Override // com.netease.arctic.flink.write.hidden.LogMsgFactory.Producer
    public void send(LogData<T> logData) throws Exception {
        checkErroneous();
        final int partition = this.arcticLogPartitioner.partition(logData, this.partitions);
        final byte[] payload = this.logDataJsonSerialization.serialize(logData);
        this.producer.send(
            new ProducerRecord<byte[], byte[]>(this.topic, Integer.valueOf(partition), (Long) null, (byte[]) null, payload),
            this.callback);
    }

    /**
     * Serializes {@code logData} once and writes it to every cached partition of the topic
     * inside a single Kafka transaction.
     *
     * <p>On any failure the transaction is aborted and the failure is rethrown wrapped in a
     * {@link FlinkRuntimeException}. A failure of the abort itself is attached to the original
     * failure as a suppressed exception so that the root cause is not lost.
     *
     * @param logData message to broadcast
     * @throws Exception if an earlier asynchronous send failed, or serialization fails
     * @throws FlinkRuntimeException if sending or committing the transaction fails
     */
    @Override // com.netease.arctic.flink.write.hidden.LogMsgFactory.Producer
    public void sendToAllPartitions(LogData<T> logData) throws Exception {
        checkErroneous();
        final byte[] payload = this.logDataJsonSerialization.serialize(logData);
        final List<ProducerRecord<byte[], byte[]>> records = IntStream.of(this.partitions)
            .mapToObj(partition ->
                new ProducerRecord<byte[], byte[]>(this.topic, Integer.valueOf(partition), (Long) null, (byte[]) null, payload))
            .collect(Collectors.toList());
        LOG.info("sending {} partitions with flip message={}.", Integer.valueOf(records.size()), logData);
        final long startMillis = System.currentTimeMillis();
        try {
            this.transactionalProducer.beginTransaction();
            for (ProducerRecord<byte[], byte[]> record : records) {
                // Stop early if a previous record of this batch has already been reported as failed.
                checkErroneous();
                this.transactionalProducer.send(record, this.callback);
            }
            this.transactionalProducer.commitTransaction();
            LOG.info("finished flips sending, cost {}ms.", Long.valueOf(System.currentTimeMillis() - startMillis));
        } catch (Throwable failure) {
            LOG.error("", failure);
            try {
                this.transactionalProducer.abortTransaction();
            } catch (Throwable abortFailure) {
                // The abort must not mask the failure that triggered it.
                if (abortFailure != failure) {
                    failure.addSuppressed(abortFailure);
                }
            }
            throw new FlinkRuntimeException(failure);
        }
    }

    /** Flushes the non-transactional producer. */
    @Override // com.netease.arctic.flink.write.hidden.LogMsgFactory.Producer
    public void flush() {
        this.producer.flush();
    }

    /**
     * Closes both producers with a zero timeout. Each producer is closed independently, so a
     * failure while closing one does not prevent the other from being closed, and producers that
     * were never created (for example because {@link #open()} failed) are skipped.
     *
     * @throws Exception if closing failed or an asynchronous send failure is still pending
     */
    @Override // com.netease.arctic.flink.write.hidden.LogMsgFactory.Producer
    public void close() throws Exception {
        try {
            closeAndRecordFailure(this.producer);
            closeAndRecordFailure(this.transactionalProducer);
        } finally {
            checkErroneous();
        }
    }

    /** Closes {@code target} if present, merging any failure into {@link #asyncException}. */
    private void closeAndRecordFailure(@Nullable FlinkKafkaInternalProducer<byte[], byte[]> target) {
        if (target == null) {
            return;
        }
        try {
            target.close(Duration.ofSeconds(0L));
        } catch (Exception e) {
            this.asyncException = ExceptionUtils.firstOrSuppressed(e, this.asyncException);
        }
    }

    /**
     * Creates the producer used for transactional writes. The configuration is copied from
     * {@link #producerConfig}; a random {@code transactional.id} is added when none is configured.
     *
     * @return a new transactional-capable producer
     */
    protected FlinkKafkaInternalProducer<byte[], byte[]> createTransactionalProducer() {
        final Properties transactionalConfig = new Properties();
        transactionalConfig.putAll(this.producerConfig);
        transactionalConfig.computeIfAbsent("transactional.id", key -> UUID.randomUUID().toString());
        return new FlinkKafkaInternalProducer<>(transactionalConfig);
    }

    /**
     * Creates the producer used for regular writes, configured directly from {@link #producerConfig}.
     *
     * @return a new producer
     */
    protected FlinkKafkaInternalProducer<byte[], byte[]> createProducer() {
        return new FlinkKafkaInternalProducer<>(this.producerConfig);
    }

    /**
     * Fetches the partitions of a topic and returns their ids in ascending order.
     *
     * @param str topic name
     * @param producer producer used to look up the topic's partitions
     * @return partition ids sorted ascending
     */
    public static int[] getPartitionsByTopic(String str, Producer producer) {
        // View the raw producer through wildcards so the element type of partitionsFor(...) is inferred.
        final Producer<?, ?> typedProducer = producer;
        return typedProducer.partitionsFor(str).stream()
            .mapToInt(info -> info.partition())
            .sorted()
            .toArray();
    }

    /**
     * Rethrows, and clears, the pending asynchronous failure if there is one.
     *
     * @throws FlinkKafkaException wrapping the pending failure
     */
    protected void checkErroneous() throws FlinkKafkaException {
        final Exception pending = this.asyncException;
        if (pending != null) {
            // Reset first so the same failure is reported only once.
            this.asyncException = null;
            throw new FlinkKafkaException(FlinkKafkaErrorCode.EXTERNAL_ERROR, "Failed to send data to Kafka: " + pending.getMessage(), pending);
        }
    }

    /** Called from the send callback after every completed send; no-op here, subclasses may override. */
    protected void acknowledgeMessage() {
    }
}
