package com.ververica.cdc.connectors.mysql.source;

import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils;
import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase;
import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.jdbc.JdbcConnection;
import java.lang.reflect.Method;
import java.sql.SQLException;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.Timeout;

/* loaded from: input_file:com/ververica/cdc/connectors/mysql/source/MySqlSourceITCase.class */
public class MySqlSourceITCase extends MySqlSourceTestBase {

    @Rule
    public final Timeout timeoutPerTest = Timeout.seconds(300);
    private final UniqueDatabase customDatabase = new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw");

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/ververica/cdc/connectors/mysql/source/MySqlSourceITCase$FailoverPhase.class */
    public enum FailoverPhase {
        SNAPSHOT,
        BINLOG,
        NEVER
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/ververica/cdc/connectors/mysql/source/MySqlSourceITCase$FailoverType.class */
    public enum FailoverType {
        TM,
        JM,
        NONE
    }

    @Test
    public void testReadSingleTableWithSingleParallelism() throws Exception {
        testMySqlParallelSource(1, FailoverType.NONE, FailoverPhase.NEVER, new String[]{"customers"});
    }

    @Test
    public void testReadSingleTableWithMultipleParallelism() throws Exception {
        testMySqlParallelSource(4, FailoverType.NONE, FailoverPhase.NEVER, new String[]{"customers"});
    }

    @Test
    public void testReadMultipleTableWithSingleParallelism() throws Exception {
        testMySqlParallelSource(1, FailoverType.NONE, FailoverPhase.NEVER, new String[]{"customers", "customers_1"});
    }

    @Test
    public void testReadMultipleTableWithMultipleParallelism() throws Exception {
        testMySqlParallelSource(4, FailoverType.NONE, FailoverPhase.NEVER, new String[]{"customers", "customers_1"});
    }

    @Test
    public void testTaskManagerFailoverInSnapshotPhase() throws Exception {
        testMySqlParallelSource(FailoverType.TM, FailoverPhase.SNAPSHOT, new String[]{"customers", "customers_1"});
    }

    @Test
    public void testTaskManagerFailoverInBinlogPhase() throws Exception {
        testMySqlParallelSource(FailoverType.TM, FailoverPhase.BINLOG, new String[]{"customers", "customers_1"});
    }

    @Test
    public void testJobManagerFailoverInSnapshotPhase() throws Exception {
        testMySqlParallelSource(FailoverType.JM, FailoverPhase.SNAPSHOT, new String[]{"customers", "customers_1"});
    }

    @Test
    public void testJobManagerFailoverInBinlogPhase() throws Exception {
        testMySqlParallelSource(FailoverType.JM, FailoverPhase.BINLOG, new String[]{"customers", "customers_1"});
    }

    @Test
    public void testTaskManagerFailoverSingleParallelism() throws Exception {
        testMySqlParallelSource(1, FailoverType.TM, FailoverPhase.SNAPSHOT, new String[]{"customers"});
    }

    @Test
    public void testJobManagerFailoverSingleParallelism() throws Exception {
        testMySqlParallelSource(1, FailoverType.JM, FailoverPhase.SNAPSHOT, new String[]{"customers"});
    }

    @Test
    public void testNewlyAddedTableForExistsPipelineOnce() throws Exception {
        testNewlyAddedTableOneByOne(1, FailoverType.NONE, FailoverPhase.NEVER, false, "address_hangzhou", "address_beijing");
    }

    @Test
    public void testNewlyAddedTableForExistsPipelineOnceWithAheadBinlog() throws Exception {
        testNewlyAddedTableOneByOne(1, FailoverType.NONE, FailoverPhase.NEVER, true, "address_hangzhou", "address_beijing");
    }

    @Test
    public void testNewlyAddedTableForExistsPipelineTwice() throws Exception {
        testNewlyAddedTableOneByOne(4, FailoverType.NONE, FailoverPhase.NEVER, false, "address_hangzhou", "address_beijing", "address_shanghai");
    }

