package com.netease.arctic.flink.read;

import com.netease.arctic.flink.read.internals.KafkaConsumerThread;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.connectors.kafka.internals.ClosableBlockingQueue;
import org.apache.flink.streaming.connectors.kafka.internals.Handover;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;

/* loaded from: input_file:com/netease/arctic/flink/read/LogKafkaConsumerThread.class */
/**
 * A {@link KafkaConsumerThread} that additionally supports moving already-assigned
 * partitions to new offsets while the thread is running.
 *
 * <p>Callers register a partition state via {@link #setTopicPartitionOffset}; the request is
 * buffered in {@link #unReSeekPartitionsQueue} and applied to the Kafka consumer the next time
 * {@link #reSeekPartitionOffsets()} is invoked.
 *
 * @param <T> record type handled by the superclass
 */
public class LogKafkaConsumerThread<T> extends KafkaConsumerThread<T> {
    /** Pending re-seek requests; filled by {@link #setTopicPartitionOffset} and drained by {@link #reSeekPartitionOffsets()}. */
    protected final ClosableBlockingQueue<KafkaTopicPartitionState<T, TopicPartition>> unReSeekPartitionsQueue;

    /**
     * Creates the thread and an initially empty re-seek queue. All arguments are forwarded
     * unchanged to the superclass constructor.
     *
     * <p>NOTE(review): {@code str}, {@code j} and {@code z} are decompiler-generated names;
     * presumably they are the thread name, poll timeout and metrics flag — confirm against
     * the superclass constructor.
     */
    public LogKafkaConsumerThread(Logger logger, Handover handover, Properties properties, ClosableBlockingQueue<KafkaTopicPartitionState<T, TopicPartition>> closableBlockingQueue, String str, long j, boolean z, MetricGroup metricGroup, MetricGroup metricGroup2) {
        super(logger, handover, properties, closableBlockingQueue, str, j, z, metricGroup, metricGroup2);
        this.unReSeekPartitionsQueue = new ClosableBlockingQueue<>();
    }

    /**
     * Applies every pending re-seek request to the Kafka consumer.
     *
     * <p>The consumer is taken out of {@code this.consumer} (under
     * {@code consumerReassignmentLock}) for the duration of the operation and put back afterwards.
     * If a {@link WakeupException} interrupts the operation, any offsets that were already moved
     * are rolled back to the positions recorded beforehand, the pending requests are put back on
     * the queue so they are not lost, and the abort is signalled to the caller.
     *
     * @throws KafkaConsumerThread.AbortedReassignmentException if a {@link WakeupException}
     *     interrupted the operation
     * @throws Exception any other failure raised by the Kafka consumer
     */
    @Override // com.netease.arctic.flink.read.internals.KafkaConsumerThread
    public void reSeekPartitionOffsets() throws Exception {
        final List<KafkaTopicPartitionState<T, TopicPartition>> reSeekPartitions = this.unReSeekPartitionsQueue.pollBatch();
        if (reSeekPartitions == null) {
            // Nothing was handed back by the queue (presumably it was empty), so nothing to do.
            return;
        }

        // Detach the consumer while it is being modified.
        final KafkaConsumer<byte[], byte[]> consumerTmp;
        synchronized (this.consumerReassignmentLock) {
            consumerTmp = this.consumer;
            this.consumer = null;
        }

        // Positions of all currently assigned partitions, recorded for a possible rollback.
        final HashMap<TopicPartition, Long> oldPositions = new HashMap<>();
        // Becomes true once the snapshot is complete and the consumer is about to be modified.
        boolean seekStarted = false;
        try {
            for (TopicPartition assigned : consumerTmp.assignment()) {
                oldPositions.put(assigned, consumerTmp.position(assigned));
            }
            seekStarted = true;
            for (KafkaTopicPartitionState<T, TopicPartition> state : reSeekPartitions) {
                consumerTmp.seek(state.getKafkaPartitionHandle(), state.getOffset());
            }

            // Re-expose the consumer and deliver a wakeup that was buffered in the meantime.
            synchronized (this.consumerReassignmentLock) {
                this.consumer = consumerTmp;
                if (this.hasBufferedWakeup) {
                    this.consumer.wakeup();
                    this.hasBufferedWakeup = false;
                }
            }
        } catch (WakeupException e) {
            synchronized (this.consumerReassignmentLock) {
                this.consumer = consumerTmp;
                if (seekStarted) {
                    // Roll back so the consumer looks untouched to the caller.
                    for (KafkaTopicPartitionState<T, TopicPartition> state : reSeekPartitions) {
                        final TopicPartition partition = state.getKafkaPartitionHandle();
                        final Long oldPosition = oldPositions.get(partition);
                        // A partition missing from the snapshot has no known position to restore;
                        // skipping it avoids a NullPointerException that would mask the abort.
                        if (oldPosition != null) {
                            this.consumer.seek(partition, oldPosition.longValue());
                        }
                    }
                }
                this.hasBufferedWakeup = false;
            }
            // Put the drained requests back so a later call retries them instead of
            // silently dropping the requested offsets.
            for (KafkaTopicPartitionState<T, TopicPartition> state : reSeekPartitions) {
                this.unReSeekPartitionsQueue.add(state);
            }
            throw new KafkaConsumerThread.AbortedReassignmentException();
        }
    }

    /**
     * Queues a partition whose consumer position should be moved to the offset carried by the
     * given state; the seek itself happens in {@link #reSeekPartitionOffsets()}.
     *
     * @param kafkaTopicPartitionState partition handle together with the offset to seek to
     */
    public void setTopicPartitionOffset(KafkaTopicPartitionState<T, TopicPartition> kafkaTopicPartitionState) {
        this.unReSeekPartitionsQueue.add(kafkaTopicPartitionState);
    }
}
