package io.debezium.pipeline.signal;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.SourceInfoStructMaker;
import io.debezium.data.Envelope;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.pipeline.signal.Signal;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.TableId;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.fest.assertions.Assertions;
import org.junit.Test;

/* loaded from: input_file:io/debezium/pipeline/signal/SignalTest.class */
/**
 * Unit tests for {@link Signal}.
 *
 * <p>The tests cover three entry points that are visible here: matching a table id against the
 * configured signal data collection ({@code isSignal}), processing a signal given as
 * {@code (id, type, data)} strings, and processing a signal given as a Kafka Connect
 * {@link Struct} (either a full change-event envelope or a bare row).
 */
public class SignalTest {

    /** Signal type under which the value-capturing test action is registered. */
    private static final String CUSTOM_TYPE = "custom";

    /** JSON payload whose {@code v} field is read by the value-capturing test action. */
    private static final String CUSTOM_DATA = "{\"v\": 5}";

    /**
     * Only the table named by the configured signal data collection ({@code debezium.signal},
     * see {@link #config()}) must be reported as a signal table.
     */
    @Test
    public void shouldDetectSignal() {
        Signal signal = new Signal(config());

        TableId ordinaryTable = new TableId("dbo", (String) null, "mytable");
        TableId signalTable = new TableId("debezium", (String) null, "signal");

        Assertions.assertThat(signal.isSignal(ordinaryTable)).isFalse();
        Assertions.assertThat(signal.isSignal(signalTable)).isTrue();
    }

    /**
     * A {@code log} signal must be accepted and must emit its message through the {@link Log}
     * logger; the {@code {}} placeholder is expected to be rendered as {@code <none>}.
     */
    @Test
    public void shouldExecuteLog() throws Exception {
        Signal signal = new Signal(config());
        // Install the interceptor before processing so the emitted message is captured.
        LogInterceptor interceptor = new LogInterceptor(Log.class);

        boolean processed = signal.process("log1", "log", "{\"message\": \"signallog {}\"}");

        Assertions.assertThat(processed).isTrue();
        Assertions.assertThat(interceptor.containsMessage("signallog <none>")).isTrue();
    }

    /** A signal whose type ({@code log1}) has no registered action must be rejected. */
    @Test
    public void shouldIgnoreInvalidSignalType() throws Exception {
        Signal signal = new Signal(config());

        boolean processed = signal.process("log1", "log1", "{\"message\": \"signallog\"}");

        Assertions.assertThat(processed).isFalse();
    }

    /** A signal whose data is not well-formed JSON must be rejected. */
    @Test
    public void shouldIgnoreUnparseableData() throws Exception {
        Signal signal = new Signal(config());

        // The closing quote after "message" is missing on purpose.
        boolean processed = signal.process("log1", "log", "{\"message: \"signallog\"}");

        Assertions.assertThat(processed).isFalse();
    }

    /** An action registered under a new type must be invoked with the parsed signal data. */
    @Test
    public void shouldRegisterAdditionalAction() throws Exception {
        Signal signal = new Signal(config());
        AtomicInteger captured = new AtomicInteger();
        signal.registerSignalAction(CUSTOM_TYPE, valueCapturingAction(captured));

        boolean processed = signal.process("log1", CUSTOM_TYPE, CUSTOM_DATA);

        Assertions.assertThat(processed).isTrue();
        Assertions.assertThat(captured.intValue()).isEqualTo(5);
    }