    @Test
    public void testNewlyAddedTableForExistsPipelineTwiceWithAheadBinlog() throws Exception {
        testNewlyAddedTableOneByOne(4, FailoverType.NONE, FailoverPhase.NEVER, true, "address_hangzhou", "address_beijing", "address_shanghai");
    }

    @Test
    public void testNewlyAddedTableForExistsPipelineSingleParallelism() throws Exception {
        testNewlyAddedTableOneByOne(1, FailoverType.NONE, FailoverPhase.NEVER, false, "address_hangzhou", "address_beijing");
    }

    @Test
    public void testNewlyAddedTableForExistsPipelineSingleParallelismWithAheadBinlog() throws Exception {
        testNewlyAddedTableOneByOne(1, FailoverType.NONE, FailoverPhase.NEVER, true, "address_hangzhou", "address_beijing");
    }

    @Test
    public void testJobManagerFailoverForNewlyAddedTable() throws Exception {
        testNewlyAddedTableOneByOne(4, FailoverType.JM, FailoverPhase.SNAPSHOT, false, "address_hangzhou", "address_beijing");
    }

    @Test
    public void testJobManagerFailoverForNewlyAddedTableWithAheadBinlog() throws Exception {
        testNewlyAddedTableOneByOne(4, FailoverType.JM, FailoverPhase.SNAPSHOT, true, "address_hangzhou", "address_beijing");
    }

    @Test
    public void testTaskManagerFailoverForNewlyAddedTable() throws Exception {
        testNewlyAddedTableOneByOne(1, FailoverType.TM, FailoverPhase.BINLOG, false, "address_hangzhou", "address_beijing");
    }

    @Test
    public void testTaskManagerFailoverForNewlyAddedTableWithAheadBinlog() throws Exception {
        testNewlyAddedTableOneByOne(1, FailoverType.TM, FailoverPhase.BINLOG, false, "address_hangzhou", "address_beijing");
    }

    @Test
    public void testConsumingTableWithoutPrimaryKey() {
        try {
            testMySqlParallelSource(1, FailoverType.NONE, FailoverPhase.NEVER, new String[]{"customers_no_pk"});
        } catch (Exception e) {
            Assert.assertTrue(ExceptionUtils.findThrowableWithMessage(e, String.format("Incremental snapshot for tables requires primary key, but table %s doesn't have primary key", this.customDatabase.getDatabaseName() + ".customers_no_pk")).isPresent());
        }
    }

    private void testMySqlParallelSource(FailoverType failoverType, FailoverPhase failoverPhase, String[] strArr) throws Exception {
        testMySqlParallelSource(4, failoverType, failoverPhase, strArr);
    }

