package com.netease.arctic.scan;

import com.netease.arctic.data.DataTreeNode;
import com.netease.arctic.table.BaseKeyedTable;
import com.netease.arctic.table.TableProperties;
import com.netease.arctic.table.UnkeyedTable;
import com.netease.arctic.utils.TablePropertyUtil;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
import org.apache.iceberg.util.BinPacking;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.StructLikeMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/netease/arctic/scan/BaseKeyedTableScan.class */
/**
 * Plans read tasks for a keyed table.
 *
 * <p>Planning proceeds in these steps:
 * <ol>
 *   <li>Collect change-table files (only when the table has a primary key) and base-table files,
 *       both restricted by the accumulated filter expression.</li>
 *   <li>Group them by partition.</li>
 *   <li>Within each partition, group files by {@link DataTreeNode}. Each data node is then given
 *       the equality-delete files of every ancestor/descendant node.</li>
 *   <li>Split node tasks whose cost exceeds the split size.</li>
 *   <li>Bin-pack the node tasks into {@link CombinedScanTask}s.</li>
 * </ol>
 */
public class BaseKeyedTableScan implements KeyedTableScan {
    private static final Logger LOG = LoggerFactory.getLogger(BaseKeyedTableScan.class);

    /** Fallback for {@link TableProperties#SPLIT_SIZE} when the table does not set it (128 MiB). */
    private static final long DEFAULT_SPLIT_SIZE = 128L * 1024 * 1024;

    /** Fallback for {@link TableProperties#SPLIT_LOOKBACK} when the table does not set it. */
    private static final int DEFAULT_SPLIT_LOOKBACK = 10;

    private final BaseKeyedTable table;
    /** Node tasks of the most recent {@link #planTasks()} call, after oversized nodes were split. */
    List<NodeFileScanTask> splitTasks = new ArrayList<>();
    /** Data-node tasks per partition, filled by {@link #partitionPlan} during planning. */
    private final Map<StructLike, List<NodeFileScanTask>> fileScanTasks = new HashMap<>();
    private final int lookBack;
    private final long openFileCost;
    private final long splitSize;
    /** Conjunction of every filter passed to {@link #filter(Expression)}; {@code null} means no filter. */
    private Expression expression;

