package com.alibaba.otter.canal.parse.inbound;

import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DirectLogFetcher;
import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.store.CanalStoreException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.springframework.util.Assert;

/* loaded from: input_file:com/alibaba/otter/canal/parse/inbound/EventTransactionBuffer.class */
public class EventTransactionBuffer extends AbstractCanalLifeCycle {
    private static final long INIT_SQEUENCE = -1;
    private int indexMask;
    private CanalEntry.Entry[] entries;
    private TransactionFlushCallback flushCallback;
    private int bufferSize = 1024;
    private AtomicLong putSequence = new AtomicLong(INIT_SQEUENCE);
    private AtomicLong flushSequence = new AtomicLong(INIT_SQEUENCE);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.alibaba.otter.canal.parse.inbound.EventTransactionBuffer$1, reason: invalid class name */
    /* loaded from: input_file:com/alibaba/otter/canal/parse/inbound/EventTransactionBuffer$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$com$alibaba$otter$canal$protocol$CanalEntry$EntryType = new int[CanalEntry.EntryType.values().length];

        static {
            try {
                $SwitchMap$com$alibaba$otter$canal$protocol$CanalEntry$EntryType[CanalEntry.EntryType.TRANSACTIONBEGIN.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$alibaba$otter$canal$protocol$CanalEntry$EntryType[CanalEntry.EntryType.TRANSACTIONEND.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$alibaba$otter$canal$protocol$CanalEntry$EntryType[CanalEntry.EntryType.ROWDATA.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$com$alibaba$otter$canal$protocol$CanalEntry$EntryType[CanalEntry.EntryType.HEARTBEAT.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    /* loaded from: input_file:com/alibaba/otter/canal/parse/inbound/EventTransactionBuffer$TransactionFlushCallback.class */
    public interface TransactionFlushCallback {
        void flush(List<CanalEntry.Entry> list) throws InterruptedException;
    }

    public EventTransactionBuffer() {
    }

    public EventTransactionBuffer(TransactionFlushCallback transactionFlushCallback) {
        this.flushCallback = transactionFlushCallback;
    }

    public void start() throws CanalStoreException {
        super.start();
        if (Integer.bitCount(this.bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        Assert.notNull(this.flushCallback, "flush callback is null!");
        this.indexMask = this.bufferSize - 1;
        this.entries = new CanalEntry.Entry[this.bufferSize];
    }

    public void stop() throws CanalStoreException {
        this.putSequence.set(INIT_SQEUENCE);
        this.flushSequence.set(INIT_SQEUENCE);
        this.entries = null;
        super.stop();
    }

    public void add(List<CanalEntry.Entry> list) throws InterruptedException {
        Iterator<CanalEntry.Entry> it = list.iterator();
        while (it.hasNext()) {
            add(it.next());
        }
    }

    public void add(CanalEntry.Entry entry) throws InterruptedException {
        switch (AnonymousClass1.$SwitchMap$com$alibaba$otter$canal$protocol$CanalEntry$EntryType[entry.getEntryType().ordinal()]) {
            case LogEventConvert.version /* 1 */:
                flush();
                put(entry);
                return;
            case 2:
                put(entry);
                flush();
                return;
            case DirectLogFetcher.PACKET_SEQ_OFFSET /* 3 */:
                put(entry);
                CanalEntry.EventType eventType = entry.getHeader().getEventType();
                if (eventType == null || isDml(eventType)) {
                    return;
                }
                flush();
                return;
            case DirectLogFetcher.NET_HEADER_SIZE /* 4 */:
                put(entry);
                flush();
                return;
            default:
                return;
        }
    }

    public void reset() {
        this.putSequence.set(INIT_SQEUENCE);
        this.flushSequence.set(INIT_SQEUENCE);
    }

    private void put(CanalEntry.Entry entry) throws InterruptedException {
        if (!checkFreeSlotAt(this.putSequence.get() + 1)) {
            flush();
            put(entry);
        } else {
            long j = this.putSequence.get() + 1;
            this.entries[getIndex(j)] = entry;
            this.putSequence.set(j);
        }
    }

    private void flush() throws InterruptedException {
        long j = this.flushSequence.get() + 1;
        long j2 = this.putSequence.get();
        if (j > j2) {
            return;
        }
        ArrayList arrayList = new ArrayList();
        long j3 = j;
        while (true) {
            long j4 = j3;
            if (j4 > j2) {
                this.flushCallback.flush(arrayList);
                this.flushSequence.set(j2);
                return;
            } else {
                arrayList.add(this.entries[getIndex(j4)]);
                j3 = j4 + 1;
            }
        }
    }

    private boolean checkFreeSlotAt(long j) {
        return j - ((long) this.bufferSize) <= this.flushSequence.get();
    }

    private int getIndex(long j) {
        return ((int) j) & this.indexMask;
    }

    private boolean isDml(CanalEntry.EventType eventType) {
        return eventType == CanalEntry.EventType.INSERT || eventType == CanalEntry.EventType.UPDATE || eventType == CanalEntry.EventType.DELETE;
    }

    public void setBufferSize(int i) {
        this.bufferSize = i;
    }

    public void setFlushCallback(TransactionFlushCallback transactionFlushCallback) {
        this.flushCallback = transactionFlushCallback;
    }
}
