/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.common.table.log;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.log.AppendResult;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormatVersion;
import org.apache.hudi.common.table.log.LogFileCreationCallback;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.storage.HoodieStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link HoodieLogFormat.Writer} that appends {@link HoodieLogBlock}s to a log file through
 * {@link HoodieStorage}.
 *
 * <p>The underlying file is created lazily on the first append. The writer rolls over to the
 * next log file when the target file already exists, when another writer is already creating
 * it, or when the current size exceeds the configured threshold after an append.
 *
 * <p>A JVM shutdown hook is registered at construction time to close the stream on exit;
 * {@link #close()} deregisters it.
 */
public class HoodieLogFormatWriter
implements HoodieLogFormat.Writer {
    private static final Logger LOG = LoggerFactory.getLogger(HoodieLogFormatWriter.class);
    /** Log file currently targeted; replaced on every roll-over. */
    private HoodieLogFile logFile;
    /** Open stream for {@link #logFile}, or {@code null} when no file is currently open. */
    private FSDataOutputStream output;
    private final HoodieStorage storage;
    /** Size (as reported by the stream position) above which the writer rolls over. */
    private final long sizeThreshold;
    private final Integer bufferSize;
    private final Short replication;
    private final String rolloverLogWriteToken;
    private final LogFileCreationCallback fileCreationHook;
    /** Set when the stream has been closed; reset by a roll-over. */
    private boolean closed = false;
    private transient Thread shutdownThread = null;

    /**
     * Creates a writer for {@code logFile} and registers a JVM shutdown hook that closes it.
     *
     * @param storage               storage used to create the log file
     * @param logFile               initial log file to write to
     * @param bufferSize            stream buffer size, or {@code null} to use the storage default
     * @param replication           replication factor, or {@code null} to use the storage default
     *                              for the parent of the log file path
     * @param sizeThreshold         roll-over threshold; must not be {@code null} (it is unboxed)
     * @param rolloverLogWriteToken write token passed to {@link HoodieLogFile#rollOver} on roll-over
     * @param fileCreationHook      callback invoked before each log file is created
     */
    public HoodieLogFormatWriter(HoodieStorage storage, HoodieLogFile logFile, Integer bufferSize, Short replication, Long sizeThreshold, String rolloverLogWriteToken, LogFileCreationCallback fileCreationHook) {
        this.storage = storage;
        this.logFile = logFile;
        this.sizeThreshold = sizeThreshold;
        this.bufferSize = bufferSize != null ? bufferSize.intValue() : storage.getDefaultBufferSize();
        this.replication = replication != null ? replication.shortValue() : storage.getDefaultReplication(logFile.getPath().getParent());
        this.rolloverLogWriteToken = rolloverLogWriteToken;
        this.fileCreationHook = fileCreationHook;
        this.addShutDownHook();
    }

    /** Returns the log file this writer currently targets. */
    @Override
    public HoodieLogFile getLogFile() {
        return this.logFile;
    }

    /** Returns the size threshold above which the writer rolls over. */
    public long getSizeThreshold() {
        return this.sizeThreshold;
    }

    /** Replaces the underlying output stream; intended for tests only. */
    @VisibleForTesting
    public void withOutputStream(FSDataOutputStream output) {
        this.output = output;
    }

    /**
     * Returns the open output stream, creating the log file first if none is open.
     *
     * <p>If the file already exists, or a remote {@link AlreadyBeingCreatedException} is reported,
     * the writer rolls over to the next log file and retries until a file is created.
     *
     * @throws IOException if file creation fails for any other reason
     */
    private FSDataOutputStream getOutputStream() throws IOException {
        if (this.output == null) {
            boolean created = false;
            while (!created) {
                try {
                    this.createNewFile();
                    LOG.info("Created a new log file: {}", this.logFile);
                    created = true;
                } catch (FileAlreadyExistsException ignored) {
                    // Files are created with overwrite=false, so an existing file means we must move on.
                    LOG.info("File {} already exists, rolling over", this.logFile.getPath());
                    this.rollOver();
                } catch (RemoteException re) {
                    if (!re.getClassName().contentEquals(AlreadyBeingCreatedException.class.getName())) {
                        throw re;
                    }
                    LOG.warn("Another task executor writing to the same log file({}), rolling over", this.logFile);
                    this.rollOver();
                }
            }
        }
        return this.output;
    }

    /** Appends a single block; equivalent to {@link #appendBlocks(List)} with a one-element list. */
    @Override
    public AppendResult appendBlock(HoodieLogBlock block) throws IOException, InterruptedException {
        return this.appendBlocks(Collections.singletonList(block));
    }

    /**
     * Appends the given blocks to the current log file, flushes, and rolls over if the file has
     * grown beyond the size threshold.
     *
     * <p>Each block is written as: magic, block length (long), format version (int), block type
     * ordinal (int), header bytes, content length (long), content bytes, footer bytes, and
     * finally the number of bytes written for the block so far, magic through footer (long).
     *
     * @param blocks blocks to append, in order
     * @return the log file written to, the stream position before the append, and the number of
     *         bytes written
     * @throws IOException       if writing or flushing fails
     * @throws HoodieIOException if the per-call byte counter reaches {@link Integer#MAX_VALUE}
     */
    @Override
    public AppendResult appendBlocks(List<HoodieLogBlock> blocks) throws IOException {
        HoodieLogFormatVersion currentLogFormatVersion = new HoodieLogFormatVersion(1);
        FSDataOutputStream originalOutputStream = this.getOutputStream();
        long startPos = originalOutputStream.getPos();
        long sizeWritten = 0L;
        // A fresh wrapper is used so that its size() counter starts from zero for this call.
        FSDataOutputStream outputStream = new FSDataOutputStream(originalOutputStream, new FileSystem.Statistics(this.storage.getScheme()), startPos);
        for (HoodieLogBlock block : blocks) {
            long startSize = outputStream.size();
            outputStream.write(HoodieLogFormat.MAGIC);
            byte[] headerBytes = HoodieLogBlock.getHeaderMetadataBytes(block.getLogBlockHeader());
            byte[] content = block.getContentBytes(this.storage);
            byte[] footerBytes = HoodieLogBlock.getFooterMetadataBytes(block.getLogBlockFooter());
            outputStream.writeLong(this.getLogBlockLength(content.length, headerBytes.length, footerBytes.length));
            outputStream.writeInt(currentLogFormatVersion.getVersion());
            outputStream.writeInt(block.getBlockType().ordinal());
            outputStream.write(headerBytes);
            outputStream.writeLong((long) content.length);
            outputStream.write(content);
            outputStream.write(footerBytes);
            outputStream.writeLong((long) outputStream.size() - startSize);
            // DataOutputStream.size() saturates at Integer.MAX_VALUE when its counter overflows.
            if (outputStream.size() == Integer.MAX_VALUE) {
                throw new HoodieIOException("Blocks appended may overflow. Please decrease log block size or log block amount");
            }
            sizeWritten += (long) outputStream.size() - startSize;
        }
        this.flush();
        // Capture the result before a possible roll-over changes this.logFile.
        AppendResult result = new AppendResult(this.logFile, startPos, sizeWritten);
        this.rolloverIfNeeded();
        return result;
    }

    /**
     * Computes the block length written right after the magic: everything that follows that
     * length field within the block.
     *
     * <p>The sum is computed in {@code long} so that large header/content/footer arrays cannot
     * wrap around {@code int} and produce a bogus length in the file.
     */
    private long getLogBlockLength(int contentLength, int headerLength, int footerLength) {
        return (long) Integer.BYTES // format version
            + Integer.BYTES         // block type ordinal
            + headerLength
            + Long.BYTES            // content length field
            + contentLength
            + footerLength
            + Long.BYTES;           // trailing block size field
    }

    /** Rolls over to the next log file if the current size exceeds the threshold. */
    private void rolloverIfNeeded() throws IOException {
        long currentSize = this.getCurrentSize();
        if (currentSize > this.sizeThreshold) {
            LOG.info("CurrentSize {} has reached threshold {}. Rolling over to the next version", currentSize, this.sizeThreshold);
            this.rollOver();
        }
    }

    /**
     * Closes the current stream (if any) and switches to the next log file. The new file is not
     * created here; it is created on the next call to {@link #getOutputStream()}.
     */
    private void rollOver() throws IOException {
        this.closeStream();
        this.logFile = this.logFile.rollOver(this.rolloverLogWriteToken);
        // The writer is usable again: the next append opens the rolled-over file.
        this.closed = false;
    }

    /** Invokes the pre-creation callback and creates the current log file without overwriting. */
    private void createNewFile() throws IOException {
        this.fileCreationHook.preFileCreation(this.logFile);
        // NOTE(review): the last argument (0x20000000 = 512 MiB) is presumably the block size - confirm against HoodieStorage#create.
        this.output = new FSDataOutputStream(this.storage.create(this.logFile.getPath(), false, this.bufferSize, this.replication, 0x20000000L), new FileSystem.Statistics(this.storage.getScheme()));
    }

    /**
     * Deregisters the shutdown hook and closes the underlying stream.
     *
     * <p>If the JVM is already shutting down the hook can no longer be removed; in that case the
     * stream is still closed here rather than leaving it open.
     *
     * @throws IOException if flushing or closing the stream fails
     */
    @Override
    public void close() throws IOException {
        if (null != this.shutdownThread) {
            try {
                Runtime.getRuntime().removeShutdownHook(this.shutdownThread);
            } catch (IllegalStateException e) {
                // Thrown when shutdown is already in progress; must not prevent closing the stream.
                LOG.debug("JVM is shutting down, could not remove shutdown hook for log file {}", this.logFile);
            }
        }
        this.closeStream();
    }

    /**
     * Flushes and closes the current stream, if one is open.
     *
     * <p>The stream is closed and the writer state is reset even when the flush fails, so a
     * failed flush does not leak the stream. A failure to close after a failed flush is attached
     * to the flush exception as a suppressed exception.
     */
    private void closeStream() throws IOException {
        if (this.output == null) {
            return;
        }
        try (FSDataOutputStream stream = this.output) {
            this.flush();
        } finally {
            this.output = null;
            this.closed = true;
        }
    }

    /** Flushes the current stream and calls {@code hsync()} on it; no-op when no stream is open. */
    private void flush() throws IOException {
        if (this.output == null) {
            return;
        }
        this.output.flush();
        this.output.hsync();
    }

    /**
     * Returns the current position of the open stream, or {@code 0} if no file has been opened yet.
     *
     * @throws IllegalStateException if the stream has been closed
     */
    @Override
    public long getCurrentSize() throws IOException {
        if (this.closed) {
            throw new IllegalStateException("Cannot get current size as the underlying stream has been closed already");
        }
        if (this.output == null) {
            return 0L;
        }
        return this.output.getPos();
    }

    /** Registers a JVM shutdown hook that closes the stream, logging (not rethrowing) any failure. */
    private void addShutDownHook() {
        this.shutdownThread = new Thread(){

            @Override
            public void run() {
                try {
                    LOG.warn("running logformatwriter hook");
                    if (HoodieLogFormatWriter.this.output != null) {
                        HoodieLogFormatWriter.this.closeStream();
                    }
                } catch (Exception e) {
                    // Best effort during shutdown: nothing useful can be done beyond logging.
                    LOG.warn(String.format("unable to close output stream for log file %s", HoodieLogFormatWriter.this.logFile), e);
                }
            }
        };
        Runtime.getRuntime().addShutdownHook(this.shutdownThread);
    }
}

