package com.netease.arctic.flink.table;

import com.netease.arctic.flink.util.ArcticUtils;
import com.netease.arctic.flink.write.FlinkSink;
import com.netease.arctic.table.ArcticTable;
import java.util.Map;
import java.util.Properties;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.connectors.kafka.table.KafkaOptions;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
import org.apache.flink.types.RowKind;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/netease/arctic/flink/table/ArcticDynamicSink.class */
public class ArcticDynamicSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {
    public static final Logger LOG = LoggerFactory.getLogger(ArcticDynamicSink.class);
    private final ArcticTableLoader tableLoader;
    private final CatalogTable flinkTable;
    private final String topic;
    private final boolean primaryKeyExisted;
    private boolean overwrite = false;

    /* JADX INFO: Access modifiers changed from: package-private */
    public ArcticDynamicSink(CatalogTable catalogTable, ArcticTableLoader arcticTableLoader, String str, boolean z) {
        this.tableLoader = arcticTableLoader;
        this.flinkTable = catalogTable;
        this.topic = str;
        this.primaryKeyExisted = z;
    }

    public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
        ChangelogMode.Builder addContainedKind = ChangelogMode.newBuilder().addContainedKind(RowKind.INSERT);
        if (this.primaryKeyExisted) {
            addContainedKind.addContainedKind(RowKind.UPDATE_BEFORE).addContainedKind(RowKind.UPDATE_AFTER).addContainedKind(RowKind.DELETE);
        }
        return addContainedKind.build();
    }

    public DynamicTableSink.SinkRuntimeProvider getSinkRuntimeProvider(DynamicTableSink.Context context) {
        ArcticTable loadArcticTable = ArcticUtils.loadArcticTable(this.tableLoader);
        Properties kafkaProperties = KafkaOptions.getKafkaProperties(loadArcticTable.properties());
        return dataStream -> {
            DataStreamSink<?> build = FlinkSink.forRowData(dataStream).table(loadArcticTable).flinkSchema(this.flinkTable.getSchema()).producerConfig(kafkaProperties).topic(this.topic).tableLoader(this.tableLoader).overwrite(this.overwrite).build();
            UserGroupInformation.reset();
            LOG.info("ugi reset");
            return build;
        };
    }

    public DynamicTableSink copy() {
        return this;
    }

    public String asSummaryString() {
        return "arctic";
    }

    public void applyStaticPartition(Map<String, String> map) {
    }

    public void applyOverwrite(boolean z) {
        this.overwrite = z;
    }
}
