/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.io.asyncfs;

import com.google.protobuf.MessageLite;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.Encryptor;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper;
import org.apache.hadoop.hbase.io.asyncfs.SendBufSizePredictor;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelId;
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufDecoder;
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
public class FanOutOneBlockAsyncDFSOutput
implements AsyncFSOutput {
    /** Upper bound (12 MiB) on the payload of a single data packet, before chunk alignment. */
    private static final int MAX_DATA_LEN = 12 * 1024 * 1024;
    private final Configuration conf;
    private final FSUtils fsUtils;
    private final DistributedFileSystem dfs;
    private final DFSClient client;
    private final ClientProtocol namenode;
    private final String clientName;
    private final String src;
    private final long fileId;
    private final ExtendedBlock block;
    private final DatanodeInfo[] locations;
    /** Optional; when non-null, buffered data is encrypted before it is sent. */
    private final Encryptor encryptor;
    /** One channel per datanode; every packet is written to all of them. */
    private final List<Channel> datanodeList;
    private final DataChecksum summer;
    /** {@link #MAX_DATA_LEN} rounded down to a multiple of the checksum chunk size. */
    private final int maxDataLen;
    private final ByteBufAllocator alloc;
    /** Flush requests still waiting for acks, oldest first. */
    private final ConcurrentLinkedDeque<Callback> waitingAckQueue = new ConcurrentLinkedDeque<>();
    /** Length of the block that has been acked by every datanode. */
    private volatile long ackedBlockLength = 0L;
    /** Block offset at which the next data packet will start. */
    private long nextPacketOffsetInBlock = 0L;
    /** Number of bytes at the head of {@link #buf} that were already sent as a partial chunk. */
    private int trailingPartialChunkLength = 0;
    private long nextPacketSeqno = 0L;
    /** Data written by the caller but not yet flushed; null once the output is closed. */
    private ByteBuf buf;
    private final SendBufSizePredictor sendBufSizePRedictor = new SendBufSizePredictor();
    private volatile State state;

    /**
     * Records a successful ack from {@code channel}.
     *
     * <p>Walks the pending callbacks from the oldest and stops at the first one that was still
     * waiting for this channel. If that was the last outstanding replica, the callback is
     * removed, the acked block length is advanced, its future is completed, and any directly
     * following callbacks with the same acked length are completed as well.
     *
     * @param channel the datanode channel the ack arrived on
     */
    private void completed(Channel channel) {
        for (Iterator<Callback> it = this.waitingAckQueue.iterator(); it.hasNext();) {
            Callback pending = it.next();
            if (pending.unfinishedReplicas.remove(channel.id())) {
                if (pending.unfinishedReplicas.isEmpty()) {
                    // Remove from the queue and publish the length before completing the future.
                    it.remove();
                    this.ackedBlockLength = pending.ackedLength;
                    // Only continue if this call is the one that completed the future.
                    if (pending.future.complete(pending.ackedLength)) {
                        while (it.hasNext()) {
                            Callback follower = it.next();
                            if (follower.ackedLength != pending.ackedLength) {
                                break;
                            }
                            it.remove();
                            follower.future.complete(pending.ackedLength);
                        }
                    }
                }
                return;
            }
            // This channel already acked the entry; keep looking for the next one.
        }
    }

    /**
     * Marks the output as broken because of a failure on {@code channel}.
     *
     * <p>Does nothing if the output is already broken or closed, or if it is closing and the
     * oldest pending callback is absent or no longer waiting for this channel. Otherwise the
     * state becomes {@code BROKEN}, the first callback still waiting for this channel and every
     * callback after it are failed with the supplied error, and all datanode channels are closed.
     *
     * @param channel the channel on which the failure was observed
     * @param errorSupplier creates the failure cause; only invoked when the failure is applied
     */
    private synchronized void failed(Channel channel, Supplier<Throwable> errorSupplier) {
        if (this.state == State.BROKEN || this.state == State.CLOSED) {
            return;
        }
        if (this.state == State.CLOSING) {
            Callback head = this.waitingAckQueue.peekFirst();
            if (head == null || !head.unfinishedReplicas.contains(channel.id())) {
                return;
            }
        }
        this.state = State.BROKEN;
        Throwable error = errorSupplier.get();
        boolean failing = false;
        for (Callback pending : this.waitingAckQueue) {
            // Once the first un-acked entry is found, everything from there on is failed.
            if (!failing && pending.unfinishedReplicas.contains(channel.id())) {
                failing = true;
            }
            if (failing) {
                pending.future.completeExceptionally(error);
            }
        }
        for (Channel ch : this.datanodeList) {
            ch.close();
        }
    }

    /**
     * Installs the ack-processing pipeline on every datanode channel and enables auto-read.
     *
     * <p>A single {@link AckHandler} instance is shared by all channels. The idle handler uses
     * {@code timeoutMs} as the reader-idle time and half of it as the writer-idle time.
     *
     * @param timeoutMs read timeout in milliseconds
     */
    private void setupReceiver(int timeoutMs) {
        AckHandler sharedAckHandler = new AckHandler(timeoutMs);
        this.datanodeList.forEach(ch -> {
            ch.pipeline().addLast(
                new IdleStateHandler(timeoutMs, timeoutMs / 2, 0, TimeUnit.MILLISECONDS),
                new ProtobufVarint32FrameDecoder(),
                new ProtobufDecoder(DataTransferProtos.PipelineAckProto.getDefaultInstance()),
                sharedAckHandler);
            ch.config().setAutoRead(true);
        });
    }

    FanOutOneBlockAsyncDFSOutput(Configuration conf, FSUtils fsUtils, DistributedFileSystem dfs, DFSClient client, ClientProtocol namenode, String clientName, String src, long fileId, LocatedBlock locatedBlock, Encryptor encryptor, List<Channel> datanodeList, DataChecksum summer, ByteBufAllocator alloc) {
        this.conf = conf;
        this.fsUtils = fsUtils;
        this.dfs = dfs;
        this.client = client;
        this.namenode = namenode;
        this.fileId = fileId;
        this.clientName = clientName;
        this.src = src;
        this.block = locatedBlock.getBlock();
        this.locations = locatedBlock.getLocations();
        this.encryptor = encryptor;
        this.datanodeList = datanodeList;
        this.summer = summer;
        this.maxDataLen = 0xC00000 - 0xC00000 % summer.getBytesPerChecksum();
        this.alloc = alloc;
        this.buf = alloc.directBuffer(this.sendBufSizePRedictor.initialSize());
        this.state = State.STREAMING;
        this.setupReceiver(conf.getInt("dfs.client.socket-timeout", 60000));
    }

    /**
     * Appends a 4-byte int to the pending buffer, growing the buffer if required.
     *
     * @param i the value to append
     */
    @Override
    public void writeInt(int i) {
        ByteBuf pending = this.buf;
        pending.ensureWritable(Integer.BYTES);
        pending.writeInt(i);
    }

    /**
     * Appends the remaining bytes of {@code bb} to the pending buffer.
     *
     * @param bb source buffer
     */
    @Override
    public void write(ByteBuffer bb) {
        ByteBuf pending = this.buf;
        pending.ensureWritable(bb.remaining());
        pending.writeBytes(bb);
    }

    /**
     * Appends the whole array to the pending buffer.
     *
     * @param b bytes to append
     */
    @Override
    public void write(byte[] b) {
        final int length = b.length;
        write(b, 0, length);
    }

    /**
     * Appends {@code len} bytes of {@code b}, starting at {@code off}, to the pending buffer.
     *
     * @param b source array
     * @param off index of the first byte to copy
     * @param len number of bytes to copy
     */
    @Override
    public void write(byte[] b, int off, int len) {
        ByteBuf pending = this.buf;
        pending.ensureWritable(len);
        pending.writeBytes(b, off, len);
    }

    /**
     * @return the number of readable bytes currently held in the pending buffer
     */
    @Override
    public int buffered() {
        final int pendingBytes = this.buf.readableBytes();
        return pendingBytes;
    }

    /**
     * @return the datanode locations of the block, as obtained from the located block at
     *     construction time; the internal array is returned, not a copy
     */
    @Override
    public DatanodeInfo[] getPipeline() {
        final DatanodeInfo[] pipeline = this.locations;
        return pipeline;
    }

    /**
     * Sends one data packet (header, checksums, data) to every datanode and queues a callback
     * that completes {@code future} once all of them have acked it.
     *
     * <p>This method takes ownership of {@code dataBuf}: it is released before returning on
     * every path, including when the stream is found to be broken and nothing is sent.
     * Previously that early return leaked the checksum, header and data buffers.
     *
     * @param future completed with the acked length, or exceptionally if the stream is broken
     * @param dataBuf the payload; the caller must have retained it on behalf of this method
     * @param nextPacketOffsetInBlock block offset at which this packet starts
     * @param syncBlock passed through to the packet header
     */
    private void flushBuffer(CompletableFuture<Long> future, ByteBuf dataBuf, long nextPacketOffsetInBlock, boolean syncBlock) {
        int dataLen = dataBuf.readableBytes();
        int chunkLen = this.summer.getBytesPerChecksum();
        // One checksum per full chunk plus one for a trailing partial chunk, if any.
        int numChecks = dataLen / chunkLen + (dataLen % chunkLen != 0 ? 1 : 0);
        int checksumLen = numChecks * this.summer.getChecksumSize();
        ByteBuf checksumBuf = this.alloc.directBuffer(checksumLen);
        this.summer.calculateChunkedSums(dataBuf.nioBuffer(), checksumBuf.nioBuffer(0, checksumLen));
        checksumBuf.writerIndex(checksumLen);
        PacketHeader header = new PacketHeader(4 + checksumLen + dataLen, nextPacketOffsetInBlock, this.nextPacketSeqno, false, dataLen, syncBlock);
        int headerLen = header.getSerializedSize();
        ByteBuf headerBuf = this.alloc.buffer(headerLen);
        header.putInBuffer(headerBuf.nioBuffer(0, headerLen));
        headerBuf.writerIndex(headerLen);
        try {
            Callback c = new Callback(future, nextPacketOffsetInBlock + (long)dataLen, this.datanodeList);
            this.waitingAckQueue.addLast(c);
            // Re-check the state now that the callback is visible in the queue.
            if (this.state != State.STREAMING && this.waitingAckQueue.peekFirst() == c) {
                future.completeExceptionally(new IOException("stream already broken"));
                this.waitingAckQueue.removeFirst();
                return;
            }
            for (Channel ch : this.datanodeList) {
                // Each channel gets its own retained view; our references are dropped below.
                ch.write(headerBuf.retainedDuplicate());
                ch.write(checksumBuf.retainedDuplicate());
                ch.writeAndFlush(dataBuf.retainedDuplicate());
            }
            ++this.nextPacketSeqno;
        } finally {
            checksumBuf.release();
            headerBuf.release();
            dataBuf.release();
        }
    }

    /**
     * Flushes the pending buffer to the datanodes.
     *
     * <p>Fails the future immediately if the output is not streaming. If the buffer holds
     * nothing beyond the already-sent trailing partial chunk, no packet is sent: the future is
     * completed directly or queued behind the outstanding flushes. Otherwise the data is
     * optionally encrypted, sent as one packet or split into packets of at most
     * {@code maxDataLen} bytes, and a fresh buffer is started that begins with the trailing
     * partial chunk.
     *
     * @param future completed with the length after the flush, or exceptionally on failure
     * @param syncBlock passed through to the packets that are sent
     */
    private void flush0(CompletableFuture<Long> future, boolean syncBlock) {
        if (this.state != State.STREAMING) {
            future.completeExceptionally(new IOException("stream already broken"));
            return;
        }
        final int dataLen = this.buf.readableBytes();

        // Nothing new has been written since the last flush.
        if (dataLen == this.trailingPartialChunkLength) {
            long lengthAfterFlush = this.nextPacketOffsetInBlock + (long)dataLen;
            if (this.waitingAckQueue.peekLast() == null) {
                future.complete(lengthAfterFlush);
                return;
            }
            Callback placeholder = new Callback(future, lengthAfterFlush, Collections.emptyList());
            this.waitingAckQueue.addLast(placeholder);
            // The earlier flushes may have drained while we were enqueueing.
            if (this.waitingAckQueue.peekFirst() == placeholder) {
                if (this.state == State.STREAMING) {
                    future.complete(lengthAfterFlush);
                } else {
                    future.completeExceptionally(new IOException("stream already broken"));
                }
                this.waitingAckQueue.removeFirst();
            }
            return;
        }

        if (this.encryptor != null) {
            ByteBuf encrypted = this.alloc.directBuffer(dataLen);
            // The leading partial chunk is copied over as is; only the rest is encrypted.
            this.buf.readBytes(encrypted, this.trailingPartialChunkLength);
            int toEncryptLength = dataLen - this.trailingPartialChunkLength;
            try {
                this.encryptor.encrypt(this.buf.nioBuffer(this.trailingPartialChunkLength, toEncryptLength), encrypted.nioBuffer(this.trailingPartialChunkLength, toEncryptLength));
            }
            catch (IOException e) {
                encrypted.release();
                future.completeExceptionally(e);
                return;
            }
            encrypted.writerIndex(dataLen);
            this.buf.release();
            this.buf = encrypted;
        }

        if (dataLen > this.maxDataLen) {
            // Too large for one packet; only the final packet carries the caller's future.
            long subPacketOffset = this.nextPacketOffsetInBlock;
            int remaining = dataLen;
            while (remaining >= this.maxDataLen) {
                this.flushBuffer(new CompletableFuture<Long>(), this.buf.readRetainedSlice(this.maxDataLen), subPacketOffset, syncBlock);
                remaining -= this.maxDataLen;
                subPacketOffset += (long)this.maxDataLen;
            }
            this.flushBuffer(future, this.buf.readRetainedSlice(remaining), subPacketOffset, syncBlock);
        } else {
            this.flushBuffer(future, this.buf.retain(), this.nextPacketOffsetInBlock, syncBlock);
        }

        // Carry the trailing partial chunk into the new buffer; the next packet starts at its
        // offset, so the offset only advances by the full chunks.
        this.trailingPartialChunkLength = dataLen % this.summer.getBytesPerChecksum();
        ByteBuf replacement = this.alloc.directBuffer(this.sendBufSizePRedictor.guess(dataLen)).ensureWritable(this.trailingPartialChunkLength);
        if (this.trailingPartialChunkLength != 0) {
            this.buf.readerIndex(dataLen - this.trailingPartialChunkLength).readBytes(replacement, this.trailingPartialChunkLength);
        }
        this.buf.release();
        this.buf = replacement;
        this.nextPacketOffsetInBlock += (long)(dataLen - this.trailingPartialChunkLength);
    }

    /**
     * Flushes the buffered data to the datanodes.
     *
     * @param syncBlock passed through to the packets that are sent
     * @return a future completed with the length after the flush, or exceptionally on failure
     */
    @Override
    public CompletableFuture<Long> flush(boolean syncBlock) {
        CompletableFuture<Long> result = new CompletableFuture<>();
        flush0(result, syncBlock);
        return result;
    }

    /**
     * Sends the final (last-packet-in-block) packet and waits until every datanode has acked it.
     *
     * <p>Requires that no flush is still pending. Moves the state to {@code CLOSING} and
     * releases the pending buffer.
     *
     * @throws IllegalStateException if there are still callbacks waiting for acks
     * @throws IOException if the stream is not streaming, the wait is interrupted (as an
     *     {@link InterruptedIOException}), or the final packet fails
     */
    private void endBlock() throws IOException {
        Preconditions.checkState(this.waitingAckQueue.isEmpty(), "should call flush first before calling close");
        if (this.state != State.STREAMING) {
            throw new IOException("stream already broken");
        }
        this.state = State.CLOSING;
        final long finalizedLength = this.ackedBlockLength;
        PacketHeader lastPacket = new PacketHeader(4, finalizedLength, this.nextPacketSeqno, true, 0, false);
        this.buf.release();
        this.buf = null;

        final int headerLen = lastPacket.getSerializedSize();
        ByteBuf headerBuf = this.alloc.directBuffer(headerLen);
        lastPacket.putInBuffer(headerBuf.nioBuffer(0, headerLen));
        headerBuf.writerIndex(headerLen);

        CompletableFuture<Long> acked = new CompletableFuture<>();
        this.waitingAckQueue.add(new Callback(acked, finalizedLength, this.datanodeList));
        for (Channel ch : this.datanodeList) {
            ch.writeAndFlush(headerBuf.retainedDuplicate());
        }
        headerBuf.release();

        try {
            acked.get();
        }
        catch (InterruptedException e) {
            throw (IOException)new InterruptedIOException().initCause(e);
        }
        catch (ExecutionException e) {
            Throwable cause = e.getCause();
            // Rethrow IOExceptions and unchecked errors as they are; wrap anything else.
            Throwables.propagateIfPossible(cause, IOException.class);
            throw new IOException(cause);
        }
    }

    /**
     * Abandons the output: releases the pending buffer, closes every datanode channel and waits
     * for the closes to finish, then ends the file lease and runs lease recovery on the file.
     *
     * @param reporter progress callback for lease recovery; when null, a
     *     {@code CancelOnClose} bound to the DFS client is used instead
     * @throws IOException if lease recovery fails
     */
    @Override
    public void recoverAndClose(CancelableProgressable reporter) throws IOException {
        if (this.buf != null) {
            this.buf.release();
            this.buf = null;
        }
        // Start closing every channel first, then wait for each of them.
        for (Channel ch : this.datanodeList) {
            ch.close();
        }
        for (Channel ch : this.datanodeList) {
            ch.closeFuture().awaitUninterruptibly();
        }
        FanOutOneBlockAsyncDFSOutputHelper.endFileLease(this.client, this.fileId);
        CancelableProgressable effectiveReporter = reporter == null ? new FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose(this.client) : reporter;
        this.fsUtils.recoverFileLease((FileSystem)this.dfs, new Path(this.src), this.conf, effectiveReporter);
    }

    /**
     * Closes the output normally: ends the block, marks the output {@code CLOSED}, closes every
     * datanode channel and waits for the closes, then completes the file with the acked length.
     *
     * @throws IOException if ending the block or completing the file fails
     */
    @Override
    public void close() throws IOException {
        endBlock();
        this.state = State.CLOSED;
        // Start closing every channel first, then wait for each of them.
        for (Channel ch : this.datanodeList) {
            ch.close();
        }
        for (Channel ch : this.datanodeList) {
            ch.closeFuture().awaitUninterruptibly();
        }
        this.block.setNumBytes(this.ackedBlockLength);
        FanOutOneBlockAsyncDFSOutputHelper.completeFile(this.client, this.namenode, this.src, this.clientName, this.block, this.fileId);
    }

    /**
     * @return true if the output has been put into the {@code BROKEN} state
     */
    @Override
    public boolean isBroken() {
        return State.BROKEN == this.state;
    }

    /**
     * @return the block length that has been acked by every datanode so far
     */
    @Override
    public long getSyncedLength() {
        final long acked = this.ackedBlockLength;
        return acked;
    }

    @ChannelHandler.Sharable
    private final class AckHandler
    extends SimpleChannelInboundHandler<DataTransferProtos.PipelineAckProto> {
        private final int timeoutMs;

        public AckHandler(int timeoutMs) {
            this.timeoutMs = timeoutMs;
        }

        protected void channelRead0(ChannelHandlerContext ctx, DataTransferProtos.PipelineAckProto ack) throws Exception {
            DataTransferProtos.Status reply = FanOutOneBlockAsyncDFSOutputHelper.getStatus(ack);
            if (reply != DataTransferProtos.Status.SUCCESS) {
                FanOutOneBlockAsyncDFSOutput.this.failed(ctx.channel(), () -> new IOException("Bad response " + reply + " for block " + FanOutOneBlockAsyncDFSOutput.this.block + " from datanode " + ctx.channel().remoteAddress()));
                return;
            }
            if (PipelineAck.isRestartOOBStatus((DataTransferProtos.Status)reply)) {
                FanOutOneBlockAsyncDFSOutput.this.failed(ctx.channel(), () -> new IOException("Restart response " + reply + " for block " + FanOutOneBlockAsyncDFSOutput.this.block + " from datanode " + ctx.channel().remoteAddress()));
                return;
            }
            if (ack.getSeqno() == -1L) {
                return;
            }
            FanOutOneBlockAsyncDFSOutput.this.completed(ctx.channel());
        }

        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            if (FanOutOneBlockAsyncDFSOutput.this.state == State.CLOSED) {
                return;
            }
            FanOutOneBlockAsyncDFSOutput.this.failed(ctx.channel(), () -> new IOException("Connection to " + ctx.channel().remoteAddress() + " closed"));
        }

        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            FanOutOneBlockAsyncDFSOutput.this.failed(ctx.channel(), () -> cause);
        }

        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent e = (IdleStateEvent)evt;
                if (e.state() == IdleState.READER_IDLE) {
                    FanOutOneBlockAsyncDFSOutput.this.failed(ctx.channel(), () -> new IOException("Timeout(" + this.timeoutMs + "ms) waiting for response"));
                } else if (e.state() == IdleState.WRITER_IDLE) {
                    PacketHeader heartbeat = new PacketHeader(4, 0L, -1L, false, 0, false);
                    int len = heartbeat.getSerializedSize();
                    ByteBuf buf = FanOutOneBlockAsyncDFSOutput.this.alloc.buffer(len);
                    heartbeat.putInBuffer(buf.nioBuffer(0, len));
                    buf.writerIndex(len);
                    ctx.channel().writeAndFlush((Object)buf);
                }
                return;
            }
            super.userEventTriggered(ctx, evt);
        }
    }

    private static enum State {
        STREAMING,
        CLOSING,
        BROKEN,
        CLOSED;

    }

    /**
     * A flush request waiting for acks: the future to complete, the block length it represents
     * once acked, and the set of channels that have not acked it yet.
     */
    private static final class Callback {
        private final CompletableFuture<Long> future;
        private final long ackedLength;
        /** Ids of the channels whose ack is still outstanding. */
        private final Set<ChannelId> unfinishedReplicas;

        /**
         * @param future completed when every replica has acked, or failed when the output breaks
         * @param ackedLength block length to report once this request is acked
         * @param replicas channels that must ack; may be empty for a request that sends no data
         */
        public Callback(CompletableFuture<Long> future, long ackedLength, Collection<Channel> replicas) {
            this.future = future;
            this.ackedLength = ackedLength;
            if (replicas.isEmpty()) {
                this.unfinishedReplicas = Collections.emptySet();
            } else {
                // Concurrent set: the enclosing class reads and removes entries in its ack
                // and failure paths.
                Set<ChannelId> pending = Collections.newSetFromMap(new ConcurrentHashMap<ChannelId, Boolean>(replicas.size()));
                for (Channel replica : replicas) {
                    pending.add(replica.id());
                }
                this.unfinishedReplicas = pending;
            }
        }
    }
}