    private void testMySqlParallelSource(int i, FailoverType failoverType, FailoverPhase failoverPhase, String[] strArr) throws Exception {
        this.customDatabase.createAndInitialize();
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment create = StreamTableEnvironment.create(executionEnvironment);
        executionEnvironment.setParallelism(i);
        executionEnvironment.enableCheckpointing(200L);
        executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
        String[] strArr2 = {"+I[101, user_1, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]", "+I[103, user_3, Shanghai, 123567891234]", "+I[109, user_4, Shanghai, 123567891234]", "+I[110, user_5, Shanghai, 123567891234]", "+I[111, user_6, Shanghai, 123567891234]", "+I[118, user_7, Shanghai, 123567891234]", "+I[121, user_8, Shanghai, 123567891234]", "+I[123, user_9, Shanghai, 123567891234]", "+I[1009, user_10, Shanghai, 123567891234]", "+I[1010, user_11, Shanghai, 123567891234]", "+I[1011, user_12, Shanghai, 123567891234]", "+I[1012, user_13, Shanghai, 123567891234]", "+I[1013, user_14, Shanghai, 123567891234]", "+I[1014, user_15, Shanghai, 123567891234]", "+I[1015, user_16, Shanghai, 123567891234]", "+I[1016, user_17, Shanghai, 123567891234]", "+I[1017, user_18, Shanghai, 123567891234]", "+I[1018, user_19, Shanghai, 123567891234]", "+I[1019, user_20, Shanghai, 123567891234]", "+I[2000, user_21, Shanghai, 123567891234]"};
        create.executeSql(String.format("CREATE TABLE customers ( id BIGINT NOT NULL, name STRING, address STRING, phone_number STRING, primary key (id) not enforced) WITH ( 'connector' = 'mysql-cdc', 'scan.incremental.snapshot.enabled' = 'true', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'table-name' = '%s', 'scan.incremental.snapshot.chunk.size' = '100', 'server-id' = '%s')", MYSQL_CONTAINER.getHost(), Integer.valueOf(MYSQL_CONTAINER.getDatabasePort()), this.customDatabase.getUsername(), this.customDatabase.getPassword(), this.customDatabase.getDatabaseName(), getTableNameRegex(strArr), getServerId()));
        TableResult executeSql = create.executeSql("select * from customers");
        CloseableIterator collect = executeSql.collect();
        JobID jobID = ((JobClient) executeSql.getJobClient().get()).getJobID();
        ArrayList arrayList = new ArrayList();
        for (int i2 = 0; i2 < strArr.length; i2++) {
            arrayList.addAll(Arrays.asList(strArr2));
        }
        if (failoverPhase == FailoverPhase.SNAPSHOT && collect.hasNext()) {
            triggerFailover(failoverType, jobID, this.miniClusterResource.getMiniCluster(), () -> {
                sleepMs(100L);
            });
        }
        assertEqualsInAnyOrder(arrayList, fetchRows(collect, arrayList.size()));
        for (String str : strArr) {
            makeFirstPartBinlogEvents(getConnection(), this.customDatabase.getDatabaseName() + '.' + str);
        }
        if (failoverPhase == FailoverPhase.BINLOG) {
            triggerFailover(failoverType, jobID, this.miniClusterResource.getMiniCluster(), () -> {
                sleepMs(200L);
            });
        }
        for (String str2 : strArr) {
            makeSecondPartBinlogEvents(getConnection(), this.customDatabase.getDatabaseName() + '.' + str2);
        }
        String[] strArr3 = {"-U[103, user_3, Shanghai, 123567891234]", "+U[103, user_3, Hangzhou, 123567891234]", "-D[102, user_2, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]", "-U[103, user_3, Hangzhou, 123567891234]", "+U[103, user_3, Shanghai, 123567891234]", "-U[1010, user_11, Shanghai, 123567891234]", "+U[1010, user_11, Hangzhou, 123567891234]", "+I[2001, user_22, Shanghai, 123567891234]", "+I[2002, user_23, Shanghai, 123567891234]", "+I[2003, user_24, Shanghai, 123567891234]"};
        ArrayList arrayList2 = new ArrayList();
        for (int i3 = 0; i3 < strArr.length; i3++) {
            arrayList2.addAll(Arrays.asList(strArr3));
        }
        assertEqualsInAnyOrder(arrayList2, fetchRows(collect, arrayList2.size()));
        ((JobClient) executeSql.getJobClient().get()).cancel().get();
    }

