package com.alibaba.alink.operator.stream.source;

import com.alibaba.alink.common.exceptions.AkUnclassifiedErrorException;
import com.alibaba.alink.common.io.plugin.ClassLoaderFactory;
import java.io.IOException;
import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.InstantiationUtil;

/* loaded from: input_file:com/alibaba/alink/operator/stream/source/RichParallelSourceFunctionWithClassLoader.class */
public class RichParallelSourceFunctionWithClassLoader extends RichParallelSourceFunction<Row> implements CheckpointListener, ResultTypeQueryable<Row>, CheckpointedFunction {
    private final ClassLoaderFactory factory;
    private final byte[] serializedRichParallelSourceFunction;
    private transient RichParallelSourceFunction<Row> internal;

    public RichParallelSourceFunctionWithClassLoader(ClassLoaderFactory classLoaderFactory, RichParallelSourceFunction<Row> richParallelSourceFunction) {
        this.factory = classLoaderFactory;
        this.internal = richParallelSourceFunction;
        try {
            this.serializedRichParallelSourceFunction = InstantiationUtil.serializeObject(richParallelSourceFunction);
        } catch (IOException e) {
            throw new AkUnclassifiedErrorException(e.getMessage(), e);
        }
    }

    private RichParallelSourceFunction<Row> getRichParallelSourceFunction() {
        if (this.internal == null) {
            try {
                this.internal = (RichParallelSourceFunction) InstantiationUtil.deserializeObject(this.serializedRichParallelSourceFunction, this.factory.create());
            } catch (IOException | ClassNotFoundException e) {
                throw new AkUnclassifiedErrorException(e.getMessage(), e);
            }
        }
        return this.internal;
    }

    public void setRuntimeContext(RuntimeContext runtimeContext) {
        this.factory.doAsThrowRuntime(() -> {
            getRichParallelSourceFunction().setRuntimeContext(runtimeContext);
        });
    }

    public RuntimeContext getRuntimeContext() {
        ClassLoaderFactory classLoaderFactory = this.factory;
        RichParallelSourceFunction<Row> richParallelSourceFunction = getRichParallelSourceFunction();
        richParallelSourceFunction.getClass();
        return (RuntimeContext) classLoaderFactory.doAsThrowRuntime(richParallelSourceFunction::getRuntimeContext);
    }

    public IterationRuntimeContext getIterationRuntimeContext() {
        ClassLoaderFactory classLoaderFactory = this.factory;
        RichParallelSourceFunction<Row> richParallelSourceFunction = getRichParallelSourceFunction();
        richParallelSourceFunction.getClass();
        return (IterationRuntimeContext) classLoaderFactory.doAsThrowRuntime(richParallelSourceFunction::getIterationRuntimeContext);
    }

    public void open(Configuration configuration) throws Exception {
        this.factory.doAsThrowRuntime(() -> {
            getRichParallelSourceFunction().open(configuration);
        });
    }

    public void close() throws Exception {
        ClassLoaderFactory classLoaderFactory = this.factory;
        RichParallelSourceFunction<Row> richParallelSourceFunction = getRichParallelSourceFunction();
        richParallelSourceFunction.getClass();
        classLoaderFactory.doAsThrowRuntime(richParallelSourceFunction::close);
    }

    public TypeInformation<Row> getProducedType() {
        if (!(this.internal instanceof ResultTypeQueryable)) {
            throw new IllegalStateException("Internal is not the ResultTypeQueryable.");
        }
        ClassLoaderFactory classLoaderFactory = this.factory;
        ResultTypeQueryable richParallelSourceFunction = getRichParallelSourceFunction();
        richParallelSourceFunction.getClass();
        return (TypeInformation) classLoaderFactory.doAsThrowRuntime(richParallelSourceFunction::getProducedType);
    }

    public void notifyCheckpointComplete(long j) throws Exception {
        if (!(this.internal instanceof CheckpointListener)) {
            throw new IllegalStateException("Internal is not the CheckpointListener.");
        }
        ((CheckpointListener) this.factory.doAsThrowRuntime(() -> {
            return getRichParallelSourceFunction();
        })).notifyCheckpointComplete(j);
    }

    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        if (!(this.internal instanceof CheckpointedFunction)) {
            throw new IllegalStateException("Internal is not the CheckpointedFunction.");
        }
        ((CheckpointedFunction) this.factory.doAsThrowRuntime(() -> {
            return getRichParallelSourceFunction();
        })).snapshotState(functionSnapshotContext);
    }

    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        if (!(this.internal instanceof CheckpointedFunction)) {
            throw new IllegalStateException("Internal is not the CheckpointedFunction.");
        }
        ((CheckpointedFunction) this.factory.doAsThrowRuntime(() -> {
            return getRichParallelSourceFunction();
        })).initializeState(functionInitializationContext);
    }

    public void run(SourceFunction.SourceContext<Row> sourceContext) throws Exception {
        this.factory.doAsThrowRuntime(() -> {
            getRichParallelSourceFunction().run(sourceContext);
        });
    }

    public void cancel() {
        ClassLoaderFactory classLoaderFactory = this.factory;
        RichParallelSourceFunction<Row> richParallelSourceFunction = getRichParallelSourceFunction();
        richParallelSourceFunction.getClass();
        classLoaderFactory.doAsThrowRuntime(richParallelSourceFunction::cancel);
    }
}
