package com.alibaba.alink.operator.stream.sink;

import com.alibaba.alink.common.MTable;
import com.alibaba.alink.common.exceptions.AkIllegalOperatorParameterException;
import com.alibaba.alink.common.exceptions.AkUnclassifiedErrorException;
import com.alibaba.alink.common.exceptions.AkUnsupportedOperationException;
import com.alibaba.alink.common.io.filesystem.AkStream;
import com.alibaba.alink.common.io.filesystem.AkUtils;
import com.alibaba.alink.common.io.filesystem.BaseFileSystem;
import com.alibaba.alink.common.io.filesystem.FilePath;
import com.alibaba.alink.common.io.filesystem.copy.FileOutputFormat;
import com.alibaba.alink.operator.common.modelstream.ModelStreamUtils;
import java.io.IOException;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.types.Row;

/* loaded from: input_file:com/alibaba/alink/operator/stream/sink/Export2FileOutputFormat.class */
public class Export2FileOutputFormat extends FileOutputFormat<Row> {
    public static final String IN_PROGRESS_FILE_SUFFIX = ".inprogress";
    private final FilePath filePath;
    private final FileSystem.WriteMode writeMode;
    private final int timeColIndex;
    private final int mTableColIndex;
    private final List<Tuple2<String, SimpleDateFormat>> dataFormats;

    /* renamed from: com.alibaba.alink.operator.stream.sink.Export2FileOutputFormat$1, reason: invalid class name */
    /* loaded from: input_file:com/alibaba/alink/operator/stream/sink/Export2FileOutputFormat$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$core$fs$FileSystem$WriteMode = new int[FileSystem.WriteMode.values().length];

        static {
            try {
                $SwitchMap$org$apache$flink$core$fs$FileSystem$WriteMode[FileSystem.WriteMode.NO_OVERWRITE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$flink$core$fs$FileSystem$WriteMode[FileSystem.WriteMode.OVERWRITE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    public Export2FileOutputFormat(FilePath filePath, FileSystem.WriteMode writeMode, List<Tuple2<String, SimpleDateFormat>> list, int i, int i2) {
        super(filePath.getPath(), filePath.getFileSystem());
        this.filePath = filePath;
        this.writeMode = writeMode;
        this.timeColIndex = i;
        this.mTableColIndex = i2;
        this.dataFormats = list;
        setWriteMode(writeMode);
        if (filePath.getFileSystem().isDistributedFS()) {
            return;
        }
        try {
            if (filePath.getFileSystem().exists(filePath.getPath())) {
                switch (AnonymousClass1.$SwitchMap$org$apache$flink$core$fs$FileSystem$WriteMode[writeMode.ordinal()]) {
                    case 1:
                        throw new AkIllegalOperatorParameterException("File or directory already exists. Existing files and directories are not overwritten in " + FileSystem.WriteMode.NO_OVERWRITE.name() + " mode. Use " + FileSystem.WriteMode.OVERWRITE.name() + " mode to overwrite existing files and directories.");
                    case 2:
                        filePath.getFileSystem().delete(filePath.getPath(), true);
                        break;
                    default:
                        throw new AkUnsupportedOperationException("Invalid write mode: " + writeMode);
                }
            }
        } catch (IOException e) {
            throw new AkUnclassifiedErrorException("Error. ", e);
        }
    }

    @Override // com.alibaba.alink.common.io.filesystem.copy.FileOutputFormat
    public void open(int i, int i2) throws IOException {
        Path path = this.outputFilePath;
        if (path == null) {
            throw new IOException("The file path is null.");
        }
        BaseFileSystem<?> fileSystem = this.filePath.getFileSystem();
        if (fileSystem.isDistributedFS()) {
            if (!fileSystem.initOutPathLocalFS(path, this.writeMode, true)) {
                throw new IOException("Output directory '" + path + "' could not be created. Canceling task...");
            }
        } else if (!fileSystem.initOutPathLocalFS(path, this.writeMode, true)) {
            throw new IOException("Output path '" + path + "' could not be initialized. Canceling task...");
        }
    }

    public void writeRecord(Row row) throws IOException {
        Timestamp timestamp = (Timestamp) row.getField(this.timeColIndex);
        MTable mTable = (MTable) row.getField(this.mTableColIndex);
        String stringPresentation = ModelStreamUtils.toStringPresentation(timestamp);
        FilePath filePath = this.filePath;
        if (this.dataFormats != null) {
            Path path = filePath.getPath();
            for (Tuple2<String, SimpleDateFormat> tuple2 : this.dataFormats) {
                path = new Path(path, String.format("%s=%s", tuple2.f0, ((SimpleDateFormat) tuple2.f1).format((Date) timestamp)));
            }
            filePath = new FilePath(path, filePath.getFileSystem());
        }
        FilePath filePath2 = new FilePath(new Path(filePath.getPath(), String.format("%s%s", stringPresentation, IN_PROGRESS_FILE_SUFFIX)), filePath.getFileSystem());
        AkStream.AkWriter.AkCollector collector = new AkStream(filePath2, new AkUtils.AkMeta(mTable.getSchemaStr())).getWriter().getCollector();
        Iterator<Row> it = mTable.getRows().iterator();
        while (it.hasNext()) {
            collector.collect(it.next());
        }
        collector.close();
        filePath.getFileSystem().rename(filePath2.getPath(), new FilePath(new Path(filePath.getPath(), stringPresentation), filePath.getFileSystem()).getPath());
    }
}