    /**
     * Creates a scan over {@code baseKeyedTable}.
     *
     * <p>The split size, lookback and open-file cost are read from the table properties.
     *
     * @param baseKeyedTable the keyed table to scan
     */
    public BaseKeyedTableScan(BaseKeyedTable baseKeyedTable) {
        this.table = baseKeyedTable;
        this.openFileCost = PropertyUtil.propertyAsLong(baseKeyedTable.properties(),
                TableProperties.SPLIT_OPEN_FILE_COST, TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
        this.splitSize = PropertyUtil.propertyAsLong(baseKeyedTable.properties(),
                TableProperties.SPLIT_SIZE, DEFAULT_SPLIT_SIZE);
        this.lookBack = PropertyUtil.propertyAsInt(baseKeyedTable.properties(),
                TableProperties.SPLIT_LOOKBACK, DEFAULT_SPLIT_LOOKBACK);
    }

    /**
     * Adds a filter. Successive calls are AND-ed together.
     *
     * @param expression the filter to add
     * @return this scan
     */
    @Override
    public KeyedTableScan filter(Expression expression) {
        if (this.expression == null) {
            this.expression = expression;
        } else {
            this.expression = Expressions.and(expression, this.expression);
        }
        return this;
    }

    /**
     * Plans change and base files into combined scan tasks.
     *
     * <p>Each call starts from empty planning state, so repeated calls do not duplicate tasks.
     *
     * @return the combined tasks, bin-packed by {@code splitSize}
     */
    @Override
    @SuppressWarnings("unchecked")
    public CloseableIterable<CombinedScanTask> planTasks() {
        // Reset state left by a previous call. Use a fresh list (rather than clear()) so an iterable
        // returned by an earlier call keeps its own tasks.
        this.fileScanTasks.clear();
        this.splitTasks = new ArrayList<>();

        // Change files matter only for tables with a primary key. They are planned before base
        // files, matching the original evaluation order.
        CloseableIterable<ArcticFileScanTask> changeFiles = this.table.primaryKeySpec().primaryKeyExisted()
                ? (CloseableIterable<ArcticFileScanTask>) this.table.m43io().doAs(this::planChangeFiles)
                : CloseableIterable.empty();
        CloseableIterable<ArcticFileScanTask> baseFiles =
                (CloseableIterable<ArcticFileScanTask>) this.table.m43io().doAs(this::planBaseFiles);

        Map<StructLike, Collection<ArcticFileScanTask>> filesByPartition =
                groupFilesByPartition(changeFiles, baseFiles);
        LOG.info("planning table {} need plan partition size {}", this.table.id(), filesByPartition.size());
        filesByPartition.forEach(this::partitionPlan);
        LOG.info("planning table {} partitionPlan end", this.table.id());
        split();
        LOG.info("planning table {} split end", this.table.id());
        return combineNode(CloseableIterable.withNoopClose(this.splitTasks), this.splitSize, this.lookBack,
                this.openFileCost);
    }

    /** Plans base-table files (filtered) and wraps each one as an {@link ArcticFileScanTask}. */
    private CloseableIterable<ArcticFileScanTask> planBaseFiles() {
        return CloseableIterable.transform(planFiles(this.table.baseTable()), BaseArcticFileScanTask::new);
    }

    /**
     * Plans change-table files newer than the per-partition max transaction ids recorded on the
     * table, including the legacy ids. The filter expression, if any, is applied.
     */
    private CloseableIterable<ArcticFileScanTask> planChangeFiles() {
        StructLikeMap<Long> partitionMaxTransactionId = TablePropertyUtil.getPartitionMaxTransactionId(this.table);
        ChangeTableIncrementalScan changeScan = this.table.changeTable().newChangeScan()
                .fromTransaction(partitionMaxTransactionId)
                .fromLegacyTransaction(TablePropertyUtil.getLegacyPartitionMaxTransactionId(this.table));
        if (this.expression != null) {
            changeScan = changeScan.filter(this.expression);
        }
        return changeScan.planTasks();
    }

    /** Plans the files of {@code unkeyedTable}, applying the filter expression if one is set. */
    private CloseableIterable<FileScanTask> planFiles(UnkeyedTable unkeyedTable) {
        TableScan scan = unkeyedTable.newScan();
        if (this.expression != null) {
            scan = scan.filter(this.expression);
        }
        return scan.planFiles();
    }

    /**
     * Moves every planned node task into {@link #splitTasks}.
     *
     * <p>A node task stays whole when its cost fits in {@code splitSize} or it has fewer than two
     * data files. Otherwise its data files are bin-packed into several node tasks, each of which
     * carries all of the node's equality deletes.
     */
    private void split() {
        for (List<NodeFileScanTask> nodeTasks : this.fileScanTasks.values()) {
            for (NodeFileScanTask nodeTask : nodeTasks) {
                if (nodeTask.cost() <= this.splitSize || nodeTask.dataTasks().size() < 2) {
                    this.splitTasks.add(nodeTask);
                } else {
                    this.splitTasks.addAll(Lists.newArrayList(splitNode(
                            CloseableIterable.withNoopClose(nodeTask.dataTasks()),
                            nodeTask.arcticEquityDeletes(), this.splitSize, this.lookBack, this.openFileCost)));
                }
            }
        }
    }

    /**
     * Bin-packs data tasks into node tasks and appends {@code deleteTasks} to every bin.
     *
     * @param dataTasks data file tasks to pack
     * @param deleteTasks delete tasks added to each produced node task
     * @param targetSplitSize target weight of a bin
     * @param binLookBack number of open bins considered when packing
     * @param fileOpenCost minimum weight charged per file
     * @return the packed node tasks
     */
    public CloseableIterable<NodeFileScanTask> splitNode(CloseableIterable<ArcticFileScanTask> dataTasks,
            List<ArcticFileScanTask> deleteTasks, long targetSplitSize, int binLookBack, long fileOpenCost) {
        BinPacking.PackingIterable<ArcticFileScanTask> bins = new BinPacking.PackingIterable<>(
                dataTasks, targetSplitSize, binLookBack,
                task -> Math.max(task.mo37file().fileSizeInBytes(), fileOpenCost), true);
        return CloseableIterable.transform(CloseableIterable.combine(bins, dataTasks),
                bin -> packingTask(bin, deleteTasks));
    }

    /** Builds a node task from the data files in {@code dataBin} followed by {@code deleteTasks}. */
    private NodeFileScanTask packingTask(List<ArcticFileScanTask> dataBin, List<ArcticFileScanTask> deleteTasks) {
        List<ArcticFileScanTask> tasks = new ArrayList<>(dataBin.size() + deleteTasks.size());
        tasks.addAll(dataBin);
        tasks.addAll(deleteTasks);
        return new NodeFileScanTask(tasks);
    }

    /**
     * Bin-packs node tasks into combined scan tasks, weighting each node by
     * {@code max(cost, fileOpenCost)}.
     *
     * @param nodeTasks node tasks to pack
     * @param targetSplitSize target weight of a combined task
     * @param binLookBack number of open bins considered when packing
     * @param fileOpenCost minimum weight charged per node task
     * @return the combined tasks
     */
    public CloseableIterable<CombinedScanTask> combineNode(CloseableIterable<NodeFileScanTask> nodeTasks,
            long targetSplitSize, int binLookBack, long fileOpenCost) {
        BinPacking.PackingIterable<NodeFileScanTask> bins = new BinPacking.PackingIterable<>(
                nodeTasks, targetSplitSize, binLookBack,
                task -> Math.max(task.cost(), fileOpenCost), true);
        return CloseableIterable.transform(CloseableIterable.combine(bins, nodeTasks), BaseCombinedScanTask::new);
    }

    /**
     * Groups one partition's files by tree node and records the resulting data-node tasks in
     * {@link #fileScanTasks}.
     *
     * <p>Every data node also receives the equality deletes owned by each node that is its
     * ancestor or descendant.
     *
     * @param partition the partition being planned
     * @param partitionTasks all file tasks of that partition
     */
    private void partitionPlan(StructLike partition, Collection<ArcticFileScanTask> partitionTasks) {
        Map<DataTreeNode, NodeFileScanTask> tasksByNode = new HashMap<>();
        // The same file path may appear more than once in the input. Keep only its first occurrence.
        Set<String> seenPaths = new HashSet<>();
        for (ArcticFileScanTask task : partitionTasks) {
            if (!seenPaths.add(task.mo37file().path().toString())) {
                continue;
            }
            DataTreeNode node = task.mo37file().node();
            tasksByNode.computeIfAbsent(node, n -> new NodeFileScanTask(n)).addFile(task);
        }

        tasksByNode.forEach((dataNode, dataTask) -> {
            if (!dataTask.isDataNode()) {
                return;
            }
            tasksByNode.forEach((relatedNode, relatedTask) -> {
                if (relatedNode.equals(dataNode)) {
                    return;
                }
                if (relatedNode.isSonOf(dataNode) || dataNode.isSonOf(relatedNode)) {
                    // Keep only deletes whose file belongs to relatedNode itself.
                    // NOTE(review): presumably relatedTask may already hold deletes added from other
                    // nodes earlier in this loop, which must not be propagated again.
                    List<ArcticFileScanTask> ownDeletes = relatedTask.arcticEquityDeletes().stream()
                            .filter(delete -> delete.mo37file().node().equals(relatedNode))
                            .collect(Collectors.toList());
                    dataTask.addTasks(ownDeletes);
                }
            });
        });

        List<NodeFileScanTask> dataNodeTasks = new ArrayList<>();
        for (NodeFileScanTask task : tasksByNode.values()) {
            if (task.isDataNode()) {
                dataNodeTasks.add(task);
            }
        }
        this.fileScanTasks.put(partition, dataNodeTasks);
    }

    /**
     * Groups change and base file tasks by partition. Change files come first within each group.
     *
     * <p>Both iterables are always closed, even if iteration fails. A failure to close is logged,
     * not thrown.
     *
     * @param changeFiles change-table file tasks
     * @param baseFiles base-table file tasks
     * @return a map from partition to that partition's file tasks
     */
    public Map<StructLike, Collection<ArcticFileScanTask>> groupFilesByPartition(
            CloseableIterable<ArcticFileScanTask> changeFiles, CloseableIterable<ArcticFileScanTask> baseFiles) {
        ListMultimap<StructLike, ArcticFileScanTask> filesByPartition =
                Multimaps.newListMultimap(Maps.newHashMap(), Lists::newArrayList);
        try {
            changeFiles.forEach(task -> filesByPartition.put(task.mo37file().partition(), task));
            baseFiles.forEach(task -> filesByPartition.put(task.mo37file().partition(), task));
            return filesByPartition.asMap();
        } finally {
            // Close independently so that a failure closing one iterable does not leak the other.
            closeQuietly(changeFiles);
            closeQuietly(baseFiles);
        }
    }

    /** Closes {@code files}, logging (not throwing) any {@link IOException}. */
    private void closeQuietly(CloseableIterable<?> files) {
        try {
            files.close();
        } catch (IOException e) {
            LOG.warn("Failed to close table scan of {} ", this.table.id(), e);
        }
    }
}
