package org.apache.flink.connector.kafka.sink;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.MetricUtil;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
import org.apache.flink.shaded.guava30.com.google.common.io.Closer;
import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricMutableWrapper;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.UnknownProducerIdException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/connector/kafka/sink/KafkaWriter.class */
class KafkaWriter<IN> implements SinkWriter<IN, KafkaCommittable, KafkaWriterState> {
    private static final Logger LOG;
    private static final String KAFKA_PRODUCER_METRIC_NAME = "KafkaProducer";
    private static final long METRIC_UPDATE_INTERVAL_MILLIS = 500;
    private static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
    private static final String KEY_REGISTER_METRICS = "register.producer.metrics";
    private static final String KAFKA_PRODUCER_METRICS = "producer-metrics";
    private final DeliveryGuarantee deliveryGuarantee;
    private final Properties kafkaProducerConfig;
    private final String transactionalIdPrefix;
    private final KafkaRecordSerializationSchema<IN> recordSerializer;
    private final Callback deliveryCallback;
    private final KafkaRecordSerializationSchema.KafkaSinkContext kafkaSinkContext;
    private final SinkWriterMetricGroup metricGroup;
    private final Counter numBytesOutCounter;
    private final Sink.ProcessingTimeService timeService;
    private final boolean disabledMetrics;
    private long latestOutgoingByteTotal;
    private Metric byteOutMetric;
    private FlinkKafkaInternalProducer<byte[], byte[]> currentProducer;
    private final KafkaWriterState kafkaWriterState;
    private long lastCheckpointId;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final Map<String, KafkaMetricMutableWrapper> previouslyCreatedMetrics = new HashMap();
    private final Deque<FlinkKafkaInternalProducer<byte[], byte[]>> producerPool = new ArrayDeque();
    private final Closer closer = Closer.create();
    private boolean closed = false;
    private long lastSync = System.currentTimeMillis();

    /* loaded from: input_file:org/apache/flink/connector/kafka/sink/KafkaWriter$WriterCallback.class */
    private class WriterCallback implements Callback {
        private final MailboxExecutor mailboxExecutor;

        public WriterCallback(MailboxExecutor mailboxExecutor) {
            this.mailboxExecutor = mailboxExecutor;
        }

        public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
            if (exc != null) {
                FlinkKafkaInternalProducer flinkKafkaInternalProducer = KafkaWriter.this.currentProducer;
                this.mailboxExecutor.execute(() -> {
                    throwException(recordMetadata, exc, flinkKafkaInternalProducer);
                }, "Failed to send data to Kafka");
            }
        }

        private void throwException(RecordMetadata recordMetadata, Exception exc, FlinkKafkaInternalProducer<byte[], byte[]> flinkKafkaInternalProducer) {
            String format = String.format("Failed to send data to Kafka %s with %s ", recordMetadata, flinkKafkaInternalProducer);
            if (exc instanceof UnknownProducerIdException) {
                format = format + KafkaCommitter.UNKNOWN_PRODUCER_ID_ERROR_MESSAGE;
            }
            throw new FlinkRuntimeException(format, exc);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaWriter(DeliveryGuarantee deliveryGuarantee, Properties properties, String str, Sink.InitContext initContext, KafkaRecordSerializationSchema<IN> kafkaRecordSerializationSchema, SerializationSchema.InitializationContext initializationContext, List<KafkaWriterState> list) {
        this.deliveryGuarantee = (DeliveryGuarantee) Preconditions.checkNotNull(deliveryGuarantee, "deliveryGuarantee");
        this.kafkaProducerConfig = (Properties) Preconditions.checkNotNull(properties, "kafkaProducerConfig");
        this.transactionalIdPrefix = (String) Preconditions.checkNotNull(str, "transactionalIdPrefix");
        this.recordSerializer = (KafkaRecordSerializationSchema) Preconditions.checkNotNull(kafkaRecordSerializationSchema, "recordSerializer");
        this.deliveryCallback = new WriterCallback(initContext.getMailboxExecutor());
        this.disabledMetrics = (properties.containsKey("flink.disable-metrics") && Boolean.parseBoolean(properties.get("flink.disable-metrics").toString())) || (properties.containsKey(KEY_REGISTER_METRICS) && !Boolean.parseBoolean(properties.get(KEY_REGISTER_METRICS).toString()));
        Preconditions.checkNotNull(initContext, "sinkInitContext");
        this.timeService = initContext.getProcessingTimeService();
        this.metricGroup = initContext.metricGroup();
        this.numBytesOutCounter = this.metricGroup.getIOMetricGroup().getNumBytesOutCounter();
        this.kafkaSinkContext = new DefaultKafkaSinkContext(initContext.getSubtaskId(), initContext.getNumberOfParallelSubtasks(), properties);
        try {
            kafkaRecordSerializationSchema.open(initializationContext, this.kafkaSinkContext);
            this.kafkaWriterState = new KafkaWriterState(str);
            this.lastCheckpointId = initContext.getRestoredCheckpointId().orElse(0L);
            if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
                abortLingeringTransactions((List) Preconditions.checkNotNull(list, "recoveredStates"), this.lastCheckpointId + 1);
                this.currentProducer = getTransactionalProducer(this.lastCheckpointId + 1);
                this.currentProducer.beginTransaction();
            } else {
                if (deliveryGuarantee != DeliveryGuarantee.AT_LEAST_ONCE && deliveryGuarantee != DeliveryGuarantee.NONE) {
                    throw new UnsupportedOperationException("Unsupported Kafka writer semantic " + this.deliveryGuarantee);
                }
                this.currentProducer = new FlinkKafkaInternalProducer<>(this.kafkaProducerConfig, null);
                this.closer.register(this.currentProducer);
                initKafkaMetrics(this.currentProducer);
            }
            initFlinkMetrics();
        } catch (Exception e) {
            throw new FlinkRuntimeException("Cannot initialize schema.", e);
        }
    }

