package org.apache.flink.streaming.api.operators.co;

import java.io.IOException;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperatorTest.class */
public class KeyedCoProcessOperatorTest extends TestLogger {

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperatorTest$AppendCurrentKeyProcessFunction.class */
    private static class AppendCurrentKeyProcessFunction extends KeyedCoProcessFunction<String, Integer, String, String> {
        private AppendCurrentKeyProcessFunction() {
        }

        public void processElement1(Integer num, KeyedCoProcessFunction<String, Integer, String, String>.Context context, Collector<String> collector) throws Exception {
            collector.collect(num + "," + ((String) context.getCurrentKey()));
        }

        public void processElement2(String str, KeyedCoProcessFunction<String, Integer, String, String>.Context context, Collector<String> collector) throws Exception {
            collector.collect(str + "," + ((String) context.getCurrentKey()));
        }

        public /* bridge */ /* synthetic */ void processElement2(Object obj, KeyedCoProcessFunction.Context context, Collector collector) throws Exception {
            processElement2((String) obj, (KeyedCoProcessFunction<String, Integer, String, String>.Context) context, (Collector<String>) collector);
        }

        public /* bridge */ /* synthetic */ void processElement1(Object obj, KeyedCoProcessFunction.Context context, Collector collector) throws Exception {
            processElement1((Integer) obj, (KeyedCoProcessFunction<String, Integer, String, String>.Context) context, (Collector<String>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperatorTest$BothTriggeringProcessFunction.class */
    private static class BothTriggeringProcessFunction extends KeyedCoProcessFunction<String, Integer, String, String> {
        private static final long serialVersionUID = 1;

        private BothTriggeringProcessFunction() {
        }

        public void processElement1(Integer num, KeyedCoProcessFunction<String, Integer, String, String>.Context context, Collector<String> collector) throws Exception {
            context.timerService().registerProcessingTimeTimer(3L);
            context.timerService().registerEventTimeTimer(6L);
            context.timerService().deleteProcessingTimeTimer(3L);
        }

        public void processElement2(String str, KeyedCoProcessFunction<String, Integer, String, String>.Context context, Collector<String> collector) throws Exception {
            context.timerService().registerEventTimeTimer(4L);
            context.timerService().registerProcessingTimeTimer(5L);
            context.timerService().deleteEventTimeTimer(4L);
        }

        public void onTimer(long j, KeyedCoProcessFunction<String, Integer, String, String>.OnTimerContext onTimerContext, Collector<String> collector) throws Exception {
            if (TimeDomain.EVENT_TIME.equals(onTimerContext.timeDomain())) {
                collector.collect("EVENT:1777");
            } else {
                collector.collect("PROC:1777");
            }
        }

        public /* bridge */ /* synthetic */ void processElement2(Object obj, KeyedCoProcessFunction.Context context, Collector collector) throws Exception {
            processElement2((String) obj, (KeyedCoProcessFunction<String, Integer, String, String>.Context) context, (Collector<String>) collector);
        }

        public /* bridge */ /* synthetic */ void processElement1(Object obj, KeyedCoProcessFunction.Context context, Collector collector) throws Exception {
            processElement1((Integer) obj, (KeyedCoProcessFunction<String, Integer, String, String>.Context) context, (Collector<String>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperatorTest$EventTimeTriggeringProcessFunction.class */
    private static class EventTimeTriggeringProcessFunction extends KeyedCoProcessFunction<String, Integer, String, String> {
        private static final long serialVersionUID = 1;

        private EventTimeTriggeringProcessFunction() {
        }

        public void processElement1(Integer num, KeyedCoProcessFunction<String, Integer, String, String>.Context context, Collector<String> collector) throws Exception {
            collector.collect("INPUT1:" + num);
            context.timerService().registerEventTimeTimer(5L);
        }

        public void processElement2(String str, KeyedCoProcessFunction<String, Integer, String, String>.Context context, Collector<String> collector) throws Exception {
            collector.collect("INPUT2:" + str);
            context.timerService().registerEventTimeTimer(6L);
        }

        public void onTimer(long j, KeyedCoProcessFunction<String, Integer, String, String>.OnTimerContext onTimerContext, Collector<String> collector) throws Exception {
            Assert.assertEquals(TimeDomain.EVENT_TIME, onTimerContext.timeDomain());
            collector.collect(((String) onTimerContext.getCurrentKey()) + ":1777");
        }

        public /* bridge */ /* synthetic */ void processElement2(Object obj, KeyedCoProcessFunction.Context context, Collector collector) throws Exception {
            processElement2((String) obj, (KeyedCoProcessFunction<String, Integer, String, String>.Context) context, (Collector<String>) collector);
        }

        public /* bridge */ /* synthetic */ void processElement1(Object obj, KeyedCoProcessFunction.Context context, Collector collector) throws Exception {
            processElement1((Integer) obj, (KeyedCoProcessFunction<String, Integer, String, String>.Context) context, (Collector<String>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperatorTest$EventTimeTriggeringStatefulProcessFunction.class */
    private static class EventTimeTriggeringStatefulProcessFunction extends KeyedCoProcessFunction<String, Integer, String, String> {
        private static final long serialVersionUID = 1;
        private final ValueStateDescriptor<String> state;

        private EventTimeTriggeringStatefulProcessFunction() {
            this.state = new ValueStateDescriptor<>("seen-element", StringSerializer.INSTANCE);
        }

        public void processElement1(Integer num, KeyedCoProcessFunction<String, Integer, String, String>.Context context, Collector<String> collector) throws Exception {
            handleValue(num, collector, context.timerService(), 1);
        }

        public void processElement2(String str, KeyedCoProcessFunction<String, Integer, String, String>.Context context, Collector<String> collector) throws Exception {
            handleValue(str, collector, context.timerService(), 2);
        }

        private void handleValue(Object obj, Collector<String> collector, TimerService timerService, int i) throws IOException {
            ValueState state = getRuntimeContext().getState(this.state);
            if (state.value() != null) {
                state.clear();
                timerService.deleteEventTimeTimer(timerService.currentWatermark() + 4);
            } else {
                collector.collect("INPUT" + i + ":" + obj);
                state.update(String.valueOf(obj));
                timerService.registerEventTimeTimer(timerService.currentWatermark() + 5);
            }
        }

        public void onTimer(long j, KeyedCoProcessFunction<String, Integer, String, String>.OnTimerContext onTimerContext, Collector<String> collector) throws Exception {
            Assert.assertEquals(TimeDomain.EVENT_TIME, onTimerContext.timeDomain());
            collector.collect("STATE:" + ((String) getRuntimeContext().getState(this.state).value()));
        }

        public /* bridge */ /* synthetic */ void processElement2(Object obj, KeyedCoProcessFunction.Context context, Collector collector) throws Exception {
            processElement2((String) obj, (KeyedCoProcessFunction<String, Integer, String, String>.Context) context, (Collector<String>) collector);
        }

        public /* bridge */ /* synthetic */ void processElement1(Object obj, KeyedCoProcessFunction.Context context, Collector collector) throws Exception {
            processElement1((Integer) obj, (KeyedCoProcessFunction<String, Integer, String, String>.Context) context, (Collector<String>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperatorTest$IdentityKeySelector.class */
    private static class IdentityKeySelector<T> implements KeySelector<T, T> {
        private static final long serialVersionUID = 1;

        private IdentityKeySelector() {
        }

        public T getKey(T t) throws Exception {
            return t;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperatorTest$IntToStringKeySelector.class */
    private static class IntToStringKeySelector<T> implements KeySelector<Integer, String> {
        private static final long serialVersionUID = 1;

        private IntToStringKeySelector() {
        }

        public String getKey(Integer num) throws Exception {
            return "" + num;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperatorTest$ProcessingTimeQueryingProcessFunction.class */
    private static class ProcessingTimeQueryingProcessFunction extends KeyedCoProcessFunction<String, Integer, String, String> {
        private static final long serialVersionUID = 1;

        private ProcessingTimeQueryingProcessFunction() {
        }

        public void processElement1(Integer num, KeyedCoProcessFunction<String, Integer, String, String>.Context context, Collector<String> collector) throws Exception {
            collector.collect(num + "PT:" + context.timerService().currentProcessingTime() + " TS:" + context.timestamp());
        }

        public void processElement2(String str, KeyedCoProcessFunction<String, Integer, String, String>.Context context, Collector<String> collector) throws Exception {
            collector.collect(str + "PT:" + context.timerService().currentProcessingTime() + " TS:" + context.timestamp());
        }

        public void onTimer(long j, KeyedCoProcessFunction<String, Integer, String, String>.OnTimerContext onTimerContext, Collector<String> collector) throws Exception {
        }

        public /* bridge */ /* synthetic */ void processElement2(Object obj, KeyedCoProcessFunction.Context context, Collector collector) throws Exception {
            processElement2((String) obj, (KeyedCoProcessFunction<String, Integer, String, String>.Context) context, (Collector<String>) collector);
        }

        public /* bridge */ /* synthetic */ void processElement1(Object obj, KeyedCoProcessFunction.Context context, Collector collector) throws Exception {
            processElement1((Integer) obj, (KeyedCoProcessFunction<String, Integer, String, String>.Context) context, (Collector<String>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperatorTest$ProcessingTimeTriggeringProcessFunction.class */
    private static class ProcessingTimeTriggeringProcessFunction extends KeyedCoProcessFunction<String, Integer, String, String> {
        private static final long serialVersionUID = 1;

        private ProcessingTimeTriggeringProcessFunction() {
        }

        public void processElement1(Integer num, KeyedCoProcessFunction<String, Integer, String, String>.Context context, Collector<String> collector) throws Exception {
            collector.collect("INPUT1:" + num);
            context.timerService().registerProcessingTimeTimer(5L);
        }

        public void processElement2(String str, KeyedCoProcessFunction<String, Integer, String, String>.Context context, Collector<String> collector) throws Exception {
            collector.collect("INPUT2:" + str);
            context.timerService().registerProcessingTimeTimer(6L);
        }

        public void onTimer(long j, KeyedCoProcessFunction<String, Integer, String, String>.OnTimerContext onTimerContext, Collector<String> collector) throws Exception {
            Assert.assertEquals(TimeDomain.PROCESSING_TIME, onTimerContext.timeDomain());
            collector.collect("1777");
        }

        public /* bridge */ /* synthetic */ void processElement2(Object obj, KeyedCoProcessFunction.Context context, Collector collector) throws Exception {
            processElement2((String) obj, (KeyedCoProcessFunction<String, Integer, String, String>.Context) context, (Collector<String>) collector);
        }

        public /* bridge */ /* synthetic */ void processElement1(Object obj, KeyedCoProcessFunction.Context context, Collector collector) throws Exception {
            processElement1((Integer) obj, (KeyedCoProcessFunction<String, Integer, String, String>.Context) context, (Collector<String>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperatorTest$ProcessingTimeTriggeringStatefulProcessFunction.class */
    private static class ProcessingTimeTriggeringStatefulProcessFunction extends KeyedCoProcessFunction<String, Integer, String, String> {
        private static final long serialVersionUID = 1;
        private final ValueStateDescriptor<String> state;

        private ProcessingTimeTriggeringStatefulProcessFunction() {
            this.state = new ValueStateDescriptor<>("seen-element", StringSerializer.INSTANCE);
        }

        public void processElement1(Integer num, KeyedCoProcessFunction<String, Integer, String, String>.Context context, Collector<String> collector) throws Exception {
            handleValue(num, collector, context.timerService(), 1);
        }

        public void processElement2(String str, KeyedCoProcessFunction<String, Integer, String, String>.Context context, Collector<String> collector) throws Exception {
            handleValue(str, collector, context.timerService(), 2);
        }

        private void handleValue(Object obj, Collector<String> collector, TimerService timerService, int i) throws IOException {
            ValueState state = getRuntimeContext().getState(this.state);
            if (state.value() != null) {
                state.clear();
                timerService.deleteProcessingTimeTimer(timerService.currentProcessingTime() + 4);
            } else {
                collector.collect("INPUT" + i + ":" + obj);
                state.update(String.valueOf(obj));
                timerService.registerProcessingTimeTimer(timerService.currentProcessingTime() + 5);
            }
        }

        public void onTimer(long j, KeyedCoProcessFunction<String, Integer, String, String>.OnTimerContext onTimerContext, Collector<String> collector) throws Exception {
            Assert.assertEquals(TimeDomain.PROCESSING_TIME, onTimerContext.timeDomain());
            collector.collect("STATE:" + ((String) getRuntimeContext().getState(this.state).value()));
        }

        public /* bridge */ /* synthetic */ void processElement2(Object obj, KeyedCoProcessFunction.Context context, Collector collector) throws Exception {
            processElement2((String) obj, (KeyedCoProcessFunction<String, Integer, String, String>.Context) context, (Collector<String>) collector);
        }

        public /* bridge */ /* synthetic */ void processElement1(Object obj, KeyedCoProcessFunction.Context context, Collector collector) throws Exception {
            processElement1((Integer) obj, (KeyedCoProcessFunction<String, Integer, String, String>.Context) context, (Collector<String>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperatorTest$WatermarkQueryingProcessFunction.class */
    private static class WatermarkQueryingProcessFunction extends KeyedCoProcessFunction<String, Integer, String, String> {
        private static final long serialVersionUID = 1;

        private WatermarkQueryingProcessFunction() {
        }

        public void processElement1(Integer num, KeyedCoProcessFunction<String, Integer, String, String>.Context context, Collector<String> collector) throws Exception {
            collector.collect(num + "WM:" + context.timerService().currentWatermark() + " TS:" + context.timestamp());
        }

        public void processElement2(String str, KeyedCoProcessFunction<String, Integer, String, String>.Context context, Collector<String> collector) throws Exception {
            collector.collect(str + "WM:" + context.timerService().currentWatermark() + " TS:" + context.timestamp());
        }

        public void onTimer(long j, KeyedCoProcessFunction<String, Integer, String, String>.OnTimerContext onTimerContext, Collector<String> collector) throws Exception {
        }

        public /* bridge */ /* synthetic */ void processElement2(Object obj, KeyedCoProcessFunction.Context context, Collector collector) throws Exception {
            processElement2((String) obj, (KeyedCoProcessFunction<String, Integer, String, String>.Context) context, (Collector<String>) collector);
        }

        public /* bridge */ /* synthetic */ void processElement1(Object obj, KeyedCoProcessFunction.Context context, Collector collector) throws Exception {
            processElement1((Integer) obj, (KeyedCoProcessFunction<String, Integer, String, String>.Context) context, (Collector<String>) collector);
        }
    }

    /**
     * Verifies that the element timestamp and the current watermark are visible from the
     * processing context of both inputs.
     */
    @Test
    public void testTimestampAndWatermarkQuerying() throws Exception {
        KeyedCoProcessOperator<String, Integer, String, String> operator =
                new KeyedCoProcessOperator<>(new WatermarkQueryingProcessFunction());
        KeyedTwoInputStreamOperatorTestHarness<String, Integer, String, String> testHarness =
                new KeyedTwoInputStreamOperatorTestHarness<>(
                        operator,
                        new IntToStringKeySelector<>(),
                        new IdentityKeySelector<String>(),
                        BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setup();
        testHarness.open();

        // Advance both inputs to 17 before the first element, then to 42 before the second.
        testHarness.processWatermark1(new Watermark(17L));
        testHarness.processWatermark2(new Watermark(17L));
        testHarness.processElement1(new StreamRecord<>(5, 12L));
        testHarness.processWatermark1(new Watermark(42L));
        testHarness.processWatermark2(new Watermark(42L));
        testHarness.processElement2(new StreamRecord<>("6", 13L));

        // Each pair of input watermarks is expected to show up as one output watermark.
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
        expectedOutput.add(new Watermark(17L));
        expectedOutput.add(new StreamRecord<>("5WM:17 TS:12", 12L));
        expectedOutput.add(new Watermark(42L));
        expectedOutput.add(new StreamRecord<>("6WM:42 TS:13", 13L));

        TestHarnessUtil.assertOutputEquals(
                "Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    /**
     * Verifies that the current processing time is visible from the processing context of both
     * inputs and that records without a timestamp report {@code null} as their timestamp.
     */
    @Test
    public void testTimestampAndProcessingTimeQuerying() throws Exception {
        KeyedCoProcessOperator<String, Integer, String, String> operator =
                new KeyedCoProcessOperator<>(new ProcessingTimeQueryingProcessFunction());
        KeyedTwoInputStreamOperatorTestHarness<String, Integer, String, String> testHarness =
                new KeyedTwoInputStreamOperatorTestHarness<>(
                        operator,
                        new IntToStringKeySelector<>(),
                        new IdentityKeySelector<String>(),
                        BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setup();
        testHarness.open();

        testHarness.setProcessingTime(17L);
        testHarness.processElement1(new StreamRecord<>(5));
        testHarness.setProcessingTime(42L);
        testHarness.processElement2(new StreamRecord<>("6"));

        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
        expectedOutput.add(new StreamRecord<>("5PT:17 TS:null"));
        expectedOutput.add(new StreamRecord<>("6PT:42 TS:null"));

        TestHarnessUtil.assertOutputEquals(
                "Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    /**
     * Verifies that event-time timers registered from either input fire once the watermark of both
     * inputs has reached the timer timestamp, and that the timer output precedes that watermark.
     */
    @Test
    public void testEventTimeTimers() throws Exception {
        KeyedCoProcessOperator<String, Integer, String, String> operator =
                new KeyedCoProcessOperator<>(new EventTimeTriggeringProcessFunction());
        KeyedTwoInputStreamOperatorTestHarness<String, Integer, String, String> testHarness =
                new KeyedTwoInputStreamOperatorTestHarness<>(
                        operator,
                        new IntToStringKeySelector<>(),
                        new IdentityKeySelector<String>(),
                        BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setup();
        testHarness.open();

        // Input 1 registers a timer at 5 for key "17"; input 2 registers one at 6 for key "18".
        testHarness.processElement1(new StreamRecord<>(17, 42L));
        testHarness.processElement2(new StreamRecord<>("18", 42L));
        testHarness.processWatermark1(new Watermark(5L));
        testHarness.processWatermark2(new Watermark(5L));
        testHarness.processWatermark1(new Watermark(6L));
        testHarness.processWatermark2(new Watermark(6L));

        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
        expectedOutput.add(new StreamRecord<>("INPUT1:17", 42L));
        expectedOutput.add(new StreamRecord<>("INPUT2:18", 42L));
        expectedOutput.add(new StreamRecord<>("17:1777", 5L));
        expectedOutput.add(new Watermark(5L));
        expectedOutput.add(new StreamRecord<>("18:1777", 6L));
        expectedOutput.add(new Watermark(6L));

        TestHarnessUtil.assertOutputEquals(
                "Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    /**
     * Verifies that processing-time timers registered from either input fire when the harness
     * processing time is advanced to the timer timestamps.
     */
    @Test
    public void testProcessingTimeTimers() throws Exception {
        KeyedCoProcessOperator<String, Integer, String, String> operator =
                new KeyedCoProcessOperator<>(new ProcessingTimeTriggeringProcessFunction());
        KeyedTwoInputStreamOperatorTestHarness<String, Integer, String, String> testHarness =
                new KeyedTwoInputStreamOperatorTestHarness<>(
                        operator,
                        new IntToStringKeySelector<>(),
                        new IdentityKeySelector<String>(),
                        BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setup();
        testHarness.open();

        // Input 1 registers a timer at 5, input 2 registers one at 6.
        testHarness.processElement1(new StreamRecord<>(17));
        testHarness.processElement2(new StreamRecord<>("18"));
        testHarness.setProcessingTime(5L);
        testHarness.setProcessingTime(6L);

        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
        expectedOutput.add(new StreamRecord<>("INPUT1:17"));
        expectedOutput.add(new StreamRecord<>("INPUT2:18"));
        expectedOutput.add(new StreamRecord<>("1777"));
        expectedOutput.add(new StreamRecord<>("1777"));

        TestHarnessUtil.assertOutputEquals(
                "Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    /**
     * Verifies that keyed state is scoped to the key of the firing event-time timer and that a
     * deleted timer does not fire.
     */
    @Test
    public void testEventTimeTimerWithState() throws Exception {
        KeyedCoProcessOperator<String, Integer, String, String> operator =
                new KeyedCoProcessOperator<>(new EventTimeTriggeringStatefulProcessFunction());
        KeyedTwoInputStreamOperatorTestHarness<String, Integer, String, String> testHarness =
                new KeyedTwoInputStreamOperatorTestHarness<>(
                        operator,
                        new IntToStringKeySelector<>(),
                        new IdentityKeySelector<String>(),
                        BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setup();
        testHarness.open();

        // At watermark 1 both keys store their element and register a timer at 6.
        testHarness.processWatermark1(new Watermark(1L));
        testHarness.processWatermark2(new Watermark(1L));
        testHarness.processElement1(new StreamRecord<>(17, 0L));
        testHarness.processElement1(new StreamRecord<>(13, 0L));

        // At watermark 2 the second element for key "13" clears its state and deletes the
        // timer at 6, while key "42" stores its element and registers a timer at 7.
        testHarness.processWatermark1(new Watermark(2L));
        testHarness.processWatermark2(new Watermark(2L));
        testHarness.processElement1(new StreamRecord<>(13, 1L));
        testHarness.processElement2(new StreamRecord<>("42", 1L));

        testHarness.processWatermark1(new Watermark(6L));
        testHarness.processWatermark2(new Watermark(6L));
        testHarness.processWatermark1(new Watermark(7L));
        testHarness.processWatermark2(new Watermark(7L));

        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
        expectedOutput.add(new Watermark(1L));
        expectedOutput.add(new StreamRecord<>("INPUT1:17", 0L));
        expectedOutput.add(new StreamRecord<>("INPUT1:13", 0L));
        expectedOutput.add(new Watermark(2L));
        expectedOutput.add(new StreamRecord<>("INPUT2:42", 1L));
        expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
        expectedOutput.add(new Watermark(6L));
        expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
        expectedOutput.add(new Watermark(7L));

        TestHarnessUtil.assertOutputEquals(
                "Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    /**
     * Verifies that keyed state is scoped to the key of the firing processing-time timer and that
     * a deleted timer does not fire.
     */
    @Test
    public void testProcessingTimeTimerWithState() throws Exception {
        KeyedCoProcessOperator<String, Integer, String, String> operator =
                new KeyedCoProcessOperator<>(
                        new ProcessingTimeTriggeringStatefulProcessFunction());
        KeyedTwoInputStreamOperatorTestHarness<String, Integer, String, String> testHarness =
                new KeyedTwoInputStreamOperatorTestHarness<>(
                        operator,
                        new IntToStringKeySelector<>(),
                        new IdentityKeySelector<String>(),
                        BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setup();
        testHarness.open();

        // At processing time 1 both keys store their element and register a timer at 6.
        testHarness.setProcessingTime(1L);
        testHarness.processElement1(new StreamRecord<>(17));
        testHarness.processElement1(new StreamRecord<>(13));

        // At processing time 2 the second element for key "13" clears its state and deletes the
        // timer at 6, while key "42" stores its element and registers a timer at 7.
        testHarness.setProcessingTime(2L);
        testHarness.processElement1(new StreamRecord<>(13));
        testHarness.processElement2(new StreamRecord<>("42"));

        testHarness.setProcessingTime(6L);
        testHarness.setProcessingTime(7L);

        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
        expectedOutput.add(new StreamRecord<>("INPUT1:17"));
        expectedOutput.add(new StreamRecord<>("INPUT1:13"));
        expectedOutput.add(new StreamRecord<>("INPUT2:42"));
        expectedOutput.add(new StreamRecord<>("STATE:17"));
        expectedOutput.add(new StreamRecord<>("STATE:42"));

        TestHarnessUtil.assertOutputEquals(
                "Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    /**
     * Verifies that timers registered before a snapshot still fire after the snapshot has been
     * restored into a fresh operator instance.
     */
    @Test
    public void testSnapshotAndRestore() throws Exception {
        KeyedCoProcessOperator<String, Integer, String, String> operator =
                new KeyedCoProcessOperator<>(new BothTriggeringProcessFunction());
        KeyedTwoInputStreamOperatorTestHarness<String, Integer, String, String> testHarness =
                new KeyedTwoInputStreamOperatorTestHarness<>(
                        operator,
                        new IntToStringKeySelector<>(),
                        new IdentityKeySelector<String>(),
                        BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setup();
        testHarness.open();

        // After these two elements an event-time timer at 6 and a processing-time timer at 5
        // remain registered for key "5".
        testHarness.processElement1(new StreamRecord<>(5, 12L));
        testHarness.processElement2(new StreamRecord<>("5", 12L));

        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        testHarness.close();

        // Restore the snapshot into a second harness built around a new operator instance.
        KeyedCoProcessOperator<String, Integer, String, String> restoredOperator =
                new KeyedCoProcessOperator<>(new BothTriggeringProcessFunction());
        KeyedTwoInputStreamOperatorTestHarness<String, Integer, String, String> restoredHarness =
                new KeyedTwoInputStreamOperatorTestHarness<>(
                        restoredOperator,
                        new IntToStringKeySelector<>(),
                        new IdentityKeySelector<String>(),
                        BasicTypeInfo.STRING_TYPE_INFO);
        restoredHarness.setup();
        restoredHarness.initializeState(snapshot);
        restoredHarness.open();

        restoredHarness.setProcessingTime(5L);
        restoredHarness.processWatermark1(new Watermark(6L));
        restoredHarness.processWatermark2(new Watermark(6L));

        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
        expectedOutput.add(new StreamRecord<>("PROC:1777"));
        expectedOutput.add(new StreamRecord<>("EVENT:1777", 6L));
        expectedOutput.add(new Watermark(6L));

        TestHarnessUtil.assertOutputEquals(
                "Output was not correct.", expectedOutput, restoredHarness.getOutput());
        restoredHarness.close();
    }

    /**
     * Verifies that the key of the element being processed can be read from the context for both
     * inputs.
     */
    @Test
    public void testGetCurrentKeyFromContext() throws Exception {
        KeyedCoProcessOperator<String, Integer, String, String> operator =
                new KeyedCoProcessOperator<>(new AppendCurrentKeyProcessFunction());
        KeyedTwoInputStreamOperatorTestHarness<String, Integer, String, String> testHarness =
                new KeyedTwoInputStreamOperatorTestHarness<>(
                        operator,
                        new IntToStringKeySelector<>(),
                        new IdentityKeySelector<String>(),
                        BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setup();
        testHarness.open();

        testHarness.processElement1(new StreamRecord<>(5));
        testHarness.processElement1(new StreamRecord<>(6));
        testHarness.processElement2(new StreamRecord<>("hello"));
        testHarness.processElement2(new StreamRecord<>("world"));

        // Every record is expected as "<element>,<key>"; with these key selectors both parts match.
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
        expectedOutput.add(new StreamRecord<>("5,5"));
        expectedOutput.add(new StreamRecord<>("6,6"));
        expectedOutput.add(new StreamRecord<>("hello,hello"));
        expectedOutput.add(new StreamRecord<>("world,world"));

        TestHarnessUtil.assertOutputEquals(
                "Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }
}
