package org.apache.flink.contrib.streaming.state.snapshot;

import java.util.LinkedHashMap;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.FullSnapshotAsyncWriter;
import org.apache.flink.runtime.state.FullSnapshotResources;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.SnapshotStrategy;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper;
import org.apache.flink.util.ResourceGuard;
import org.apache.flink.util.function.SupplierWithException;
import org.rocksdb.RocksDB;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.class */
public class RocksFullSnapshotStrategy<K> extends RocksDBSnapshotStrategyBase<K, FullSnapshotResources<K>> {
    private static final Logger LOG = LoggerFactory.getLogger(RocksFullSnapshotStrategy.class);
    private static final String DESCRIPTION = "Asynchronous full RocksDB snapshot";

    @Nonnull
    private final StreamCompressionDecorator keyGroupCompressionDecorator;
    private final LinkedHashMap<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates;

    public RocksFullSnapshotStrategy(@Nonnull RocksDB rocksDB, @Nonnull ResourceGuard resourceGuard, @Nonnull TypeSerializer<K> typeSerializer, @Nonnull LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> linkedHashMap, @Nonnull LinkedHashMap<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> linkedHashMap2, @Nonnull KeyGroupRange keyGroupRange, @Nonnegative int i, @Nonnull LocalRecoveryConfig localRecoveryConfig, @Nonnull StreamCompressionDecorator streamCompressionDecorator) {
        super(DESCRIPTION, rocksDB, resourceGuard, typeSerializer, linkedHashMap, keyGroupRange, i, localRecoveryConfig);
        this.keyGroupCompressionDecorator = streamCompressionDecorator;
        this.registeredPQStates = linkedHashMap2;
    }

    /* renamed from: syncPrepareResources, reason: merged with bridge method [inline-methods] */
    public FullSnapshotResources<K> m44syncPrepareResources(long j) throws Exception {
        return RocksDBFullSnapshotResources.create(this.kvStateInformation, this.registeredPQStates, this.db, this.rocksDBResourceGuard, this.keyGroupRange, this.keySerializer, this.keyGroupPrefixBytes, this.keyGroupCompressionDecorator);
    }

    public SnapshotStrategy.SnapshotResultSupplier<KeyedStateHandle> asyncSnapshot(FullSnapshotResources<K> fullSnapshotResources, long j, long j2, @Nonnull CheckpointStreamFactory checkpointStreamFactory, @Nonnull CheckpointOptions checkpointOptions) {
        if (!fullSnapshotResources.getMetaInfoSnapshots().isEmpty()) {
            return new FullSnapshotAsyncWriter(checkpointOptions.getCheckpointType(), createCheckpointStreamSupplier(j, checkpointStreamFactory, checkpointOptions), fullSnapshotResources);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at {}. Returning null.", Long.valueOf(j2));
        }
        return closeableRegistry -> {
            return SnapshotResult.empty();
        };
    }

    public void notifyCheckpointComplete(long j) {
    }

    public void notifyCheckpointAborted(long j) {
    }

    private SupplierWithException<CheckpointStreamWithResultProvider, Exception> createCheckpointStreamSupplier(long j, CheckpointStreamFactory checkpointStreamFactory, CheckpointOptions checkpointOptions) {
        return (!this.localRecoveryConfig.isLocalRecoveryEnabled() || checkpointOptions.getCheckpointType().isSavepoint()) ? () -> {
            return CheckpointStreamWithResultProvider.createSimpleStream(CheckpointedStateScope.EXCLUSIVE, checkpointStreamFactory);
        } : () -> {
            return CheckpointStreamWithResultProvider.createDuplicatingStream(j, CheckpointedStateScope.EXCLUSIVE, checkpointStreamFactory, this.localRecoveryConfig.getLocalStateDirectoryProvider());
        };
    }
}