    public void write(IN in, SinkWriter.Context context) throws IOException {
        this.currentProducer.send(this.recordSerializer.serialize(in, this.kafkaSinkContext, context.timestamp()), this.deliveryCallback);
    }

    public List<KafkaCommittable> prepareCommit(boolean z) {
        if (this.deliveryGuarantee != DeliveryGuarantee.NONE || z) {
            this.currentProducer.flush();
        }
        if (this.deliveryGuarantee != DeliveryGuarantee.EXACTLY_ONCE) {
            return Collections.emptyList();
        }
        FlinkKafkaInternalProducer<byte[], byte[]> flinkKafkaInternalProducer = this.currentProducer;
        Deque<FlinkKafkaInternalProducer<byte[], byte[]>> deque = this.producerPool;
        deque.getClass();
        List<KafkaCommittable> singletonList = Collections.singletonList(KafkaCommittable.of(flinkKafkaInternalProducer, (v1) -> {
            r1.add(v1);
        }));
        LOG.debug("Committing {} committables, final commit={}.", singletonList, Boolean.valueOf(z));
        return singletonList;
    }

    public List<KafkaWriterState> snapshotState(long j) throws IOException {
        if (this.deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
            this.currentProducer = getTransactionalProducer(j + 1);
            this.currentProducer.beginTransaction();
        }
        return ImmutableList.of(this.kafkaWriterState);
    }

    public void close() throws Exception {
        this.closed = true;
        LOG.debug("Closing writer with {}", this.currentProducer);
        Deque<FlinkKafkaInternalProducer<byte[], byte[]>> deque = this.producerPool;
        deque.getClass();
        IOUtils.closeAll(new AutoCloseable[]{this::abortCurrentProducer, this.closer, deque::clear, () -> {
            Preconditions.checkState(this.currentProducer.isClosed());
            this.currentProducer = null;
        }});
    }

    private void abortCurrentProducer() {
        if (this.currentProducer.isInTransaction()) {
            try {
                this.currentProducer.abortTransaction();
            } catch (ProducerFencedException e) {
                LOG.debug("Producer {} fenced while aborting", this.currentProducer.getTransactionalId());
            }
        }
    }

    @VisibleForTesting
    Deque<FlinkKafkaInternalProducer<byte[], byte[]>> getProducerPool() {
        return this.producerPool;
    }

    @VisibleForTesting
    FlinkKafkaInternalProducer<byte[], byte[]> getCurrentProducer() {
        return this.currentProducer;
    }

