package org.apache.flink.streaming.api.operators;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.mocks.MockSourceReader;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.source.event.AddSplitEvent;
import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
import org.apache.flink.runtime.source.event.SourceEventWrapper;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateInitializationContextImpl;
import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.operators.source.TestingSourceOperator;
import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
import org.apache.flink.streaming.util.MockOutput;
import org.apache.flink.streaming.util.MockStreamConfig;
import org.apache.flink.util.CollectionUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/api/operators/SourceOperatorTest.class */
public class SourceOperatorTest {
    private static final int SUBTASK_INDEX = 1;
    private static final MockSourceSplit MOCK_SPLIT = new MockSourceSplit(1234, 10);
    private MockSourceReader mockSourceReader;
    private MockOperatorEventGateway mockGateway;
    private SourceOperator<Integer, MockSourceSplit> operator;

    @Before
    public void setup() throws Exception {
        this.mockSourceReader = new MockSourceReader();
        this.mockGateway = new MockOperatorEventGateway();
        this.operator = new TestingSourceOperator((SourceReader) this.mockSourceReader, (OperatorEventGateway) this.mockGateway, SUBTASK_INDEX, true);
        Environment testingEnvironment = getTestingEnvironment();
        this.operator.setup(new SourceOperatorStreamTask(testingEnvironment), new MockStreamConfig(new Configuration(), SUBTASK_INDEX), new MockOutput(new ArrayList()));
        this.operator.initializeState(new StreamTaskStateInitializerImpl(testingEnvironment, new MemoryStateBackend()));
    }

    @After
    public void cleanUp() throws Exception {
        this.operator.close();
        this.operator.dispose();
        Assert.assertTrue(this.mockSourceReader.isClosed());
    }

    @Test
    public void testInitializeState() throws Exception {
        StateInitializationContext stateContext = getStateContext();
        this.operator.initializeState(stateContext);
        Assert.assertNotNull(stateContext.getOperatorStateStore().getListState(SourceOperator.SPLITS_STATE_DESC));
    }

    @Test
    public void testOpen() throws Exception {
        this.operator.initializeState(getStateContext());
        this.operator.open();
        Assert.assertEquals(Collections.singletonList(MOCK_SPLIT), this.mockSourceReader.getAssignedSplits());
        Assert.assertTrue(this.mockSourceReader.isStarted());
        Assert.assertEquals(1L, this.mockGateway.getEventsSent().size());
        Assert.assertTrue(((OperatorEvent) this.mockGateway.getEventsSent().get(0)) instanceof ReaderRegistrationEvent);
        Assert.assertEquals(1L, r0.subtaskId());
    }

    @Test
    public void testHandleAddSplitsEvent() throws Exception {
        this.operator.initializeState(getStateContext());
        this.operator.open();
        MockSourceSplit mockSourceSplit = new MockSourceSplit(2);
        this.operator.handleOperatorEvent(new AddSplitEvent(Collections.singletonList(mockSourceSplit), new MockSourceSplitSerializer()));
        Assert.assertEquals(Arrays.asList(MOCK_SPLIT, mockSourceSplit), this.mockSourceReader.getAssignedSplits());
    }

    @Test
    public void testHandleAddSourceEvent() throws Exception {
        this.operator.initializeState(getStateContext());
        this.operator.open();
        SourceEvent sourceEvent = new SourceEvent() { // from class: org.apache.flink.streaming.api.operators.SourceOperatorTest.1
        };
        this.operator.handleOperatorEvent(new SourceEventWrapper(sourceEvent));
        Assert.assertEquals(Collections.singletonList(sourceEvent), this.mockSourceReader.getReceivedSourceEvents());
    }

    @Test
    public void testSnapshotState() throws Exception {
        this.operator.initializeState(getStateContext());
        this.operator.open();
        MockSourceSplit mockSourceSplit = new MockSourceSplit(2);
        this.operator.handleOperatorEvent(new AddSplitEvent(Collections.singletonList(mockSourceSplit), new MockSourceSplitSerializer()));
        this.operator.snapshotState(new StateSnapshotContextSynchronousImpl(100L, 100L));
        Assert.assertEquals(Arrays.asList(MOCK_SPLIT, mockSourceSplit), CollectionUtil.iterableToList((Iterable) this.operator.getReaderState().get()));
    }

    @Test
    public void testNotifyCheckpointComplete() throws Exception {
        this.operator.initializeState(getStateContext());
        this.operator.open();
        this.operator.snapshotState(new StateSnapshotContextSynchronousImpl(100L, 100L));
        this.operator.notifyCheckpointComplete(100L);
        Assert.assertEquals(100L, ((Long) this.mockSourceReader.getCompletedCheckpoints().get(0)).longValue());
    }

    @Test
    public void testNotifyCheckpointAborted() throws Exception {
        this.operator.initializeState(getStateContext());
        this.operator.open();
        this.operator.snapshotState(new StateSnapshotContextSynchronousImpl(100L, 100L));
        this.operator.notifyCheckpointAborted(100L);
        Assert.assertEquals(100L, ((Long) this.mockSourceReader.getAbortedCheckpoints().get(0)).longValue());
    }

    @Test
    public void testDisposeAfterCloseOnlyClosesReaderOnce() throws Exception {
        this.operator.initializeState(getStateContext());
        this.operator.open();
        this.operator.close();
        this.operator.dispose();
        Assert.assertEquals(1L, this.mockSourceReader.getTimesClosed());
    }

    private StateInitializationContext getStateContext() throws Exception {
        byte[] writeVersionAndSerialize = SimpleVersionedSerialization.writeVersionAndSerialize(new MockSourceSplitSerializer(), MOCK_SPLIT);
        StateInitializationContextImpl stateInitializationContextImpl = new StateInitializationContextImpl(false, createOperatorStateStore(), (KeyedStateStore) null, (Iterable) null, (Iterable) null);
        stateInitializationContextImpl.getOperatorStateStore().getListState(SourceOperator.SPLITS_STATE_DESC).update(Collections.singletonList(writeVersionAndSerialize));
        return stateInitializationContextImpl;
    }

    private OperatorStateStore createOperatorStateStore() throws Exception {
        MockEnvironment build = new MockEnvironmentBuilder().build();
        return new MemoryStateBackend().createOperatorStateBackend(build, "test-operator", Collections.emptyList(), new CloseableRegistry());
    }

    private Environment getTestingEnvironment() {
        return new StreamMockEnvironment(new Configuration(), new Configuration(), new ExecutionConfig(), 1L, new MockInputSplitProvider(), SUBTASK_INDEX, new TestTaskStateManager());
    }
}
