package org.apache.flink.connector.jdbc.internal;

import java.io.Flushable;
import java.io.IOException;
import java.io.Serializable;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.function.SerializableSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Buffers records into a {@link JdbcBatchStatementExecutor} and executes the accumulated batch
 * against a JDBC connection.
 *
 * <p>A batch is executed when any of the following happens:
 *
 * <ul>
 *   <li>{@link #writeRecord(Object)} observes that the configured batch size has been reached,
 *   <li>the periodic timer started by {@link #open(JdbcOutputSerializer)} fires,
 *   <li>{@link #flush()} is called explicitly, or
 *   <li>{@link #close()} is called while records are still pending.
 * </ul>
 *
 * <p>{@link #writeRecord(Object)}, {@link #flush()} and {@link #close()} synchronize on this
 * instance, as does the timer task. A failure raised by the timer task is remembered in a volatile
 * field and re-thrown by {@link #checkFlushException()} on the next write, flush or close.
 *
 * @param <In> type of the records handed to {@link #writeRecord(Object)}
 * @param <JdbcIn> type of the records handed to the statement executor
 * @param <JdbcExec> concrete statement executor type
 */
@Internal
public class JdbcOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatchStatementExecutor<JdbcIn>>
        implements Flushable, AutoCloseable, Serializable {

    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(JdbcOutputFormat.class);

    protected final JdbcConnectionProvider connectionProvider;
    private final JdbcExecutionOptions executionOptions;
    private final StatementExecutorFactory<JdbcExec> statementExecutorFactory;

    // Runtime-only state: everything below is transient and (re)built by open().
    private transient JdbcOutputSerializer<In> serializer;
    private transient JdbcExec jdbcStatementExecutor;
    private transient int batchCount = 0;
    private transient volatile boolean closed = false;
    private transient ScheduledExecutorService scheduler;
    private transient ScheduledFuture<?> scheduledFuture;
    // Written by the timer thread, read by the writer thread; hence volatile.
    private transient volatile Exception flushException;

    /**
     * Serializable supplier of statement executors.
     *
     * @param <T> concrete executor type produced by the factory
     */
    public interface StatementExecutorFactory<T extends JdbcBatchStatementExecutor<?>>
            extends SerializableSupplier<T> {
    }

    /**
     * Returns the function that converts an incoming record into the value handed to the statement
     * executor. This implementation passes the record through unchanged; subclasses may override
     * it when {@code In} and {@code JdbcIn} differ.
     *
     * @return the record-to-JDBC-record conversion function
     */
    @SuppressWarnings("unchecked")
    protected Function<In, JdbcIn> getExtractor() {
        // The cast is erased at runtime, so this is a plain identity function.
        return value -> (JdbcIn) value;
    }

    /**
     * Creates an output format. No connection is opened until {@link #open(JdbcOutputSerializer)}.
     *
     * @param jdbcConnectionProvider provider of the JDBC connection; must not be null
     * @param jdbcExecutionOptions batch size, batch interval and retry settings; must not be null
     * @param statementExecutorFactory factory for the statement executor; must not be null
     */
    public JdbcOutputFormat(
            @Nonnull JdbcConnectionProvider jdbcConnectionProvider,
            @Nonnull JdbcExecutionOptions jdbcExecutionOptions,
            @Nonnull StatementExecutorFactory<JdbcExec> statementExecutorFactory) {
        this.connectionProvider = Preconditions.checkNotNull(jdbcConnectionProvider);
        this.executionOptions = Preconditions.checkNotNull(jdbcExecutionOptions);
        this.statementExecutorFactory = Preconditions.checkNotNull(statementExecutorFactory);
    }

    /**
     * Establishes the connection, prepares the statement executor and, unless batching by time is
     * disabled, starts the periodic flush timer.
     *
     * <p>The timer is skipped when the batch interval is {@code 0} or the batch size is {@code 1}.
     *
     * @param jdbcOutputSerializer serializer applied to every record before it is batched
     * @throws IOException if the connection, the statement executor or the timer cannot be set up
     */
    public void open(@Nonnull JdbcOutputSerializer<In> jdbcOutputSerializer) throws IOException {
        this.serializer =
                Preconditions.checkNotNull(jdbcOutputSerializer, "Serializer must be defined");
        try {
            connectionProvider.getOrEstablishConnection();
            jdbcStatementExecutor = createAndOpenStatementExecutor(statementExecutorFactory);

            final long intervalMs = executionOptions.getBatchIntervalMs();
            final boolean timedFlushEnabled =
                    intervalMs != 0 && executionOptions.getBatchSize() != 1;
            if (timedFlushEnabled) {
                scheduler =
                        Executors.newScheduledThreadPool(
                                1, new ExecutorThreadFactory("jdbc-upsert-output-format"));
                scheduledFuture =
                        scheduler.scheduleWithFixedDelay(
                                () -> {
                                    synchronized (JdbcOutputFormat.this) {
                                        if (!closed) {
                                            try {
                                                flush();
                                            } catch (Exception e) {
                                                // Surfaced later via checkFlushException().
                                                flushException = e;
                                            }
                                        }
                                    }
                                },
                                intervalMs,
                                intervalMs,
                                TimeUnit.MILLISECONDS);
            }
        } catch (Exception e) {
            throw new IOException("unable to open JDBC writer", e);
        }
    }

    /**
     * Obtains an executor from the factory and prepares its statements on the current connection.
     *
     * @throws IOException if preparing the statements fails with a {@link SQLException}
     */
    private JdbcExec createAndOpenStatementExecutor(
            StatementExecutorFactory<JdbcExec> statementExecutorFactory) throws IOException {
        final JdbcExec executor = statementExecutorFactory.get();
        try {
            executor.prepareStatements(connectionProvider.getConnection());
        } catch (SQLException e) {
            throw new IOException("unable to open JDBC writer", e);
        }
        return executor;
    }

    /**
     * Re-throws a failure recorded by the periodic flush task, if there is one.
     *
     * @throws RuntimeException wrapping the recorded failure
     */
    public void checkFlushException() {
        final Exception recorded = flushException;
        if (recorded != null) {
            throw new RuntimeException("Writing records to JDBC failed.", recorded);
        }
    }

    /**
     * Adds a record to the current batch and flushes once the configured batch size is reached.
     *
     * @param in the record to write
     * @throws IOException if batching or flushing fails
     * @throws RuntimeException if an earlier timer-driven flush failed
     */
    public final synchronized void writeRecord(In in) throws IOException {
        checkFlushException();
        try {
            final In copy = copyIfNecessary(in);
            addToBatch(in, getExtractor().apply(copy));
            batchCount++;

            final int batchSize = executionOptions.getBatchSize();
            if (batchSize > 0 && batchCount >= batchSize) {
                flush();
            }
        } catch (Exception e) {
            throw new IOException("Writing records to JDBC failed.", e);
        }
    }

    /** Passes the record through the serializer supplied to {@link #open(JdbcOutputSerializer)}. */
    private In copyIfNecessary(In in) {
        return serializer.serialize(in);
    }

    /**
     * Hands the extracted record to the statement executor.
     *
     * @param in the original record (unused here; available to overriding subclasses)
     * @param jdbcin the extracted record that is added to the batch
     * @throws SQLException if the executor rejects the record
     */
    public void addToBatch(In in, JdbcIn jdbcin) throws SQLException {
        jdbcStatementExecutor.addToBatch(jdbcin);
    }

    /**
     * Executes the pending batch, retrying on {@link SQLException} up to the configured maximum.
     *
     * <p>Before each retry the connection is validated and, if invalid, re-established together
     * with the prepared statements. The wait before a retry is {@code 1000 ms * attemptIndex}, so
     * the first retry happens immediately.
     *
     * @throws IOException if all attempts fail, if reconnecting fails, or if the thread is
     *     interrupted while waiting for the next attempt
     * @throws RuntimeException if an earlier timer-driven flush failed
     */
    @Override
    public synchronized void flush() throws IOException {
        checkFlushException();

        final int maxRetries = executionOptions.getMaxRetries();
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                attemptFlush();
                batchCount = 0;
                return;
            } catch (SQLException e) {
                LOG.error("JDBC executeBatch error, retry times = {}", attempt, e);
                if (attempt >= maxRetries) {
                    throw new IOException(e);
                }

                try {
                    if (!connectionProvider.isConnectionValid()) {
                        updateExecutor(true);
                    }
                } catch (Exception reconnectFailure) {
                    LOG.error(
                            "JDBC connection is not valid, and reestablish connection failed.",
                            reconnectFailure);
                    throw new IOException("Reestablish JDBC connection failed", reconnectFailure);
                }

                // Kept outside the reconnect try-block so an interruption is reported as such
                // instead of being re-wrapped as a connection failure.
                try {
                    Thread.sleep(1000L * attempt);
                } catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt();
                    throw new IOException(
                            "unable to flush; interrupted while doing another attempt", e);
                }
            }
        }
    }

    /**
     * Executes the pending batch once, without any retry handling.
     *
     * @throws SQLException if the executor fails to execute the batch
     */
    public void attemptFlush() throws SQLException {
        jdbcStatementExecutor.executeBatch();
    }

    /**
     * Stops the timer, flushes pending records, closes the statements and closes the connection.
     *
     * <p>Statements and connection are released even when the final flush fails. Calling this
     * method again after the first call only closes the connection and re-checks for a recorded
     * flush failure.
     *
     * @throws RuntimeException if the final flush fails or a timer-driven flush failed earlier
     */
    @Override
    public synchronized void close() {
        try {
            if (!closed) {
                closed = true;

                if (scheduledFuture != null) {
                    scheduledFuture.cancel(false);
                }
                if (scheduler != null) {
                    scheduler.shutdown();
                }

                try {
                    if (batchCount > 0) {
                        try {
                            flush();
                        } catch (Exception e) {
                            LOG.warn("Writing records to JDBC failed.", e);
                            throw new RuntimeException("Writing records to JDBC failed.", e);
                        }
                    }
                } finally {
                    closeStatementsQuietly();
                }
            }
        } finally {
            connectionProvider.closeConnection();
        }
        checkFlushException();
    }

    /** Closes the executor's statements, logging (not propagating) any {@link SQLException}. */
    private void closeStatementsQuietly() {
        try {
            if (jdbcStatementExecutor != null) {
                jdbcStatementExecutor.closeStatements();
            }
        } catch (SQLException e) {
            LOG.warn("Close JDBC writer failed.", e);
        }
    }

    /**
     * Closes the executor's statements and prepares them again.
     *
     * @param reconnect if {@code true}, the statements are prepared on a re-established
     *     connection; otherwise on the provider's current connection
     * @throws SQLException if closing or preparing the statements fails
     * @throws ClassNotFoundException if re-establishing the connection fails with it
     */
    public void updateExecutor(boolean reconnect) throws SQLException, ClassNotFoundException {
        jdbcStatementExecutor.closeStatements();
        final Connection connection =
                reconnect
                        ? connectionProvider.reestablishConnection()
                        : connectionProvider.getConnection();
        jdbcStatementExecutor.prepareStatements(connection);
    }

    /** Returns the execution options this format was created with. */
    public JdbcExecutionOptions getExecutionOptions() {
        return executionOptions;
    }

    /** Returns the provider's current connection. */
    @VisibleForTesting
    public Connection getConnection() {
        return connectionProvider.getConnection();
    }
}