    private void testNewlyAddedTableOneByOne(int i, FailoverType failoverType, FailoverPhase failoverPhase, boolean z, String... strArr) throws Exception {
        TestValuesTableFactory.clearAllData();
        this.customDatabase.createAndInitialize();
        initialAddressTables(getConnection(), strArr);
        TemporaryFolder temporaryFolder = new TemporaryFolder();
        temporaryFolder.create();
        String uri = temporaryFolder.newFolder().toURI().toString();
        String str = null;
        ArrayList arrayList = new ArrayList();
        for (int i2 = 0; i2 < strArr.length; i2++) {
            String[] strArr2 = (String[]) Arrays.asList(strArr).subList(0, i2 + 1).toArray(new String[0]);
            String str2 = strArr[i2];
            if (z) {
                makeBinlogBeforeCaptureForAddressTable(getConnection(), str2);
            }
            StreamTableEnvironment create = StreamTableEnvironment.create(getStreamExecutionEnvironment(str, i));
            create.executeSql(getCreateTableStatement(strArr2));
            create.executeSql("CREATE TABLE sink ( table_name STRING, id BIGINT, country STRING, city STRING, detail_address STRING, primary key (id) not enforced) WITH ( 'connector' = 'values', 'sink-insert-only' = 'false')");
            JobClient jobClient = (JobClient) create.executeSql("insert into sink select * from address").getJobClient().get();
            String str3 = str2.split("_")[1];
            List asList = Arrays.asList(String.format("+I[%s, 416874195632735147, China, %s, %s West Town address 1]", str2, str3, str3), String.format("+I[%s, 416927583791428523, China, %s, %s West Town address 2]", str2, str3, str3), String.format("+I[%s, 417022095255614379, China, %s, %s West Town address 3]", str2, str3, str3));
            if (z) {
                asList = Arrays.asList(String.format("+I[%s, 416874195632735147, China, %s, %s West Town address 1]", str2, str3, str3), String.format("+I[%s, 416927583791428523, China, %s, %s West Town address 2]", str2, str3, str3), String.format("+I[%s, 417022095255614379, China, %s, %s West Town address 3]", str2, str3, str3), String.format("+I[%s, 417022095255614381, China, %s, %s West Town address 5]", str2, str3, str3));
            }
            if (failoverPhase == FailoverPhase.SNAPSHOT) {
                triggerFailover(failoverType, jobClient.getJobID(), this.miniClusterResource.getMiniCluster(), () -> {
                    sleepMs(100L);
                });
            }
            arrayList.addAll(asList);
            waitForSinkSize("sink", arrayList.size());
            assertEqualsInAnyOrder(arrayList, TestValuesTableFactory.getRawResults("sink"));
            makeFirstPartBinlogForAddressTable(getConnection(), str2);
            if (failoverPhase == FailoverPhase.BINLOG) {
                triggerFailover(failoverType, jobClient.getJobID(), this.miniClusterResource.getMiniCluster(), () -> {
                    sleepMs(100L);
                });
            }
            makeSecondPartBinlogForAddressTable(getConnection(), str2);
            arrayList.addAll(Arrays.asList(String.format("+U[%s, 416874195632735147, CHINA, %s, %s West Town address 1]", str2, str3, str3), String.format("+I[%s, 417022095255614380, China, %s, %s West Town address 4]", str2, str3, str3)));
            waitForSinkSize("sink", arrayList.size());
            assertEqualsInAnyOrder(arrayList, TestValuesTableFactory.getRawResults("sink"));
            if (i2 != strArr.length - 1) {
                str = triggerSavepointWithRetry(jobClient, uri);
            }
            jobClient.cancel().get();
        }
    }

    private String getCreateTableStatement(String... strArr) {
        return String.format("CREATE TABLE address ( table_name STRING METADATA VIRTUAL, id BIGINT NOT NULL, country STRING, city STRING, detail_address STRING, primary key (id) not enforced) WITH ( 'connector' = 'mysql-cdc', 'scan.incremental.snapshot.enabled' = 'true', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'table-name' = '%s', 'scan.incremental.snapshot.chunk.size' = '2', 'server-id' = '%s', 'scan.newly-added-table.enabled' = 'true')", MYSQL_CONTAINER.getHost(), Integer.valueOf(MYSQL_CONTAINER.getDatabasePort()), this.customDatabase.getUsername(), this.customDatabase.getPassword(), this.customDatabase.getDatabaseName(), getTableNameRegex(strArr), getServerId());
    }