    void abortLingeringTransactions(List<KafkaWriterState> list, long j) {
        ArrayList newArrayList = Lists.newArrayList(new String[]{this.transactionalIdPrefix});
        if (!list.isEmpty()) {
            KafkaWriterState kafkaWriterState = list.get(0);
            if (!kafkaWriterState.getTransactionalIdPrefix().equals(this.transactionalIdPrefix)) {
                newArrayList.add(kafkaWriterState.getTransactionalIdPrefix());
                LOG.warn("Transactional id prefix from previous execution {} has changed to {}.", kafkaWriterState.getTransactionalIdPrefix(), this.transactionalIdPrefix);
            }
        }
        int parallelInstanceId = this.kafkaSinkContext.getParallelInstanceId();
        int numberOfParallelInstances = this.kafkaSinkContext.getNumberOfParallelInstances();
        Function function = this::getOrCreateTransactionalProducer;
        Deque<FlinkKafkaInternalProducer<byte[], byte[]>> deque = this.producerPool;
        deque.getClass();
        TransactionAborter transactionAborter = new TransactionAborter(parallelInstanceId, numberOfParallelInstances, function, (v1) -> {
            r5.add(v1);
        });
        Throwable th = null;
        try {
            try {
                transactionAborter.abortLingeringTransactions(newArrayList, j);
                if (transactionAborter != null) {
                    if (0 == 0) {
                        transactionAborter.close();
                        return;
                    }
                    try {
                        transactionAborter.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (transactionAborter != null) {
                if (th != null) {
                    try {
                        transactionAborter.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    transactionAborter.close();
                }
            }
            throw th4;
        }
    }

    private FlinkKafkaInternalProducer<byte[], byte[]> getTransactionalProducer(long j) {
        Preconditions.checkState(j > this.lastCheckpointId, "Expected %s > %s", new Object[]{Long.valueOf(j), Long.valueOf(this.lastCheckpointId)});
        FlinkKafkaInternalProducer<byte[], byte[]> flinkKafkaInternalProducer = null;
        long j2 = this.lastCheckpointId;
        while (true) {
            long j3 = j2 + 1;
            if (j3 > j) {
                break;
            }
            flinkKafkaInternalProducer = getOrCreateTransactionalProducer(TransactionalIdFactory.buildTransactionalId(this.transactionalIdPrefix, this.kafkaSinkContext.getParallelInstanceId(), j3));
            j2 = j3;
        }
        this.lastCheckpointId = j;
        if (!$assertionsDisabled && flinkKafkaInternalProducer == null) {
            throw new AssertionError();
        }
        LOG.info("Created new transactional producer {}", flinkKafkaInternalProducer.getTransactionalId());
        return flinkKafkaInternalProducer;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r7v2, types: [org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer, java.io.Closeable] */
    private FlinkKafkaInternalProducer<byte[], byte[]> getOrCreateTransactionalProducer(String str) {
        FlinkKafkaInternalProducer<byte[], byte[]> flinkKafkaInternalProducer;
        FlinkKafkaInternalProducer<byte[], byte[]> poll = this.producerPool.poll();
        if (poll == null) {
            ?? flinkKafkaInternalProducer2 = new FlinkKafkaInternalProducer(this.kafkaProducerConfig, str);
            this.closer.register((Closeable) flinkKafkaInternalProducer2);
            flinkKafkaInternalProducer2.initTransactions();
            initKafkaMetrics(flinkKafkaInternalProducer2);
            flinkKafkaInternalProducer = flinkKafkaInternalProducer2;
        } else {
            poll.initTransactionId(str);
            flinkKafkaInternalProducer = poll;
        }
        return flinkKafkaInternalProducer;
    }

    private void initFlinkMetrics() {
        this.metricGroup.setCurrentSendTimeGauge(this::computeSendTime);
        registerMetricSync();
    }

    private void initKafkaMetrics(FlinkKafkaInternalProducer<byte[], byte[]> flinkKafkaInternalProducer) {
        this.byteOutMetric = MetricUtil.getKafkaMetric(flinkKafkaInternalProducer.metrics(), KAFKA_PRODUCER_METRICS, "outgoing-byte-total");
        if (this.disabledMetrics) {
            return;
        }
        flinkKafkaInternalProducer.metrics().entrySet().forEach(initMetric(this.metricGroup.addGroup(KAFKA_PRODUCER_METRIC_NAME)));
    }

    private Consumer<Map.Entry<MetricName, ? extends Metric>> initMetric(MetricGroup metricGroup) {
        return entry -> {
            String name = ((MetricName) entry.getKey()).name();
            Metric metric = (Metric) entry.getValue();
            if (this.previouslyCreatedMetrics.containsKey(name)) {
                this.previouslyCreatedMetrics.get(name).setKafkaMetric(metric);
                return;
            }
            KafkaMetricMutableWrapper kafkaMetricMutableWrapper = new KafkaMetricMutableWrapper(metric);
            this.previouslyCreatedMetrics.put(name, kafkaMetricMutableWrapper);
            metricGroup.gauge(name, kafkaMetricMutableWrapper);
        };
    }

    private long computeSendTime() {
        FlinkKafkaInternalProducer<byte[], byte[]> flinkKafkaInternalProducer = this.currentProducer;
        if (flinkKafkaInternalProducer == null) {
            return -1L;
        }
        return ((Number) MetricUtil.getKafkaMetric(flinkKafkaInternalProducer.metrics(), KAFKA_PRODUCER_METRICS, "request-latency-avg").metricValue()).longValue() + ((Number) MetricUtil.getKafkaMetric(flinkKafkaInternalProducer.metrics(), KAFKA_PRODUCER_METRICS, "record-queue-time-avg").metricValue()).longValue();
    }

    private void registerMetricSync() {
        this.timeService.registerProcessingTimer(this.lastSync + METRIC_UPDATE_INTERVAL_MILLIS, j -> {
            if (this.closed) {
                return;
            }
            long longValue = ((Number) this.byteOutMetric.metricValue()).longValue();
            this.numBytesOutCounter.inc(longValue - this.latestOutgoingByteTotal);
            this.latestOutgoingByteTotal = longValue;
            this.lastSync = j;
            registerMetricSync();
        });
    }

    static {
        $assertionsDisabled = !KafkaWriter.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger(KafkaWriter.class);
    }
}
