package tachyon.worker;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.AbstractSelector;
import java.nio.channels.spi.SelectorProvider;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.log4j.Logger;
import tachyon.Constants;
import tachyon.util.CommonUtils;

/* loaded from: input_file:tachyon/worker/DataServer.class */
public class DataServer implements Runnable {
    private static final Logger LOG = Logger.getLogger(Constants.LOGGER_TYPE);
    private InetSocketAddress mAddress;
    private ServerSocketChannel mServerChannel;
    private Selector mSelector;
    private final BlocksLocker mBlocksLocker;
    private Map<SocketChannel, DataServerMessage> mSendingData = Collections.synchronizedMap(new HashMap());
    private Map<SocketChannel, DataServerMessage> mReceivingData = Collections.synchronizedMap(new HashMap());
    private volatile boolean mShutdown = false;
    private volatile boolean mShutdowned = false;

    public DataServer(InetSocketAddress inetSocketAddress, WorkerStorage workerStorage) {
        LOG.info("Starting DataServer @ " + inetSocketAddress);
        this.mAddress = inetSocketAddress;
        this.mBlocksLocker = new BlocksLocker(workerStorage, -1);
        try {
            this.mSelector = initSelector();
        } catch (IOException e) {
            LOG.error(e.getMessage() + this.mAddress, e);
            CommonUtils.runtimeException(e);
        }
    }

    private void accept(SelectionKey selectionKey) throws IOException {
        SocketChannel accept = ((ServerSocketChannel) selectionKey.channel()).accept();
        accept.configureBlocking(false);
        accept.register(this.mSelector, 1);
    }

    public void close() throws IOException {
        this.mShutdown = true;
        this.mServerChannel.close();
        this.mSelector.close();
    }

    private Selector initSelector() throws IOException {
        AbstractSelector openSelector = SelectorProvider.provider().openSelector();
        this.mServerChannel = ServerSocketChannel.open();
        this.mServerChannel.configureBlocking(false);
        this.mServerChannel.socket().bind(this.mAddress);
        this.mServerChannel.register(openSelector, 16);
        return openSelector;
    }

    public boolean isClosed() {
        return this.mShutdowned;
    }

    private void read(SelectionKey selectionKey) throws IOException {
        DataServerMessage createBlockRequestMessage;
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        if (this.mReceivingData.containsKey(socketChannel)) {
            createBlockRequestMessage = this.mReceivingData.get(socketChannel);
        } else {
            createBlockRequestMessage = DataServerMessage.createBlockRequestMessage();
            this.mReceivingData.put(socketChannel, createBlockRequestMessage);
        }
        try {
            if (createBlockRequestMessage.recv(socketChannel) == -1) {
                selectionKey.channel().close();
                selectionKey.cancel();
                this.mReceivingData.remove(socketChannel);
                this.mSendingData.remove(socketChannel);
                return;
            }
            if (createBlockRequestMessage.isMessageReady()) {
                if (createBlockRequestMessage.getBlockId() <= 0) {
                    LOG.error("Invalid block id " + createBlockRequestMessage.getBlockId());
                    return;
                }
                selectionKey.interestOps(4);
                LOG.info("Get request for " + createBlockRequestMessage.getBlockId());
                int lock = this.mBlocksLocker.lock(createBlockRequestMessage.getBlockId());
                DataServerMessage createBlockResponseMessage = DataServerMessage.createBlockResponseMessage(true, createBlockRequestMessage.getBlockId(), createBlockRequestMessage.getOffset(), createBlockRequestMessage.getLength());
                createBlockResponseMessage.setLockId(lock);
                this.mSendingData.put(socketChannel, createBlockResponseMessage);
            }
        } catch (IOException e) {
            selectionKey.cancel();
            socketChannel.close();
            this.mReceivingData.remove(socketChannel);
            this.mSendingData.remove(socketChannel);
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        while (!this.mShutdown) {
            try {
                this.mSelector.select();
                if (this.mShutdown) {
                    break;
                }
                Iterator<SelectionKey> it = this.mSelector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey next = it.next();
                    it.remove();
                    if (next.isValid()) {
                        if (next.isAcceptable()) {
                            accept(next);
                        } else if (next.isReadable()) {
                            read(next);
                        } else if (next.isWritable()) {
                            write(next);
                        }
                    }
                }
            } catch (Exception e) {
                LOG.error(e.getMessage(), e);
                if (!this.mShutdown) {
                    throw new RuntimeException(e);
                }
            }
        }
        this.mShutdowned = true;
    }

    private void write(SelectionKey selectionKey) {
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        DataServerMessage dataServerMessage = this.mSendingData.get(socketChannel);
        boolean z = false;
        try {
            dataServerMessage.send(socketChannel);
        } catch (IOException e) {
            z = true;
            LOG.error(e.getMessage());
        }
        if (dataServerMessage.finishSending() || z) {
            try {
                selectionKey.channel().close();
            } catch (IOException e2) {
                LOG.error(e2.getMessage());
            }
            selectionKey.cancel();
            this.mReceivingData.remove(socketChannel);
            this.mSendingData.remove(socketChannel);
            dataServerMessage.close();
            this.mBlocksLocker.unlock(Math.abs(dataServerMessage.getBlockId()), dataServerMessage.getLockId());
        }
    }
}
