package org.apache.flink.fs.cos.common.writer;

import java.io.File;
import java.io.IOException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.fs.cos.common.utils.RefCountedFile;
import org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.FunctionWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/fs/cos/common/writer/COSRecoverableWriter.class */
public class COSRecoverableWriter implements RecoverableWriter {
    private static final Logger LOG = LoggerFactory.getLogger(COSRecoverableWriter.class);
    private final FunctionWithException<File, RefCountedFile, IOException> tempFileCreator;
    private final long userDefinedMinPartSize;
    private final COSAccessHelper cosAccessHelper;
    private final COSRecoverableMultipartUploadFactory uploadFactory;

    public COSRecoverableWriter(COSAccessHelper cOSAccessHelper, COSRecoverableMultipartUploadFactory cOSRecoverableMultipartUploadFactory, FunctionWithException<File, RefCountedFile, IOException> functionWithException, long j) {
        this.tempFileCreator = (FunctionWithException) Preconditions.checkNotNull(functionWithException);
        this.userDefinedMinPartSize = j;
        this.cosAccessHelper = (COSAccessHelper) Preconditions.checkNotNull(cOSAccessHelper);
        this.uploadFactory = (COSRecoverableMultipartUploadFactory) Preconditions.checkNotNull(cOSRecoverableMultipartUploadFactory);
    }

    public RecoverableFsDataOutputStream open(Path path) throws IOException {
        return COSRecoverableFsDataOutputStream.newStream(this.uploadFactory.getNewRecoverableUpload(path), this.tempFileCreator, this.userDefinedMinPartSize);
    }

    public RecoverableFsDataOutputStream recover(RecoverableWriter.ResumeRecoverable resumeRecoverable) throws IOException {
        COSRecoverable castToCOSRecoverable = castToCOSRecoverable(resumeRecoverable);
        return COSRecoverableFsDataOutputStream.recoverStream(this.uploadFactory.recoverRecoverableUpload(castToCOSRecoverable), this.tempFileCreator, this.userDefinedMinPartSize, castToCOSRecoverable.getNumBytesInParts());
    }

    public boolean requiresCleanupOfRecoverableState() {
        return true;
    }

    public boolean cleanupRecoverableState(RecoverableWriter.ResumeRecoverable resumeRecoverable) throws IOException {
        String inCompleteObjectName = castToCOSRecoverable(resumeRecoverable).getInCompleteObjectName();
        return inCompleteObjectName != null && this.cosAccessHelper.deleteObject(inCompleteObjectName);
    }

    public RecoverableFsDataOutputStream.Committer recoverForCommit(RecoverableWriter.CommitRecoverable commitRecoverable) throws IOException {
        return ((COSRecoverableFsDataOutputStream) recover(castToCOSRecoverable(commitRecoverable))).closeForCommit();
    }

    public SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> getCommitRecoverableSerializer() {
        return COSRecoverableSerializer.INSTANCE;
    }

    public SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> getResumeRecoverableSerializer() {
        return COSRecoverableSerializer.INSTANCE;
    }

    public boolean supportsResume() {
        return true;
    }

    private static COSRecoverable castToCOSRecoverable(RecoverableWriter.CommitRecoverable commitRecoverable) {
        if (commitRecoverable instanceof COSRecoverable) {
            return (COSRecoverable) commitRecoverable;
        }
        throw new IllegalArgumentException("COS File System cannot recover recoverable for other file system: " + commitRecoverable);
    }

    private static void waitForFinish(long j, long j2) {
        long j3 = j + j2;
        long currentTimeMillis = System.currentTimeMillis() / 1000;
        if (currentTimeMillis < j3) {
            try {
                TimeUnit.SECONDS.sleep(j3 - currentTimeMillis);
            } catch (InterruptedException e) {
                LOG.warn("wait for finish occur the interrupt");
            }
        }
    }

    public static COSRecoverableWriter writer(FileSystem fileSystem, FunctionWithException<File, RefCountedFile, IOException> functionWithException, COSAccessHelper cOSAccessHelper, Executor executor, long j, int i, long j2, long j3) {
        Preconditions.checkArgument(j >= 1048576);
        LOG.info("create the cos recoverable writer, init: {}, timeout: {}", Long.valueOf(j2), Long.valueOf(j3));
        waitForFinish(j2, j3);
        return new COSRecoverableWriter(cOSAccessHelper, new COSRecoverableMultipartUploadFactory(fileSystem, cOSAccessHelper, i, executor, functionWithException), functionWithException, j);
    }
}
