package org.apache.kafka.clients.producer.internals;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.internals.RecordAccumulator;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.record.CompressionRatioEstimator;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.DefaultRecord;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.class */
public class RecordAccumulatorTest {
    // Topic name and partition ids used to build the fixtures below.
    private String topic = "test";
    private int partition1 = 0;
    private int partition2 = 1;
    private int partition3 = 2;

    // Two nodes on localhost with ids 0 and 1.
    private Node node1 = new Node(0, "localhost", 1111);
    private Node node2 = new Node(1, "localhost", 1112);

    // One TopicPartition per partition id.
    private TopicPartition tp1 = new TopicPartition(topic, partition1);
    private TopicPartition tp2 = new TopicPartition(topic, partition2);
    private TopicPartition tp3 = new TopicPartition(topic, partition3);

    // Partitions 0 and 1 are attached to node1, partition 2 to node2; the tests
    // treat that node as the partition's leader. The two array arguments are left null.
    private PartitionInfo part1 = new PartitionInfo(topic, partition1, node1, (Node[]) null, (Node[]) null);
    private PartitionInfo part2 = new PartitionInfo(topic, partition2, node1, (Node[]) null, (Node[]) null);
    private PartitionInfo part3 = new PartitionInfo(topic, partition3, node2, (Node[]) null, (Node[]) null);

    // Mock clock: tests advance it explicitly through time.sleep(...).
    private MockTime time = new MockTime();

    // Default payload appended by most tests.
    private byte[] key = "key".getBytes();
    private byte[] value = "value".getBytes();

    // Size of one default key/value record as reported by DefaultRecord.sizeInBytes
    // (first two arguments zero, no headers).
    private int msgSize = DefaultRecord.sizeInBytes(0, 0, key.length, value.length, Record.EMPTY_HEADERS);