    private StreamExecutionEnvironment getStreamExecutionEnvironment(String str, int i) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        if (str != null) {
            Method declaredMethod = Thread.currentThread().getContextClassLoader().loadClass("org.apache.flink.streaming.api.environment.StreamExecutionEnvironment").getDeclaredMethod("getConfiguration", new Class[0]);
            declaredMethod.setAccessible(true);
            ((Configuration) declaredMethod.invoke(executionEnvironment, new Object[0])).setString(SavepointConfigOptions.SAVEPOINT_PATH, str);
        }
        executionEnvironment.setParallelism(i);
        executionEnvironment.enableCheckpointing(200L);
        executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100L));
        return executionEnvironment;
    }

    private String triggerSavepointWithRetry(JobClient jobClient, String str) throws ExecutionException, InterruptedException {
        for (int i = 0; i < 600; i++) {
            try {
                return (String) jobClient.triggerSavepoint(str).get();
            } catch (Exception e) {
                Optional findThrowable = ExceptionUtils.findThrowable(e, CheckpointException.class);
                if (!findThrowable.isPresent() || !((CheckpointException) findThrowable.get()).getMessage().contains("Checkpoint triggering task")) {
                    throw e;
                }
                Thread.sleep(100L);
            }
        }
        return null;
    }

    private static List<String> fetchRows(Iterator<Row> it, int i) {
        ArrayList arrayList = new ArrayList(i);
        while (i > 0 && it.hasNext()) {
            arrayList.add(it.next().toString());
            i--;
        }
        return arrayList;
    }

    private String getTableNameRegex(String[] strArr) {
        Preconditions.checkState(strArr.length > 0);
        return strArr.length == 1 ? strArr[0] : String.format("(%s)", StringUtils.join(strArr, "|"));
    }

    private String getServerId() {
        int nextInt = new Random().nextInt(100) + 5400;
        return nextInt + "-" + (nextInt + 4);
    }

    private void sleepMs(long j) {
        try {
            Thread.sleep(j);
        } catch (InterruptedException e) {
        }
    }

    private void makeFirstPartBinlogEvents(JdbcConnection jdbcConnection, String str) throws SQLException {
        try {
            jdbcConnection.setAutoCommit(false);
            jdbcConnection.execute(new String[]{"UPDATE " + str + " SET address = 'Hangzhou' where id = 103", "DELETE FROM " + str + " where id = 102", "INSERT INTO " + str + " VALUES(102, 'user_2','Shanghai','123567891234')", "UPDATE " + str + " SET address = 'Shanghai' where id = 103"});
            jdbcConnection.commit();
        } finally {
            jdbcConnection.close();
        }
    }

    private void makeSecondPartBinlogEvents(JdbcConnection jdbcConnection, String str) throws SQLException {
        try {
            jdbcConnection.setAutoCommit(false);
            jdbcConnection.execute(new String[]{"UPDATE " + str + " SET address = 'Hangzhou' where id = 1010"});
            jdbcConnection.commit();
            jdbcConnection.execute(new String[]{"INSERT INTO " + str + " VALUES(2001, 'user_22','Shanghai','123567891234'), (2002, 'user_23','Shanghai','123567891234'),(2003, 'user_24','Shanghai','123567891234')"});
            jdbcConnection.commit();
        } finally {
            jdbcConnection.close();
        }
    }

    private void initialAddressTables(JdbcConnection jdbcConnection, String[] strArr) throws SQLException {
        try {
            jdbcConnection.setAutoCommit(false);
            for (String str : strArr) {
                String str2 = this.customDatabase.getDatabaseName() + "." + str;
                String str3 = str.split("_")[1];
                jdbcConnection.execute(new String[]{"CREATE TABLE " + str2 + "(  id BIGINT UNSIGNED NOT NULL PRIMARY KEY,  country VARCHAR(255) NOT NULL,  city VARCHAR(255) NOT NULL,  detail_address VARCHAR(1024));"});
                jdbcConnection.execute(new String[]{String.format("INSERT INTO  %s VALUES (416874195632735147, 'China', '%s', '%s West Town address 1'),       (416927583791428523, 'China', '%s', '%s West Town address 2'),       (417022095255614379, 'China', '%s', '%s West Town address 3');", str2, str3, str3, str3, str3, str3, str3)});
            }
            jdbcConnection.commit();
            jdbcConnection.close();
        } catch (Throwable th) {
            jdbcConnection.close();
            throw th;
        }
    }

    private void makeFirstPartBinlogForAddressTable(JdbcConnection jdbcConnection, String str) throws SQLException {
        try {
            jdbcConnection.setAutoCommit(false);
            String str2 = this.customDatabase.getDatabaseName() + "." + str;
            String str3 = str.split("_")[1];
            jdbcConnection.execute(new String[]{String.format("UPDATE %s SET COUNTRY = 'CHINA' where id = 416874195632735147", str2)});
            jdbcConnection.commit();
            jdbcConnection.close();
        } catch (Throwable th) {
            jdbcConnection.close();
            throw th;
        }
    }

    private void makeSecondPartBinlogForAddressTable(JdbcConnection jdbcConnection, String str) throws SQLException {
        try {
            jdbcConnection.setAutoCommit(false);
            String str2 = this.customDatabase.getDatabaseName() + "." + str;
            String str3 = str.split("_")[1];
            jdbcConnection.execute(new String[]{String.format("INSERT INTO %s VALUES(417022095255614380, 'China','%s','%s West Town address 4')", str2, str3, str3)});
            jdbcConnection.commit();
            jdbcConnection.close();
        } catch (Throwable th) {
            jdbcConnection.close();
            throw th;
        }
    }

    private void makeBinlogBeforeCaptureForAddressTable(JdbcConnection jdbcConnection, String str) throws SQLException {
        try {
            jdbcConnection.setAutoCommit(false);
            String str2 = this.customDatabase.getDatabaseName() + "." + str;
            String str3 = str.split("_")[1];
            jdbcConnection.execute(new String[]{String.format("INSERT INTO %s VALUES(417022095255614381, 'China','%s','%s West Town address 5')", str2, str3, str3)});
            jdbcConnection.commit();
            jdbcConnection.close();
        } catch (Throwable th) {
            jdbcConnection.close();
            throw th;
        }
    }

    private MySqlConnection getConnection() {
        HashMap hashMap = new HashMap();
        hashMap.put("database.hostname", MYSQL_CONTAINER.getHost());
        hashMap.put("database.port", String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
        hashMap.put("database.user", this.customDatabase.getUsername());
        hashMap.put("database.password", this.customDatabase.getPassword());
        hashMap.put("database.serverTimezone", ZoneId.of("UTC").toString());
        return DebeziumUtils.createMySqlConnection(io.debezium.config.Configuration.from(hashMap));
    }

    private static void triggerFailover(FailoverType failoverType, JobID jobID, MiniCluster miniCluster, Runnable runnable) throws Exception {
        switch (failoverType) {
            case TM:
                restartTaskManager(miniCluster, runnable);
                return;
            case JM:
                triggerJobManagerFailover(jobID, miniCluster, runnable);
                return;
            case NONE:
                return;
            default:
                throw new IllegalStateException("Unexpected value: " + failoverType);
        }
    }

    private static void triggerJobManagerFailover(JobID jobID, MiniCluster miniCluster, Runnable runnable) throws Exception {
        HaLeadershipControl haLeadershipControl = (HaLeadershipControl) miniCluster.getHaLeadershipControl().get();
        haLeadershipControl.revokeJobMasterLeadership(jobID).get();
        runnable.run();
        haLeadershipControl.grantJobMasterLeadership(jobID).get();
    }

    private static void restartTaskManager(MiniCluster miniCluster, Runnable runnable) throws Exception {
        miniCluster.terminateTaskManager(0).get();
        runnable.run();
        miniCluster.startTaskManager();
    }

    private static void waitForSinkSize(String str, int i) throws InterruptedException {
        while (sinkSize(str) < i) {
            Thread.sleep(100L);
        }
    }

    private static int sinkSize(String str) {
        int size;
        synchronized (TestValuesTableFactory.class) {
            try {
                size = TestValuesTableFactory.getRawResults(str).size();
            } catch (IllegalArgumentException e) {
                return 0;
            }
        }
        return size;
    }
}