    /**
     * A change-event envelope whose row carries three string columns (id, type, data) must be
     * processed as a signal and dispatched to the registered action.
     */
    @Test
    public void shouldExecuteFromEnvelope() throws Exception {
        Signal signal = new Signal(config());
        Schema rowSchema = SchemaBuilder.struct().name("signal")
                .field("col1", Schema.OPTIONAL_STRING_SCHEMA)
                .field("col2", Schema.OPTIONAL_STRING_SCHEMA)
                .field("col3", Schema.OPTIONAL_STRING_SCHEMA)
                .build();
        Envelope envelope = envelopeFor(rowSchema);

        Struct row = new Struct(rowSchema);
        row.put("col1", "log1");
        row.put("col2", CUSTOM_TYPE);
        row.put("col3", CUSTOM_DATA);

        AtomicInteger captured = new AtomicInteger();
        signal.registerSignalAction(CUSTOM_TYPE, valueCapturingAction(captured));

        Struct event = envelope.create(row, (Struct) null, (Instant) null);
        boolean processed = signal.process(event, (OffsetContext) null);

        Assertions.assertThat(processed).isTrue();
        Assertions.assertThat(captured.intValue()).isEqualTo(5);
    }

    /**
     * A row with only two columns (no data column) must be rejected, and so must a bare row
     * struct that is not wrapped in an envelope; in both cases the action must not run.
     */
    @Test
    public void shouldIgnoreInvalidEnvelope() throws Exception {
        Signal signal = new Signal(config());
        Schema rowSchema = SchemaBuilder.struct().name("signal")
                .field("col1", Schema.OPTIONAL_STRING_SCHEMA)
                .field("col2", Schema.OPTIONAL_STRING_SCHEMA)
                .build();
        Envelope envelope = envelopeFor(rowSchema);

        Struct row = new Struct(rowSchema);
        row.put("col1", "log1");
        row.put("col2", CUSTOM_TYPE);

        AtomicInteger captured = new AtomicInteger();
        signal.registerSignalAction(CUSTOM_TYPE, valueCapturingAction(captured));

        // Case 1: proper envelope, but the wrapped row has too few columns.
        Struct event = envelope.create(row, (Struct) null, (Instant) null);
        Assertions.assertThat(signal.process(event, (OffsetContext) null)).isFalse();
        Assertions.assertThat(captured.intValue()).isEqualTo(0);

        // Case 2: the row itself is passed where an envelope is expected.
        Assertions.assertThat(signal.process(row, (OffsetContext) null)).isFalse();
        Assertions.assertThat(captured.intValue()).isEqualTo(0);
    }

    /**
     * Builds the connector configuration shared by all tests.
     *
     * <p>The signal data collection is set to {@code debezium.signal}. The three methods
     * implemented by the anonymous subclass all return {@code null}.
     *
     * @return a minimal {@link CommonConnectorConfig} for constructing a {@link Signal}
     */
    protected CommonConnectorConfig config() {
        Configuration configuration = Configuration.create()
                .with(CommonConnectorConfig.SIGNAL_DATA_COLLECTION, "debezium.signal")
                .build();

        return new CommonConnectorConfig(configuration, "core", 0) {
            @Override
            protected SourceInfoStructMaker<?> getSourceInfoStructMaker(CommonConnectorConfig.Version version) {
                return null;
            }

            @Override
            public String getContextName() {
                return null;
            }

            @Override
            public String getConnectorName() {
                return null;
            }
        };
    }

    /**
     * Creates a signal action that stores the integer field {@code v} of the signal data in
     * {@code sink} and reports the signal as handled.
     *
     * @param sink receives the value of {@code v} each time the action is invoked
     * @return the action to register with {@code Signal.registerSignalAction}
     */
    private static Signal.Action valueCapturingAction(final AtomicInteger sink) {
        return new Signal.Action() {
            @Override
            public boolean arrived(Signal.Payload payload) {
                sink.set(payload.data.getInteger("v").intValue());
                return true;
            }
        };
    }

    /**
     * Defines an envelope named {@code someName} around the given row schema, using an empty
     * struct schema named {@code source} as the source block.
     *
     * @param rowSchema schema of the row wrapped by the envelope
     * @return the envelope definition
     */
    private static Envelope envelopeFor(Schema rowSchema) {
        Schema sourceSchema = SchemaBuilder.struct().name("source").build();
        return Envelope.defineSchema()
                .withName("someName")
                .withRecord(rowSchema)
                .withSource(sourceSchema)
                .build();
    }
}
