package org.apache.flink.state.changelog;

import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.Collection;
import java.util.Iterator;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
import org.apache.flink.runtime.state.heap.InternalKeyContext;
import org.apache.flink.util.Preconditions;

/**
 * {@link KvStateChangeLogger} implementation that serializes keyed-state changes using the key,
 * namespace and value serializers supplied at construction time.
 *
 * <p>The scope of every change is written as the current key (obtained from the key context)
 * followed by the namespace. The state metadata written by {@link #writeDefaultValueAndTtl}
 * consists of the TTL configuration (if enabled) and the optional default value.
 *
 * @param <Key> type of the state key
 * @param <Value> type of the state value
 * @param <Ns> type of the state namespace
 */
@NotThreadSafe
class KvStateChangeLoggerImpl<Key, Value, Ns> extends AbstractStateChangeLogger<Key, Value, Ns> implements KvStateChangeLogger<Value, Ns> {
    private final TypeSerializer<Ns> namespaceSerializer;
    protected final TypeSerializer<Key> keySerializer;
    private final TypeSerializer<Value> valueSerializer;
    private final StateTtlConfig ttlConfig;

    /** Default value of the state; {@code null} when the state has none. */
    @Nullable
    private final Value defaultValue;

    /**
     * Creates a logger for a single keyed state.
     *
     * @param typeSerializer serializer for state keys; must not be {@code null}
     * @param typeSerializer2 serializer for namespaces; must not be {@code null}
     * @param typeSerializer3 serializer for state values; must not be {@code null}
     * @param internalKeyContext key context, forwarded to the superclass; it is the source of
     *     the current key when the scope is serialized
     * @param stateChangelogWriter changelog writer, forwarded to the superclass
     * @param registeredStateMetaInfoBase state meta information, forwarded to the superclass
     * @param stateTtlConfig TTL configuration of the state; must not be {@code null}
     * @param value default value of the state, or {@code null} if there is none
     * @throws NullPointerException if any of the serializers or the TTL configuration is
     *     {@code null}
     */
    public KvStateChangeLoggerImpl(TypeSerializer<Key> typeSerializer, TypeSerializer<Ns> typeSerializer2, TypeSerializer<Value> typeSerializer3, InternalKeyContext<Key> internalKeyContext, StateChangelogWriter<?> stateChangelogWriter, RegisteredStateMetaInfoBase registeredStateMetaInfoBase, StateTtlConfig stateTtlConfig, @Nullable Value value) {
        super(stateChangelogWriter, internalKeyContext, registeredStateMetaInfoBase);
        // checkNotNull is generic, so no (raw) casts are needed to keep the static types.
        this.keySerializer = Preconditions.checkNotNull(typeSerializer);
        this.valueSerializer = Preconditions.checkNotNull(typeSerializer3);
        this.namespaceSerializer = Preconditions.checkNotNull(typeSerializer2);
        this.ttlConfig = Preconditions.checkNotNull(stateTtlConfig);
        this.defaultValue = value;
    }

    /**
     * Logs a {@link StateChangeOperation#MERGE_NS} change.
     *
     * <p>The payload is the target namespace, then the number of source namespaces as an
     * {@code int}, then each source namespace in the collection's iteration order.
     *
     * @param ns target namespace; also passed to {@code log} as the namespace of the change
     * @param collection source namespaces that were merged
     * @throws IOException if serialization or logging fails
     */
    @Override
    public void namespacesMerged(Ns ns, Collection<Ns> collection) throws IOException {
        log(StateChangeOperation.MERGE_NS, out -> {
            namespaceSerializer.serialize(ns, out);
            out.writeInt(collection.size());
            // Typed iteration: a raw Iterator would hand Object to serialize(Ns, ...).
            for (Ns source : collection) {
                namespaceSerializer.serialize(source, out);
            }
        }, ns);
    }

    /**
     * Writes {@code value} to the output using the value serializer.
     *
     * @param value value to serialize
     * @param dataOutputViewStreamWrapper output to write to
     * @throws IOException if the serializer fails to write
     */
    @Override
    protected void serializeValue(Value value, DataOutputViewStreamWrapper dataOutputViewStreamWrapper) throws IOException {
        valueSerializer.serialize(value, dataOutputViewStreamWrapper);
    }

    /**
     * Writes the scope of a change: the current key of the key context followed by the namespace.
     *
     * @param ns namespace of the change
     * @param dataOutputViewStreamWrapper output to write to
     * @throws IOException if a serializer fails to write
     */
    @Override
    protected void serializeScope(Ns ns, DataOutputViewStreamWrapper dataOutputViewStreamWrapper) throws IOException {
        keySerializer.serialize(keyContext.getCurrentKey(), dataOutputViewStreamWrapper);
        namespaceSerializer.serialize(ns, dataOutputViewStreamWrapper);
    }

    /**
     * Writes the TTL configuration and the default value of the state.
     *
     * <p>Layout: a boolean telling whether TTL is enabled; if so, the {@link StateTtlConfig}
     * written with Java serialization; then a boolean telling whether a default value exists;
     * if so, the default value written via {@link #serializeValue}.
     *
     * @param dataOutputViewStreamWrapper output to write to
     * @throws IOException if writing fails
     */
    @Override
    protected void writeDefaultValueAndTtl(DataOutputViewStreamWrapper dataOutputViewStreamWrapper) throws IOException {
        final boolean ttlEnabled = ttlConfig.isEnabled();
        dataOutputViewStreamWrapper.writeBoolean(ttlEnabled);
        if (ttlEnabled) {
            // try-with-resources keeps a writeObject failure as the primary exception and
            // attaches a failing close() as suppressed instead of letting it mask the cause.
            // NOTE(review): closing the ObjectOutputStream also closes the wrapped output
            // (same as before this change); the writes below presumably rely on that close
            // being harmless for the underlying stream - confirm against the caller.
            try (ObjectOutputStream ttlOut = new ObjectOutputStream(dataOutputViewStreamWrapper)) {
                ttlOut.writeObject(ttlConfig);
            }
        }
        final boolean hasDefaultValue = defaultValue != null;
        dataOutputViewStreamWrapper.writeBoolean(hasDefaultValue);
        if (hasDefaultValue) {
            serializeValue(defaultValue, dataOutputViewStreamWrapper);
        }
    }
}
