package com.alibaba.alink.common.dl.exchange;

import com.alibaba.flink.ml.util.SpscOffHeapQueue;
import java.io.Closeable;
import java.io.IOException;

/* loaded from: input_file:com/alibaba/alink/common/dl/exchange/BytesDataExchange.class */
public class BytesDataExchange implements Closeable {
    private static final int DEFAULT_QUEUE_SIZE = 8388608;
    private final BytesRecordWriter writer;
    private final BytesRecordReader reader;
    private long numReadRecords;
    private long numWriteRecords;
    private final SpscOffHeapQueue readQueue;
    private final SpscOffHeapQueue writeQueue;

    public BytesDataExchange(String str, String str2) throws Exception {
        this(str, str2, DEFAULT_QUEUE_SIZE);
    }

    public BytesDataExchange(String str, String str2, int i) throws Exception {
        this.numReadRecords = 0L;
        this.numWriteRecords = 0L;
        this.readQueue = new SpscOffHeapQueue(str, i);
        this.writeQueue = new SpscOffHeapQueue(str2, i);
        this.writer = new BytesRecordWriter(new SpscOffHeapQueue.QueueOutputStream(this.writeQueue));
        this.reader = new BytesRecordReader(new SpscOffHeapQueue.QueueInputStream(this.readQueue));
    }

    public <T> boolean write(byte[] bArr) throws IOException {
        this.numWriteRecords++;
        return this.writer.write(bArr);
    }

    public byte[] read(boolean z) throws IOException {
        byte[] read = z ? this.reader.read() : this.reader.tryRead();
        if (null == read) {
            return null;
        }
        this.numReadRecords++;
        return read;
    }

    public long getNumReadRecords() {
        return this.numReadRecords;
    }

    public long getNumWriteRecords() {
        return this.numWriteRecords;
    }

    public void markWriteFinished() {
        this.writeQueue.markFinished();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.reader.close();
        this.writer.close();
        this.readQueue.close();
        this.writeQueue.close();
    }
}
