package com.alibaba.alink.operator.stream.sink;

import com.alibaba.alink.common.exceptions.AkUnclassifiedErrorException;
import com.alibaba.alink.common.io.plugin.ClassLoaderFactory;
import java.io.IOException;
import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.InstantiationUtil;

/* loaded from: input_file:com/alibaba/alink/operator/stream/sink/RichSinkFunctionWithClassLoader.class */
public class RichSinkFunctionWithClassLoader extends RichSinkFunction<Row> implements CheckpointedFunction, CheckpointListener {
    private final ClassLoaderFactory factory;
    private final byte[] serializedRichSinkFunction;
    private transient RichSinkFunction<Row> internal;

    public RichSinkFunctionWithClassLoader(ClassLoaderFactory classLoaderFactory, RichSinkFunction<Row> richSinkFunction) {
        this.factory = classLoaderFactory;
        this.internal = richSinkFunction;
        try {
            this.serializedRichSinkFunction = InstantiationUtil.serializeObject(richSinkFunction);
        } catch (IOException e) {
            throw new AkUnclassifiedErrorException(e.getMessage(), e);
        }
    }

    private RichSinkFunction<Row> getRichSinkFunction() {
        if (this.internal == null) {
            try {
                this.internal = (RichSinkFunction) InstantiationUtil.deserializeObject(this.serializedRichSinkFunction, this.factory.create());
            } catch (IOException | ClassNotFoundException e) {
                throw new AkUnclassifiedErrorException(e.getMessage(), e);
            }
        }
        return this.internal;
    }

    public void setRuntimeContext(RuntimeContext runtimeContext) {
        this.factory.doAsThrowRuntime(() -> {
            getRichSinkFunction().setRuntimeContext(runtimeContext);
        });
    }

    public RuntimeContext getRuntimeContext() {
        ClassLoaderFactory classLoaderFactory = this.factory;
        RichSinkFunction<Row> richSinkFunction = getRichSinkFunction();
        richSinkFunction.getClass();
        return (RuntimeContext) classLoaderFactory.doAsThrowRuntime(richSinkFunction::getRuntimeContext);
    }

    public IterationRuntimeContext getIterationRuntimeContext() {
        ClassLoaderFactory classLoaderFactory = this.factory;
        RichSinkFunction<Row> richSinkFunction = getRichSinkFunction();
        richSinkFunction.getClass();
        return (IterationRuntimeContext) classLoaderFactory.doAsThrowRuntime(richSinkFunction::getIterationRuntimeContext);
    }

    public void open(Configuration configuration) throws Exception {
        this.factory.doAsThrowRuntime(() -> {
            getRichSinkFunction().open(configuration);
        });
    }

    public void close() throws Exception {
        ClassLoaderFactory classLoaderFactory = this.factory;
        RichSinkFunction<Row> richSinkFunction = getRichSinkFunction();
        richSinkFunction.getClass();
        classLoaderFactory.doAsThrowRuntime(richSinkFunction::close);
    }

    public void invoke(Row row) throws Exception {
        this.factory.doAsThrowRuntime(() -> {
            getRichSinkFunction().invoke(row);
        });
    }

    public void invoke(Row row, SinkFunction.Context context) throws Exception {
        this.factory.doAsThrowRuntime(() -> {
            getRichSinkFunction().invoke(row, context);
        });
    }

    public void notifyCheckpointComplete(long j) throws Exception {
        if (!(this.internal instanceof CheckpointListener)) {
            throw new IllegalStateException("Internal is not the CheckpointListener.");
        }
        this.factory.doAsThrowRuntime(() -> {
            getRichSinkFunction().notifyCheckpointComplete(j);
        });
    }

    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        if (!(this.internal instanceof CheckpointedFunction)) {
            throw new IllegalStateException("Internal is not the CheckpointedFunction.");
        }
        this.factory.doAsThrowRuntime(() -> {
            getRichSinkFunction().snapshotState(functionSnapshotContext);
        });
    }

    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        if (!(getRichSinkFunction() instanceof CheckpointedFunction)) {
            throw new IllegalStateException("Internal is not the CheckpointedFunction.");
        }
        this.factory.doAsThrowRuntime(() -> {
            getRichSinkFunction().initializeState(functionInitializationContext);
        });
    }
}