    // Cluster view made of the two nodes and three partitions above.
    private Cluster cluster = new Cluster((String) null, Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3), Collections.emptySet(), Collections.emptySet());

    // Metrics registry driven by the mock clock; closed in teardown().
    private Metrics metrics = new Metrics(time);
    private final long maxBlockTimeMs = 1000;
    private final LogContext logContext = new LogContext();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/clients/producer/internals/RecordAccumulatorTest$BatchDrainedResult.class */
    /**
     * Immutable pair of counters: a number of splits and a number of batches.
     */
    public class BatchDrainedResult {
        final int numSplit;
        final int numBatches;

        /**
         * @param numSplit   value stored in {@link #numSplit}
         * @param numBatches value stored in {@link #numBatches}
         */
        BatchDrainedResult(int numSplit, int numBatches) {
            this.numSplit = numSplit;
            this.numBatches = numBatches;
        }
    }

    /** Closes the shared {@code metrics} instance after each test. */
    @After
    public void teardown() {
        this.metrics.close();
    }

    /**
     * Appends records to tp1 until the first batch has taken the expected number
     * of records, checking after every append that exactly one writable batch
     * exists and that no node is ready. One more append must create a second
     * batch and make node1 ready; the drained batch must then contain exactly
     * the expected records.
     */
    @Test
    public void testFull() throws Exception {
        final int batchSize = 1025;
        long now = time.milliseconds();
        // NOTE(review): 61 is presumably the per-batch header overhead -- confirm.
        RecordAccumulator accum = createTestRecordAccumulator(batchSize + 61, 10 * batchSize, CompressionType.NONE, 10L);
        int appends = expectedNumAppends(batchSize);

        for (int n = 0; n < appends; n++) {
            accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, (Callback) null, 1000L);
            Deque partitionBatches = (Deque) accum.batches().get(tp1);
            Assert.assertEquals(1L, partitionBatches.size());
            ProducerBatch head = (ProducerBatch) partitionBatches.peekFirst();
            Assert.assertTrue(head.isWritable());
            // Readiness is evaluated against the timestamp captured at the start of the test.
            int readyCount = accum.ready(cluster, now).readyNodes.size();
            Assert.assertEquals("No partitions should be ready.", 0L, readyCount);
        }

        // This append no longer fits into the first batch, so a second one is created.
        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, (Callback) null, 1000L);
        Deque partitionBatches = (Deque) accum.batches().get(tp1);
        Assert.assertEquals(2L, partitionBatches.size());
        ProducerBatch first = (ProducerBatch) partitionBatches.getFirst();
        Assert.assertTrue(first.isWritable());
        Assert.assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);

        Map drained = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0L);
        List nodeBatches = (List) drained.get(Integer.valueOf(node1.id()));
        Assert.assertEquals(1L, nodeBatches.size());

        ProducerBatch drainedBatch = (ProducerBatch) nodeBatches.get(0);
        Iterator records = drainedBatch.records().records().iterator();
        for (int n = 0; n < appends; n++) {
            Record rec = (Record) records.next();
            Assert.assertEquals("Keys should match", ByteBuffer.wrap(key), rec.key());
            Assert.assertEquals("Values should match", ByteBuffer.wrap(value), rec.value());
        }
        Assert.assertFalse("No more records", records.hasNext());
    }

    /** Runs the large-record append scenario ({@code testAppendLarge}) with GZIP compression. */
    @Test
    public void testAppendLargeCompressed() throws Exception {
        testAppendLarge(CompressionType.GZIP);
    }

    /** Runs the large-record append scenario ({@code testAppendLarge}) without compression. */
    @Test
    public void testAppendLargeNonCompressed() throws Exception {
        testAppendLarge(CompressionType.NONE);
    }

    /**
     * Appends a single record whose value (1024 bytes) is larger than the
     * configured batch size and verifies that it ends up alone in one batch
     * with offset 0, the original key/value and timestamp 0.
     *
     * @param compressionType compression handed to the accumulator under test
     */
    private void testAppendLarge(CompressionType compressionType) throws Exception {
        final int batchSize = 512;
        byte[] largeValue = new byte[2 * batchSize];
        // NOTE(review): 61 is presumably the per-batch header overhead -- confirm.
        RecordAccumulator accum = createTestRecordAccumulator(batchSize + 61, 10240L, compressionType, 0L);
        accum.append(tp1, 0L, key, largeValue, Record.EMPTY_HEADERS, (Callback) null, 1000L);
        Assert.assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);

        Deque partitionBatches = (Deque) accum.batches().get(tp1);
        Assert.assertEquals(1L, partitionBatches.size());

        ProducerBatch producerBatch = (ProducerBatch) partitionBatches.peek();
        List recordBatches = TestUtils.toList(producerBatch.records().batches());
        Assert.assertEquals(1L, recordBatches.size());

        MutableRecordBatch recordBatch = (MutableRecordBatch) recordBatches.get(0);
        Assert.assertEquals(0L, recordBatch.baseOffset());

        List records = TestUtils.toList(recordBatch);
        Assert.assertEquals(1L, records.size());

        Record only = (Record) records.get(0);
        Assert.assertEquals(0L, only.offset());
        Assert.assertEquals(ByteBuffer.wrap(key), only.key());
        Assert.assertEquals(ByteBuffer.wrap(largeValue), only.value());
        Assert.assertEquals(0L, only.timestamp());
    }

    /** Runs {@code testAppendLargeOldMessageFormat} with GZIP compression. */
    @Test
    public void testAppendLargeOldMessageFormatCompressed() throws Exception {
        testAppendLargeOldMessageFormat(CompressionType.GZIP);
    }

    /** Runs {@code testAppendLargeOldMessageFormat} without compression. */
    @Test
    public void testAppendLargeOldMessageFormatNonCompressed() throws Exception {
        testAppendLargeOldMessageFormat(CompressionType.NONE);
    }

    /**
     * Same scenario as {@code testAppendLarge}, but with node1 advertising only
     * produce API versions 0 through 2.
     *
     * <p>Previously the restricted {@link ApiVersions} instance was created and
     * immediately discarded, so the accumulator under test never saw it and this
     * test exercised exactly the same path as {@code testAppendLarge}. The
     * accumulator is now constructed with that instance.
     *
     * @param compressionType compression handed to the accumulator under test
     */
    private void testAppendLargeOldMessageFormat(CompressionType compressionType) throws Exception {
        final int batchSize = 512;
        byte[] largeValue = new byte[2 * batchSize];

        // Restrict node1 to produce versions [0, 2].
        // NOTE(review): presumably this makes the accumulator fall back to an older
        // message format -- confirm against RecordAccumulator.
        ApiVersions apiVersions = new ApiVersions();
        apiVersions.update(node1.idString(), NodeApiVersions.create(Collections.singleton(
                new ApiVersionsResponse.ApiVersion(ApiKeys.PRODUCE.id, (short) 0, (short) 2))));

        // Built directly so the restricted ApiVersions is actually used. The 100L
        // argument mirrors the direct constructor call in testIdempotenceWithOldMagic;
        // this test performs no retries, so its value should not matter here.
        RecordAccumulator accum = new RecordAccumulator(logContext, batchSize + 61, 10240L, compressionType,
                0L, 100L, metrics, time, apiVersions, (TransactionManager) null);
        accum.append(tp1, 0L, key, largeValue, Record.EMPTY_HEADERS, (Callback) null, 1000L);
        Assert.assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);

        Deque partitionBatches = (Deque) accum.batches().get(tp1);
        Assert.assertEquals(1L, partitionBatches.size());

        ProducerBatch producerBatch = (ProducerBatch) partitionBatches.peek();
        List recordBatches = TestUtils.toList(producerBatch.records().batches());
        Assert.assertEquals(1L, recordBatches.size());

        MutableRecordBatch recordBatch = (MutableRecordBatch) recordBatches.get(0);
        Assert.assertEquals(0L, recordBatch.baseOffset());

        List records = TestUtils.toList(recordBatch);
        Assert.assertEquals(1L, records.size());

        Record only = (Record) records.get(0);
        Assert.assertEquals(0L, only.offset());
        Assert.assertEquals(ByteBuffer.wrap(key), only.key());
        Assert.assertEquals(ByteBuffer.wrap(largeValue), only.value());
        Assert.assertEquals(0L, only.timestamp());
    }

    /**
     * Appends one record with the accumulator's last argument set to 10, checks
     * that no node is ready immediately, advances the mock clock by 10 ms and
     * checks that node1 becomes ready and that the drained batch holds exactly
     * that record.
     */
    @Test
    public void testLinger() throws Exception {
        final long lingerMs = 10L;
        RecordAccumulator accum = createTestRecordAccumulator(1085, 10240L, CompressionType.NONE, lingerMs);
        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, (Callback) null, 1000L);

        int readyBeforeLinger = accum.ready(cluster, time.milliseconds()).readyNodes.size();
        Assert.assertEquals("No partitions should be ready", 0L, readyBeforeLinger);

        time.sleep(lingerMs);
        Assert.assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);

        Map drained = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0L);
        List nodeBatches = (List) drained.get(Integer.valueOf(node1.id()));
        Assert.assertEquals(1L, nodeBatches.size());

        ProducerBatch drainedBatch = (ProducerBatch) nodeBatches.get(0);
        Iterator records = drainedBatch.records().records().iterator();
        Record rec = (Record) records.next();
        Assert.assertEquals("Keys should match", ByteBuffer.wrap(key), rec.key());
        Assert.assertEquals("Values should match", ByteBuffer.wrap(value), rec.value());
        Assert.assertFalse("No more records", records.hasNext());
    }

    /**
     * Appends the same number of records to tp1 and tp2 (both attached to
     * node1) and verifies that a drain capped at 1024 bytes returns a batch
     * for only one of the two partitions.
     */
    @Test
    public void testPartialDrain() throws Exception {
        RecordAccumulator accum = createTestRecordAccumulator(1085, 10240L, CompressionType.NONE, 10L);
        int appendsPerPartition = 1024 / msgSize + 1;
        TopicPartition[] partitions = {tp1, tp2};
        for (TopicPartition tp : partitions) {
            int appended = 0;
            while (appended < appendsPerPartition) {
                accum.append(tp, 0L, key, value, Record.EMPTY_HEADERS, (Callback) null, 1000L);
                appended++;
            }
        }
        Assert.assertEquals("Partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);

        Map drained = accum.drain(cluster, Collections.singleton(node1), 1024, 0L);
        List nodeBatches = (List) drained.get(Integer.valueOf(node1.id()));
        Assert.assertEquals("But due to size bound only one partition should have been retrieved", 1L, nodeBatches.size());
    }

    /**
     * Starts 5 threads that each append 10000 records, alternating between
     * partitions 0 and 1, while the test thread repeatedly drains node1 and
     * deallocates the drained batches until 50000 records have been counted.
     * Finally joins all appender threads.
     */
    @Test
    public void testStressfulSituation() throws Exception {
        final int numThreads = 5;
        final int msgsPerThread = 10000;
        final RecordAccumulator accum = createTestRecordAccumulator(1085, 10240L, CompressionType.NONE, 0L);

        List<Thread> appenders = new ArrayList<Thread>();
        for (int t = 0; t < numThreads; t++) {
            appenders.add(new Thread() {
                @Override
                public void run() {
                    for (int m = 0; m < msgsPerThread; m++) {
                        try {
                            accum.append(new TopicPartition(RecordAccumulatorTest.this.topic, m % 2), 0L, RecordAccumulatorTest.this.key, RecordAccumulatorTest.this.value, Record.EMPTY_HEADERS, (Callback) null, 1000L);
                        } catch (Exception e) {
                            // Failures are only printed; the appender keeps going.
                            e.printStackTrace();
                        }
                    }
                }
            });
        }
        for (Thread appender : appenders) {
            appender.start();
        }

        int read = 0;
        // Readiness is always evaluated against this fixed timestamp.
        long now = time.milliseconds();
        while (read < numThreads * msgsPerThread) {
            List<ProducerBatch> batches = (List) accum.drain(cluster, accum.ready(cluster, now).readyNodes, 5120, 0L).get(Integer.valueOf(node1.id()));
            if (batches == null) {
                continue;
            }
            for (ProducerBatch batch : batches) {
                Iterator<Record> records = batch.records().records().iterator();
                while (records.hasNext()) {
                    records.next();
                    read++;
                }
                accum.deallocate(batch);
            }
        }

        for (Thread appender : appenders) {
            appender.join();
        }
    }

    /**
     * Verifies {@code nextReadyCheckDelayMs} as partitions are filled at
     * different times: 10 ms right after filling tp1, 5 ms after advancing the
     * clock by 5 ms and filling tp3, and at most 10 ms once tp2 receives one
     * record more than the expected batch capacity and node1 becomes ready.
     */
    @Test
    public void testNextReadyCheckDelay() throws Exception {
        final int batchSize = 1025;
        final long lingerMs = 10L;
        // NOTE(review): 61 is presumably the per-batch header overhead -- confirm.
        RecordAccumulator accum = createTestRecordAccumulator(batchSize + 61, 10 * batchSize, CompressionType.NONE, lingerMs);
        int appends = expectedNumAppends(batchSize);

        // Fill tp1 up to (but not beyond) the expected batch capacity.
        for (int n = 0; n < appends; n++) {
            accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, (Callback) null, 1000L);
        }
        RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
        Assert.assertEquals("No nodes should be ready.", 0L, result.readyNodes.size());
        Assert.assertEquals("Next check time should be the linger time", lingerMs, result.nextReadyCheckDelayMs);

        time.sleep(lingerMs / 2);

        // Same for tp3, which is attached to node2.
        for (int n = 0; n < appends; n++) {
            accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, (Callback) null, 1000L);
        }
        result = accum.ready(cluster, time.milliseconds());
        Assert.assertEquals("No nodes should be ready.", 0L, result.readyNodes.size());
        Assert.assertEquals("Next check time should be defined by node1, half remaining linger time", lingerMs / 2, result.nextReadyCheckDelayMs);

        // One append more than the capacity for tp2 makes node1 ready.
        for (int n = 0; n <= appends; n++) {
            accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, (Callback) null, 1000L);
        }
        result = accum.ready(cluster, time.milliseconds());
        Assert.assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes);
        Assert.assertTrue("Next check time should be defined by node2, at most linger time", result.nextReadyCheckDelayMs <= lingerMs);
    }

    /**
     * Drains a tp1 batch, re-enqueues it, appends to tp2 and checks which batch
     * is drained at two later points in time: first only the tp2 batch, then
     * (past the larger of the two intervals) the re-enqueued tp1 batch.
     */
    @Test
    public void testRetryBackoff() throws Exception {
        // NOTE(review): presumably constructor arguments 5 and 6 are linger and
        // retry backoff in ms -- confirm against RecordAccumulator.
        final long lingerMs = Long.MAX_VALUE / 4;
        final long retryBackoffMs = Long.MAX_VALUE / 2;
        RecordAccumulator accum = new RecordAccumulator(logContext, 1085, 10240L, CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time, new ApiVersions(), (TransactionManager) null);

        long now = time.milliseconds();
        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, (Callback) null, 1000L);
        RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, now + lingerMs + 1);
        Assert.assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes);
        Map drained = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, now + lingerMs + 1);
        Assert.assertEquals("Node1 should be the only ready node.", 1L, drained.size());
        List nodeBatches = (List) drained.get(0);
        Assert.assertEquals("Partition 0 should only have one batch drained.", 1L, nodeBatches.size());

        // Put the drained tp1 batch back and add a fresh record for tp2.
        now = time.milliseconds();
        accum.reenqueue((ProducerBatch) nodeBatches.get(0), now);
        accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, (Callback) null, 1000L);

        // Past the shorter interval only the tp2 batch is expected.
        result = accum.ready(cluster, now + lingerMs + 1);
        Assert.assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes);
        drained = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, now + lingerMs + 1);
        Assert.assertEquals("Node1 should be the only ready node.", 1L, drained.size());
        nodeBatches = (List) drained.get(0);
        Assert.assertEquals("Node1 should only have one batch drained.", 1L, nodeBatches.size());
        Assert.assertEquals("Node1 should only have one batch for partition 1.", tp2, ((ProducerBatch) nodeBatches.get(0)).topicPartition);

        // Past the longer interval the re-enqueued tp1 batch is expected.
        result = accum.ready(cluster, now + retryBackoffMs + 1);
        Assert.assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes);
        drained = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, now + retryBackoffMs + 1);
        Assert.assertEquals("Node1 should be the only ready node.", 1L, drained.size());
        nodeBatches = (List) drained.get(0);
        Assert.assertEquals("Node1 should only have one batch drained.", 1L, nodeBatches.size());
        Assert.assertEquals("Node1 should only have one batch for partition 0.", tp1, ((ProducerBatch) nodeBatches.get(0)).topicPartition);
    }

    /**
     * Appends 100 records spread over three partitions, verifies nothing is
     * ready before {@code beginFlush()}, then drains and deallocates every
     * batch and checks that {@code awaitFlushCompletion()} returns with nothing
     * undrained or incomplete left.
     */
    @Test
    public void testFlush() throws Exception {
        RecordAccumulator accum = createTestRecordAccumulator(4157, 65536L, CompressionType.NONE, Long.MAX_VALUE);
        int appended = 0;
        while (appended < 100) {
            accum.append(new TopicPartition(topic, appended % 3), 0L, key, value, Record.EMPTY_HEADERS, (Callback) null, 1000L);
            Assert.assertTrue(accum.hasIncomplete());
            appended++;
        }
        int readyBeforeFlush = accum.ready(cluster, time.milliseconds()).readyNodes.size();
        Assert.assertEquals("No nodes should be ready.", 0L, readyBeforeFlush);

        accum.beginFlush();
        Map drained = accum.drain(cluster, accum.ready(cluster, time.milliseconds()).readyNodes, Integer.MAX_VALUE, time.milliseconds());
        // Drained batches still count as incomplete until they are deallocated.
        Assert.assertTrue(accum.hasIncomplete());

        for (Object nodeBatches : drained.values()) {
            for (Object batch : (List) nodeBatches) {
                accum.deallocate((ProducerBatch) batch);
            }
        }

        accum.awaitFlushCompletion();
        Assert.assertFalse(accum.hasUndrained());
        Assert.assertFalse(accum.hasIncomplete());
    }

    /**
     * Starts a background thread that sleeps on the system clock and then
     * interrupts the given thread.
     *
     * @param thread thread to interrupt
     * @param j      delay before the interrupt, passed to {@code Time.SYSTEM.sleep}
     */
    private void delayedInterrupt(final Thread thread, final long j) {
        Runnable interruptAfterDelay = new Runnable() {
            @Override
            public void run() {
                Time.SYSTEM.sleep(j);
                thread.interrupt();
            }
        };
        Thread interrupter = new Thread(interruptAfterDelay);
        interrupter.start();
    }

    /**
     * Verifies that an interrupted {@code awaitFlushCompletion()} still clears
     * the flush-in-progress state.
     *
     * <p>One record is appended and a flush is begun, but nothing drains the
     * batch. The test thread is interrupted after one second of wall-clock time
     * via {@code delayedInterrupt}, and the resulting
     * {@link InterruptedException} is expected.
     */
    @Test
    public void testAwaitFlushComplete() throws Exception {
        RecordAccumulator accum = createTestRecordAccumulator(4157, 65536L, CompressionType.NONE, Long.MAX_VALUE);
        accum.append(new TopicPartition(topic, 0), 0L, key, value, Record.EMPTY_HEADERS, (Callback) null, 1000L);

        accum.beginFlush();
        Assert.assertTrue(accum.flushInProgress());

        delayedInterrupt(Thread.currentThread(), 1000L);
        try {
            accum.awaitFlushCompletion();
            // The message now names the exception type that is actually caught below.
            Assert.fail("awaitFlushCompletion should throw InterruptedException");
        } catch (InterruptedException e) {
            Assert.assertFalse("flushInProgress count should be decremented even if thread is interrupted", accum.flushInProgress());
        }
    }

    /**
     * Appends 100 records over three partitions, drains whatever is ready, and
     * then aborts all incomplete batches. Every record's callback must fire
     * with the "closed forcefully" message, and nothing may remain undrained
     * or incomplete afterwards.
     */
    @Test
    public void testAbortIncompleteBatches() throws Exception {
        final AtomicInteger callbacksInvoked = new AtomicInteger(0);
        RecordAccumulator accum = createTestRecordAccumulator(189, 65536L, CompressionType.NONE, Long.MAX_VALUE);
        for (int i = 0; i < 100; i++) {
            accum.append(new TopicPartition(topic, i % 3), 0L, key, value, (Header[]) null, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                    // assertEquals reports the actual message on failure, unlike assertTrue(equals(...)).
                    Assert.assertEquals("Producer is closed forcefully.", exc.getMessage());
                    callbacksInvoked.incrementAndGet();
                }
            }, 1000L);
        }

        RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
        Assert.assertFalse(result.readyNodes.isEmpty());
        Map drained = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
        Assert.assertTrue(accum.hasUndrained());
        Assert.assertTrue(accum.hasIncomplete());

        // Drained batches must be closed but not yet completed.
        int drainedRecords = 0;
        for (Object nodeBatches : drained.values()) {
            for (ProducerBatch batch : (List<ProducerBatch>) nodeBatches) {
                Assert.assertTrue(batch.isClosed());
                Assert.assertFalse(batch.produceFuture.completed());
                drainedRecords += batch.recordCount;
            }
        }
        // Only part of the 100 records may have been drained.
        Assert.assertTrue(drainedRecords > 0);
        Assert.assertTrue(drainedRecords < 100);

        accum.abortIncompleteBatches();
        Assert.assertEquals(100, callbacksInvoked.get());
        Assert.assertFalse(accum.hasUndrained());
        Assert.assertFalse(accum.hasIncomplete());
    }

    /**
     * Appends 100 records over three partitions, drains whatever is ready, and
     * aborts only the undrained batches with a given exception. Callbacks of
     * aborted records must receive that exception; drained batches stay closed
     * and uncompleted, and together both groups must account for all 100 records.
     */
    @Test
    public void testAbortUnsentBatches() throws Exception {
        final AtomicInteger abortedCallbacks = new AtomicInteger(0);
        RecordAccumulator accum = createTestRecordAccumulator(189, 65536L, CompressionType.NONE, Long.MAX_VALUE);
        final KafkaException cause = new KafkaException();
        int appended = 0;
        while (appended < 100) {
            accum.append(new TopicPartition(topic, appended % 3), 0L, key, value, (Header[]) null, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                    Assert.assertEquals(cause, exc);
                    abortedCallbacks.incrementAndGet();
                }
            }, 1000L);
            appended++;
        }

        RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
        Assert.assertFalse(result.readyNodes.isEmpty());
        Map drained = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
        Assert.assertTrue(accum.hasUndrained());
        Assert.assertTrue(accum.hasIncomplete());

        accum.abortUndrainedBatches(cause);

        // Batches that were already drained are untouched by the abort.
        int drainedRecords = 0;
        for (Object nodeBatches : drained.values()) {
            for (Object element : (List) nodeBatches) {
                ProducerBatch batch = (ProducerBatch) element;
                Assert.assertTrue(batch.isClosed());
                Assert.assertFalse(batch.produceFuture.completed());
                drainedRecords += batch.recordCount;
            }
        }
        Assert.assertTrue(drainedRecords > 0);
        Assert.assertTrue(abortedCallbacks.get() > 0);
        Assert.assertEquals(100, abortedCallbacks.get() + drainedRecords);
        Assert.assertFalse(accum.hasUndrained());
        Assert.assertTrue(accum.hasIncomplete());
    }

    /**
     * Walks through several batch-expiry scenarios on tp1: a full batch, a
     * batch that has waited out the 3000 ms configured on the accumulator, a
     * re-enqueued batch, and a partition unmuted with a future timestamp. In
     * every scenario a muted partition must report no expired batches.
     *
     * <p>Fix: the "There should be only one batch." assertion previously
     * passed the actual value in the expected position, which would have
     * produced a misleading failure message.
     */
    @Test
    public void testExpiredBatches() throws InterruptedException {
        final int batchSize = 1025;
        final long lingerMs = 3000L;
        // Timeout handed to expiredBatches(...) throughout this test.
        final int timeoutMs = 60;
        // NOTE(review): 61 is presumably the per-batch header overhead -- confirm.
        RecordAccumulator accum = createTestRecordAccumulator(batchSize + 61, 10 * batchSize, CompressionType.NONE, lingerMs);
        int appends = expectedNumAppends(batchSize);

        // Scenario 1: fill the first batch; one more append starts a second batch.
        for (int i = 0; i < appends; i++) {
            accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, (Callback) null, 1000L);
            Assert.assertEquals("No partitions should be ready.", 0L, accum.ready(cluster, time.milliseconds()).readyNodes.size());
        }
        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, (Callback) null, 0L);
        Set readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
        Assert.assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes);

        time.sleep(timeoutMs + 1);
        accum.mutePartition(tp1);
        Assert.assertEquals("The batch should not be expired when the partition is muted", 0L, accum.expiredBatches(timeoutMs, time.milliseconds()).size());
        accum.unmutePartition(tp1, 0L);
        Assert.assertEquals("The batch should be expired", 1L, accum.expiredBatches(timeoutMs, time.milliseconds()).size());
        Assert.assertEquals("No partitions should be ready.", 0L, accum.ready(cluster, time.milliseconds()).readyNodes.size());

        // Scenario 2: let the remaining batch wait for the full linger interval.
        time.sleep(lingerMs);
        // NOTE(review): this checks the ready set captured before the sleep, not a
        // fresh ready(...) result, so it cannot fail here -- confirm whether a
        // re-evaluation was intended.
        Assert.assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes);
        time.sleep(timeoutMs + 1);
        accum.mutePartition(tp1);
        Assert.assertEquals("The batch should not be expired when metadata is still available and partition is muted", 0L, accum.expiredBatches(timeoutMs, time.milliseconds()).size());
        accum.unmutePartition(tp1, 0L);
        Assert.assertEquals("The batch should be expired when the partition is not muted", 1L, accum.expiredBatches(timeoutMs, time.milliseconds()).size());
        Assert.assertEquals("No partitions should be ready.", 0L, accum.ready(cluster, time.milliseconds()).readyNodes.size());

        // Scenario 3: drain a batch, re-enqueue it, and check when it expires.
        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, (Callback) null, 0L);
        time.sleep(lingerMs);
        Set readyAfterLinger = accum.ready(cluster, time.milliseconds()).readyNodes;
        Assert.assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyAfterLinger);
        Map drained = accum.drain(cluster, readyAfterLinger, Integer.MAX_VALUE, time.milliseconds());
        Assert.assertEquals("There should be only one batch.", 1L, ((List) drained.get(Integer.valueOf(node1.id()))).size());
        time.sleep(1000L);
        accum.reenqueue((ProducerBatch) ((List) drained.get(Integer.valueOf(node1.id()))).get(0), time.milliseconds());
        time.sleep(timeoutMs + 100);
        Assert.assertEquals("The batch should not be expired.", 0L, accum.expiredBatches(timeoutMs, time.milliseconds()).size());
        time.sleep(1L);
        accum.mutePartition(tp1);
        Assert.assertEquals("The batch should not be expired when the partition is muted", 0L, accum.expiredBatches(timeoutMs, time.milliseconds()).size());
        accum.unmutePartition(tp1, 0L);
        Assert.assertEquals("The batch should be expired when the partition is not muted.", 1L, accum.expiredBatches(timeoutMs, time.milliseconds()).size());

        // Scenario 4: unmute with a timestamp 100 ms in the future; the batch only
        // expires once the clock has advanced by those 100 ms.
        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, (Callback) null, 0L);
        time.sleep(lingerMs);
        Assert.assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
        time.sleep(timeoutMs + 1);
        accum.mutePartition(tp1);
        Assert.assertEquals("The batch should not be expired when the partition is muted", 0L, accum.expiredBatches(timeoutMs, time.milliseconds()).size());
        accum.unmutePartition(tp1, time.milliseconds() + 100);
        Assert.assertEquals("The batch should not be expired when the partition is muted", 0L, accum.expiredBatches(timeoutMs, time.milliseconds()).size());
        time.sleep(100L);
        Assert.assertEquals("The batch should be expired", 1L, accum.expiredBatches(timeoutMs, time.milliseconds()).size());
        Assert.assertEquals("No partitions should be ready.", 0L, accum.ready(cluster, time.milliseconds()).readyNodes.size());
    }

    /**
     * Verifies that a muted partition is neither reported as ready nor
     * drained, and that unmuting it makes both happen again.
     */
    @Test
    public void testMutedPartitions() throws InterruptedException {
        final int batchSize = 1025;
        long now = time.milliseconds();
        // NOTE(review): 61 is presumably the per-batch header overhead -- confirm.
        RecordAccumulator accum = createTestRecordAccumulator(batchSize + 61, 10 * batchSize, CompressionType.NONE, 10L);
        int appends = expectedNumAppends(batchSize);
        int appended = 0;
        while (appended < appends) {
            accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, (Callback) null, 1000L);
            // Readiness is evaluated against the timestamp captured at the start of the test.
            Assert.assertEquals("No partitions should be ready.", 0L, accum.ready(cluster, now).readyNodes.size());
            appended++;
        }
        time.sleep(2000L);

        // While muted, the partition must not make its node ready.
        accum.mutePartition(tp1);
        int readyWhileMuted = accum.ready(cluster, time.milliseconds()).readyNodes.size();
        Assert.assertEquals("No node should be ready", 0L, readyWhileMuted);

        accum.unmutePartition(tp1, 0L);
        RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
        Assert.assertTrue("The batch should be ready", result.readyNodes.size() > 0);

        // Muting again after the ready check must prevent the drain.
        accum.mutePartition(tp1);
        List drainedWhileMuted = (List) accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds()).get(Integer.valueOf(node1.id()));
        Assert.assertEquals("No batch should have been drained", 0L, drainedWhileMuted.size());

        accum.unmutePartition(tp1, 0L);
        List drainedAfterUnmute = (List) accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds()).get(Integer.valueOf(node1.id()));
        Assert.assertTrue("The batch should have been drained.", drainedAfterUnmute.size() > 0);
    }

    /**
     * Expects {@link UnsupportedVersionException} when appending through an accumulator that
     * has a {@code TransactionManager} while the only registered node advertises PRODUCE
     * versions 0 through 2.
     */
    @Test(expected = UnsupportedVersionException.class)
    public void testIdempotenceWithOldMagic() throws InterruptedException {
        // Register a single node whose PRODUCE API range is [0, 2].
        ApiVersionsResponse.ApiVersion oldProduce =
                new ApiVersionsResponse.ApiVersion(ApiKeys.PRODUCE.id, (short) 0, (short) 2);
        ApiVersions versions = new ApiVersions();
        versions.update("foobar", NodeApiVersions.create(Arrays.asList(oldProduce)));

        final int batchSize = 1025;
        RecordAccumulator accum = new RecordAccumulator(
                this.logContext,
                batchSize + 61,
                10 * batchSize,
                CompressionType.NONE,
                10L,
                100L,
                this.metrics,
                this.time,
                versions,
                new TransactionManager());

        // The append is the call expected to throw.
        accum.append(this.tp1, 0L, this.key, this.value, Record.EMPTY_HEADERS, (Callback) null, 0L);
    }

    /**
     * Builds a batch holding two records, re-enqueues it, drains it and splits it, then
     * verifies that each of the two subsequently drained batches completes exactly one of
     * the original futures (offsets 0 and 1) and fires the callback once.
     */
    @Test
    public void testSplitAndReenqueue() throws ExecutionException, InterruptedException {
        final long now = this.time.milliseconds();
        RecordAccumulator accum = createTestRecordAccumulator(1024, 10240L, CompressionType.GZIP, 10L);

        // Create the batch by hand (uncompressed, 4096-byte buffer) and append two 1024-byte values.
        ByteBuffer buffer = ByteBuffer.allocate(4096);
        ProducerBatch batch = new ProducerBatch(
                this.tp1, MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.CREATE_TIME, 0L), now, true);
        byte[] value = new byte[1024];
        final AtomicInteger acked = new AtomicInteger(0);
        Callback countingCallback = new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                acked.incrementAndGet();
            }
        };
        FutureRecordMetadata future1 = batch.tryAppend(now, (byte[]) null, value, Record.EMPTY_HEADERS, countingCallback, now);
        FutureRecordMetadata future2 = batch.tryAppend(now, (byte[]) null, value, Record.EMPTY_HEADERS, countingCallback, now);
        Assert.assertNotNull(future1);
        Assert.assertNotNull(future2);
        batch.close();

        // Hand the closed batch to the accumulator and advance the mock clock before checking readiness.
        accum.reenqueue(batch, now);
        this.time.sleep(101L);
        RecordAccumulator.ReadyCheckResult result = accum.ready(this.cluster, this.time.milliseconds());
        Assert.assertTrue("The batch should be ready", result.readyNodes.size() > 0);

        Map<Integer, List<ProducerBatch>> drained =
                accum.drain(this.cluster, result.readyNodes, Integer.MAX_VALUE, this.time.milliseconds());
        Assert.assertEquals("Only node1 should be drained", 1L, drained.size());
        Assert.assertEquals("Only one batch should be drained", 1L, drained.get(this.node1.id()).size());

        // Split the drained batch and put the pieces back.
        accum.splitAndReenqueue(drained.get(this.node1.id()).get(0));
        this.time.sleep(101L);

        // First drained piece: completing it must ack only the first record.
        drained = accum.drain(this.cluster, result.readyNodes, Integer.MAX_VALUE, this.time.milliseconds());
        Assert.assertFalse(drained.isEmpty());
        Assert.assertFalse(drained.get(this.node1.id()).isEmpty());
        drained.get(this.node1.id()).get(0).done(acked.get(), 100L, (RuntimeException) null);
        Assert.assertEquals("The first message should have been acked.", 1L, acked.get());
        Assert.assertTrue(future1.isDone());
        Assert.assertEquals(0L, future1.get().offset());

        // Second drained piece: completing it must ack the second record.
        drained = accum.drain(this.cluster, result.readyNodes, Integer.MAX_VALUE, this.time.milliseconds());
        Assert.assertFalse(drained.isEmpty());
        Assert.assertFalse(drained.get(this.node1.id()).isEmpty());
        drained.get(this.node1.id()).get(0).done(acked.get(), 100L, (RuntimeException) null);
        Assert.assertEquals("Both message should have been acked.", 2L, acked.get());
        Assert.assertTrue(future2.isDone());
        Assert.assertEquals(1L, future2.get().offset());
    }

    /**
     * Splits a batch via {@code prepareSplitBatches}, drains every resulting batch and then
     * checks that the accumulator reports its full buffer capacity as available again.
     */
    @Test
    public void testSplitBatchOffAccumulator() throws InterruptedException {
        final long seed = System.currentTimeMillis();
        final int batchSize = 1024;
        final long bufferCapacity = 3072L;

        // Seed the GZIP ratio estimate for the topic with 0.1 before any append.
        CompressionRatioEstimator.setEstimation(this.tp1.topic(), CompressionType.GZIP, 0.1f);
        RecordAccumulator accum = createTestRecordAccumulator(batchSize, bufferCapacity, CompressionType.GZIP, 0L);
        final int numSplitBatches = prepareSplitBatches(accum, seed, 100, 20);
        Assert.assertTrue("There should be some split batches", numSplitBatches > 0);

        // One drain call per split batch; each call must return something for node1.
        RecordAccumulator.ReadyCheckResult result = accum.ready(this.cluster, this.time.milliseconds());
        for (int i = 0; i < numSplitBatches; i++) {
            Map<Integer, List<ProducerBatch>> drained =
                    accum.drain(this.cluster, result.readyNodes, Integer.MAX_VALUE, this.time.milliseconds());
            Assert.assertFalse(drained.isEmpty());
            Assert.assertFalse(drained.get(this.node1.id()).isEmpty());
        }

        Assert.assertTrue("All the batches should have been drained.",
                accum.ready(this.cluster, this.time.milliseconds()).readyNodes.isEmpty());
        Assert.assertEquals("The split batches should be allocated off the accumulator",
                bufferCapacity, accum.bufferPoolAvailableMemory());
    }

    /**
     * For every percentage from 1 to 99 of well-compressing payloads, appends 1000 records,
     * completing or splitting drained batches after each append, and asserts that fewer than
     * 10% of the drained batches had to be split.
     */
    @Test
    public void testSplitFrequency() throws InterruptedException {
        final long seed = System.currentTimeMillis();
        final Random random = new Random(seed);
        final int batchSize = 1024;
        final int numMessages = 1000;
        // Widened from float on purpose: the bound compared against is the double value of 0.1f.
        final double maxSplitRatio = 0.1f;

        RecordAccumulator accum = createTestRecordAccumulator(batchSize, 3072L, CompressionType.GZIP, 10L);
        for (int goodPercentage = 1; goodPercentage < 100; goodPercentage++) {
            int numSplit = 0;
            int numBatches = 0;
            CompressionRatioEstimator.resetEstimation(this.topic);

            for (int sent = 0; sent < numMessages; sent++) {
                // Roll the dice first, then generate the payload matching the outcome.
                final byte[] payload;
                if (random.nextInt(100) < goodPercentage) {
                    payload = bytesWithGoodCompression(random);
                } else {
                    payload = bytesWithPoorCompression(random, 100);
                }
                accum.append(this.tp1, 0L, (byte[]) null, payload, Record.EMPTY_HEADERS, (Callback) null, 0L);
                BatchDrainedResult partial = completeOrSplitBatches(accum, batchSize);
                numSplit += partial.numSplit;
                numBatches += partial.numBatches;
            }

            // Advance the mock clock, then process whatever is still buffered.
            this.time.sleep(10L);
            BatchDrainedResult remainder = completeOrSplitBatches(accum, batchSize);
            numSplit += remainder.numSplit;
            numBatches += remainder.numBatches;

            String message = String.format("Total num batches = %d, split batches = %d, more than 10%% of the batch splits. Random seed is " + seed, numBatches, numSplit);
            double splitRatio = ((double) numSplit) / ((double) numBatches);
            Assert.assertTrue(message, splitRatio < maxSplitRatio);
        }
    }

    /**
     * Fills the accumulator with random payloads, drains the single resulting batch, splits
     * it back into the accumulator and releases the original batch.
     *
     * @param recordAccumulator accumulator to append to, drain from and split into
     * @param j seed for the random payload generator
     * @param i size in bytes of each appended payload
     * @param i2 number of payloads to append
     * @return the value returned by {@code splitAndReenqueue} for the drained batch
     */
    private int prepareSplitBatches(RecordAccumulator recordAccumulator, long j, int i, int i2) throws InterruptedException {
        Random random = new Random(j);

        // Set the GZIP ratio estimate for the topic to 0.1 before appending random payloads.
        CompressionRatioEstimator.setEstimation(this.tp1.topic(), CompressionType.GZIP, 0.1f);
        for (int appended = 0; appended < i2; appended++) {
            recordAccumulator.append(this.tp1, 0L, (byte[]) null, bytesWithPoorCompression(random, i), Record.EMPTY_HEADERS, (Callback) null, 0L);
        }

        RecordAccumulator.ReadyCheckResult result = recordAccumulator.ready(this.cluster, this.time.milliseconds());
        Assert.assertFalse(result.readyNodes.isEmpty());
        Map<Integer, List<ProducerBatch>> drained =
                recordAccumulator.drain(this.cluster, result.readyNodes, Integer.MAX_VALUE, this.time.milliseconds());

        // Exactly one node with exactly one batch is expected.
        Assert.assertEquals(1L, drained.size());
        List<ProducerBatch> batches = drained.values().iterator().next();
        Assert.assertEquals(1L, batches.size());

        ProducerBatch batch = batches.get(0);
        int numSplitBatches = recordAccumulator.splitAndReenqueue(batch);
        recordAccumulator.deallocate(batch);
        return numSplitBatches;
    }

    /**
     * Repeatedly drains the accumulator until a drain returns no batch. Each drained batch
     * whose estimated size exceeds {@code i + 61} is split and re-enqueued; every other batch
     * is completed. All drained batches are deallocated.
     *
     * @param recordAccumulator accumulator to drain
     * @param i batch size used as the split threshold (before the fixed 61-byte allowance)
     * @return the number of batches split and the total number of batches drained
     */
    private BatchDrainedResult completeOrSplitBatches(RecordAccumulator recordAccumulator, int i) {
        // NOTE(review): 61 looks like an inlined per-batch overhead constant -- confirm.
        final int splitThreshold = i + 61;
        int numSplit = 0;
        int numBatches = 0;
        boolean batchDrained;
        do {
            batchDrained = false;
            RecordAccumulator.ReadyCheckResult result = recordAccumulator.ready(this.cluster, this.time.milliseconds());
            Map<Integer, List<ProducerBatch>> drained =
                    recordAccumulator.drain(this.cluster, result.readyNodes, Integer.MAX_VALUE, this.time.milliseconds());
            // Typed iteration: a raw List yields Object elements and cannot feed a ProducerBatch loop variable.
            for (List<ProducerBatch> batches : drained.values()) {
                for (ProducerBatch batch : batches) {
                    batchDrained = true;
                    numBatches++;
                    if (batch.estimatedSizeInBytes() > splitThreshold) {
                        recordAccumulator.splitAndReenqueue(batch);
                        numSplit++;
                    } else {
                        batch.done(0L, 0L, (RuntimeException) null);
                    }
                    recordAccumulator.deallocate(batch);
                }
            }
        } while (batchDrained);
        return new BatchDrainedResult(numSplit, numBatches);
    }

    /**
     * Produces a 100-byte payload made of consecutive 4-byte integers, each drawn from
     * {@code [0, 1000)}.
     *
     * @param random source of the integers
     * @return the filled 100-byte array
     */
    private byte[] bytesWithGoodCompression(Random random) {
        final byte[] payload = new byte[100];
        // Writes go through the wrapper straight into the backing array.
        final ByteBuffer writer = ByteBuffer.wrap(payload);
        while (writer.hasRemaining()) {
            int smallValue = random.nextInt(1000);
            writer.putInt(smallValue);
        }
        return payload;
    }

    /**
     * Produces a payload of the requested length filled entirely with random bytes.
     *
     * @param random source of the bytes
     * @param i length of the payload in bytes
     * @return the filled array
     */
    private byte[] bytesWithPoorCompression(Random random, int i) {
        final byte[] payload = new byte[i];
        random.nextBytes(payload);
        return payload;
    }

    /**
     * Counts how many records built from this test's key and value fit within the given
     * number of bytes, using {@code DefaultRecord.sizeInBytes} for each record's size.
     *
     * @param i byte budget
     * @return the number of records whose cumulative size does not exceed the budget
     */
    private int expectedNumAppends(int i) {
        int accumulated = 0;
        // The running record count is also what is passed as the first sizeInBytes argument.
        for (int appended = 0; ; appended++) {
            accumulated += DefaultRecord.sizeInBytes(appended, 0L, this.key.length, this.value.length, Record.EMPTY_HEADERS);
            if (accumulated > i) {
                // This record would overflow the budget, so it is not counted.
                return appended;
            }
        }
    }

    /**
     * Creates a {@link RecordAccumulator} wired to this test's log context, metrics and mock
     * time, with a fresh {@link ApiVersions} and no transaction manager.
     *
     * @param i forwarded as the second constructor argument (callers pass a batch size)
     * @param j forwarded as the third constructor argument (callers pass a buffer capacity)
     * @param compressionType compression type for the accumulator
     * @param j2 forwarded as the fifth constructor argument
     * @return the new accumulator
     */
    private RecordAccumulator createTestRecordAccumulator(int i, long j, CompressionType compressionType, long j2) {
        final ApiVersions apiVersions = new ApiVersions();
        // NOTE(review): meaning of the fixed 100L argument is not visible here -- confirm against the constructor.
        return new RecordAccumulator(
                this.logContext,
                i,
                j,
                compressionType,
                j2,
                100L,
                this.metrics,
                this.time,
                apiVersions,
                (TransactionManager) null);
    }
}
