package org.apache.hudi.sink.clustering;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.common.model.ClusteringGroupInfo;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.ClusteringUtil;
import org.apache.hudi.util.FlinkTables;
import org.apache.hudi.util.FlinkWriteClients;

/* loaded from: input_file:org/apache/hudi/sink/clustering/ClusteringPlanOperator.class */
public class ClusteringPlanOperator extends AbstractStreamOperator<ClusteringPlanEvent> implements OneInputStreamOperator<Object, ClusteringPlanEvent> {
    private final Configuration conf;
    private transient HoodieFlinkTable table;

    public ClusteringPlanOperator(Configuration configuration) {
        this.conf = configuration;
    }

    public void open() throws Exception {
        super.open();
        this.table = FlinkTables.createTable(this.conf, getRuntimeContext());
        ClusteringUtil.rollbackClustering(this.table, FlinkWriteClients.createWriteClient(this.conf, getRuntimeContext()));
    }

    public void processElement(StreamRecord<Object> streamRecord) {
    }

    public void notifyCheckpointComplete(long j) {
        try {
            this.table.getMetaClient().reloadActiveTimeline();
            scheduleClustering(this.table, j);
        } catch (Throwable th) {
            LOG.error("Error while scheduling clustering plan for checkpoint: " + j, th);
        }
    }

    private void scheduleClustering(HoodieFlinkTable<?> hoodieFlinkTable, long j) {
        Option fromJavaOptional = Option.fromJavaOptional(ClusteringUtils.getPendingClusteringInstantTimes(hoodieFlinkTable.getMetaClient()).stream().filter(hoodieInstant -> {
            return hoodieInstant.getState() == HoodieInstant.State.REQUESTED;
        }).findFirst());
        if (!fromJavaOptional.isPresent()) {
            LOG.info("No clustering plan for checkpoint " + j);
            return;
        }
        String timestamp = ((HoodieInstant) fromJavaOptional.get()).getTimestamp();
        HoodieInstant replaceCommitRequestedInstant = HoodieTimeline.getReplaceCommitRequestedInstant(timestamp);
        Option clusteringPlan = ClusteringUtils.getClusteringPlan(hoodieFlinkTable.getMetaClient(), replaceCommitRequestedInstant);
        if (!clusteringPlan.isPresent()) {
            LOG.info("No clustering plan scheduled");
            return;
        }
        HoodieClusteringPlan hoodieClusteringPlan = (HoodieClusteringPlan) ((Pair) clusteringPlan.get()).getRight();
        if (hoodieClusteringPlan == null || hoodieClusteringPlan.getInputGroups() == null || hoodieClusteringPlan.getInputGroups().isEmpty()) {
            LOG.info("Empty clustering plan for instant " + timestamp);
            return;
        }
        hoodieFlinkTable.getActiveTimeline().transitionReplaceRequestedToInflight(replaceCommitRequestedInstant, Option.empty());
        hoodieFlinkTable.getMetaClient().reloadActiveTimeline();
        for (HoodieClusteringGroup hoodieClusteringGroup : hoodieClusteringPlan.getInputGroups()) {
            LOG.info("Execute clustering plan for instant {} as {} file slices", timestamp, Integer.valueOf(hoodieClusteringGroup.getSlices().size()));
            this.output.collect(new StreamRecord(new ClusteringPlanEvent(timestamp, ClusteringGroupInfo.create(hoodieClusteringGroup), hoodieClusteringPlan.getStrategy().getStrategyParams())));
        }
    }

    @VisibleForTesting
    public void setOutput(Output<StreamRecord<ClusteringPlanEvent>> output) {
        this.output = output;
    }
}
