package com.netease.arctic.flink.read.source;

import com.netease.arctic.data.ChangeAction;
import com.netease.arctic.scan.ArcticFileScanTask;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.function.Function;

/* loaded from: input_file:com/netease/arctic/flink/read/source/ChangeLogDataIterator.class */
public class ChangeLogDataIterator<T> extends DataIterator<T> {
    private final DataIterator<T> insertDataIterator;
    private DataIterator<T> deleteDataIterator;
    private final Function<T, T> arcticMetaColumnRemover;
    private final Function<ChangeActionTrans<T>, T> changeActionTransformer;
    private final QueueHolder<T> insertHolder;
    private final QueueHolder<T> deleteHolder;

    /* loaded from: input_file:com/netease/arctic/flink/read/source/ChangeLogDataIterator$ChangeActionTrans.class */
    public static class ChangeActionTrans<T> {
        protected final T row;
        protected final ChangeAction changeAction;

        private ChangeActionTrans(T t, ChangeAction changeAction) {
            this.row = t;
            this.changeAction = changeAction;
        }

        public static <T> ChangeActionTrans<T> of(T t, ChangeAction changeAction) {
            return new ChangeActionTrans<>(t, changeAction);
        }

        public T row() {
            return this.row;
        }

        public ChangeAction changeAction() {
            return this.changeAction;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/netease/arctic/flink/read/source/ChangeLogDataIterator$QueueHolder.class */
    public static class QueueHolder<T> {
        T nextRow;
        ChangeAction changeAction;
        Long nextOffset;

        boolean isEmpty() {
            return this.nextRow == null;
        }

        boolean isNotEmpty() {
            return this.nextRow != null;
        }

        public void put(T t, ChangeAction changeAction, Long l) {
            this.nextRow = t;
            this.changeAction = changeAction;
            this.nextOffset = l;
        }

        public T get() {
            return this.nextRow;
        }

        boolean lesser(QueueHolder<T> queueHolder) {
            return this.nextOffset.compareTo(queueHolder.nextOffset) < 0;
        }

        boolean equalTo(QueueHolder<T> queueHolder) {
            return this.nextOffset.compareTo(queueHolder.nextOffset) == 0;
        }

        void clean() {
            this.nextRow = null;
            this.nextOffset = null;
        }
    }

    public ChangeLogDataIterator(FileScanTaskReader<T> fileScanTaskReader, Collection<ArcticFileScanTask> collection, Collection<ArcticFileScanTask> collection2, Function<T, Long> function, Function<T, T> function2, Function<ChangeActionTrans<T>, T> function3) {
        super(fileScanTaskReader, Collections.emptyList(), function);
        this.deleteDataIterator = DataIterator.empty();
        this.insertHolder = new QueueHolder<>();
        this.deleteHolder = new QueueHolder<>();
        this.insertDataIterator = initDataIterator(fileScanTaskReader, function, collection);
        if (collection2 != null && !collection2.isEmpty()) {
            this.deleteDataIterator = initDataIterator(fileScanTaskReader, function, collection2);
        }
        this.arcticMetaColumnRemover = function2;
        this.changeActionTransformer = function3;
    }

    public void seek(int i, int i2, long j, long j2) {
        this.insertDataIterator.seek(i, j);
        this.deleteDataIterator.seek(i2, j2);
    }

    @Override // com.netease.arctic.flink.read.source.DataIterator
    public void seek(int i, long j) {
        throw new UnsupportedOperationException("This operation is not supported in change log data iterator.");
    }

    private void loadQueueHolder(boolean z) {
        DataIterator<T> dataIterator = z ? this.insertDataIterator : this.deleteDataIterator;
        QueueHolder<T> queueHolder = z ? this.insertHolder : this.deleteHolder;
        if (dataIterator.hasNext() && queueHolder.isEmpty()) {
            queueHolder.put(dataIterator.next(), z ? ChangeAction.INSERT : ChangeAction.DELETE, Long.valueOf(dataIterator.currentArcticFileOffset()));
        }
    }

    @Override // com.netease.arctic.flink.read.source.DataIterator
    public boolean hasNext() {
        loadQueueHolder(false);
        loadQueueHolder(true);
        return this.deleteHolder.isNotEmpty() || this.insertHolder.isNotEmpty();
    }

    @Override // com.netease.arctic.flink.read.source.DataIterator
    public boolean currentFileHasNext() {
        return this.deleteDataIterator.currentFileHasNext() || this.insertDataIterator.currentFileHasNext() || this.deleteHolder.isNotEmpty() || this.insertHolder.isNotEmpty();
    }

    @Override // com.netease.arctic.flink.read.source.DataIterator
    public T next() {
        T apply;
        if (this.deleteHolder.isEmpty() && this.insertHolder.isNotEmpty()) {
            apply = this.changeActionTransformer.apply(ChangeActionTrans.of(this.insertHolder.nextRow, this.insertHolder.changeAction));
            this.insertHolder.clean();
        } else if (this.deleteHolder.isNotEmpty() && this.insertHolder.isEmpty()) {
            apply = this.changeActionTransformer.apply(ChangeActionTrans.of(this.deleteHolder.nextRow, this.deleteHolder.changeAction));
            this.deleteHolder.clean();
        } else if (this.deleteHolder.equalTo(this.insertHolder)) {
            apply = this.changeActionTransformer.apply(ChangeActionTrans.of(this.deleteHolder.nextRow, ChangeAction.UPDATE_BEFORE));
            this.insertHolder.changeAction = ChangeAction.UPDATE_AFTER;
            this.deleteHolder.clean();
        } else if (this.deleteHolder.lesser(this.insertHolder)) {
            apply = this.changeActionTransformer.apply(ChangeActionTrans.of(this.deleteHolder.nextRow, this.deleteHolder.changeAction));
            this.deleteHolder.clean();
        } else {
            apply = this.changeActionTransformer.apply(ChangeActionTrans.of(this.insertHolder.nextRow, this.insertHolder.changeAction));
            this.insertHolder.clean();
        }
        return this.arcticMetaColumnRemover.apply(apply);
    }

    @Override // com.netease.arctic.flink.read.source.DataIterator
    public void close() throws IOException {
        this.insertDataIterator.close();
        this.deleteDataIterator.close();
    }

    public int insertFileOffset() {
        return this.insertDataIterator.fileOffset();
    }

    public long insertRecordOffset() {
        return this.insertDataIterator.recordOffset();
    }

    public int deleteFileOffset() {
        return this.deleteDataIterator.fileOffset();
    }

    public long deleteRecordOffset() {
        return this.deleteDataIterator.recordOffset();
    }

    private DataIterator<T> initDataIterator(FileScanTaskReader<T> fileScanTaskReader, Function<T, Long> function, Collection<ArcticFileScanTask> collection) {
        return new DataIterator<>(fileScanTaskReader, collection, function);
    }
}
