/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.igfs;

import java.io.Closeable;
import java.io.DataInput;
import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.igfs.IgfsInputStream;
import org.apache.ignite.igfs.IgfsIpcEndpointConfiguration;
import org.apache.ignite.igfs.IgfsOutOfSpaceException;
import org.apache.ignite.igfs.IgfsOutputStream;
import org.apache.ignite.igfs.IgfsUserContext;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.igfs.common.IgfsControlResponse;
import org.apache.ignite.internal.igfs.common.IgfsHandshakeRequest;
import org.apache.ignite.internal.igfs.common.IgfsIpcCommand;
import org.apache.ignite.internal.igfs.common.IgfsMessage;
import org.apache.ignite.internal.igfs.common.IgfsPathControlRequest;
import org.apache.ignite.internal.igfs.common.IgfsStreamControlRequest;
import org.apache.ignite.internal.processors.igfs.IgfsClientSession;
import org.apache.ignite.internal.processors.igfs.IgfsContext;
import org.apache.ignite.internal.processors.igfs.IgfsEx;
import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse;
import org.apache.ignite.internal.processors.igfs.IgfsInputStreamDescriptor;
import org.apache.ignite.internal.processors.igfs.IgfsInputStreamImpl;
import org.apache.ignite.internal.processors.igfs.IgfsServerHandler;
import org.apache.ignite.internal.processors.igfs.IgfsStatus;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteOutClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.thread.IgniteThreadPoolExecutor;
import org.jetbrains.annotations.Nullable;

class IgfsIpcHandler
implements IgfsServerHandler {
    private static boolean errWrite;
    private final GridKernalContext ctx;
    private IgniteLogger log;
    private final int bufSize;
    private final IgfsEx igfs;
    private final AtomicLong rsrcIdGen = new AtomicLong();
    private volatile IgniteThreadPoolExecutor pool;
    private volatile boolean stopping;

    IgfsIpcHandler(IgfsContext igfsCtx, IgfsIpcEndpointConfiguration endpointCfg, boolean mgmt) {
        assert (igfsCtx != null);
        this.ctx = igfsCtx.kernalContext();
        this.igfs = igfsCtx.igfs();
        this.bufSize = igfsCtx.configuration().getBlockSize() * 2;
        int threadCnt = endpointCfg.getThreadCount();
        String prefix = "igfs-" + igfsCtx.igfs().name() + (mgmt ? "mgmt-" : "") + "-ipc";
        this.pool = new IgniteThreadPoolExecutor(prefix, igfsCtx.kernalContext().gridName(), threadCnt, threadCnt, Long.MAX_VALUE, new LinkedBlockingQueue<Runnable>());
        this.log = this.ctx.log(IgfsIpcHandler.class);
    }

    @Override
    public void stop() throws IgniteCheckedException {
        this.stopping = true;
        U.shutdownNow(this.getClass(), this.pool, this.log);
        this.pool = null;
    }

    @Override
    public void onClosed(IgfsClientSession ses) {
        Iterator<Closeable> it = ses.registeredResources();
        while (it.hasNext()) {
            Closeable stream = it.next();
            try {
                stream.close();
            }
            catch (IOException e) {
                U.warn(this.log, "Failed to close opened stream on client close event (will continue) [ses=" + ses + ", stream=" + stream + ']', e);
            }
        }
    }

    @Override
    public IgniteInternalFuture<IgfsMessage> handleAsync(final IgfsClientSession ses, final IgfsMessage msg, final DataInput in) {
        try {
            IgniteInternalFuture<IgfsMessage> fut;
            if (this.stopping) {
                return null;
            }
            final IgfsIpcCommand cmd = msg.command();
            switch (cmd) {
                case WRITE_BLOCK: 
                case MAKE_DIRECTORIES: 
                case LIST_FILES: 
                case LIST_PATHS: {
                    fut = this.executeSynchronously(ses, cmd, msg, in);
                    break;
                }
                default: {
                    try {
                        final GridFutureAdapter<IgfsMessage> fut0 = new GridFutureAdapter<IgfsMessage>();
                        this.pool.execute(new Runnable(){

                            @Override
                            public void run() {
                                try {
                                    fut0.onDone(IgfsIpcHandler.this.execute(ses, cmd, msg, in));
                                }
                                catch (Exception e) {
                                    fut0.onDone(e);
                                }
                            }
                        });
                        fut = fut0;
                        break;
                    }
                    catch (RejectedExecutionException ignored) {
                        fut = this.executeSynchronously(ses, cmd, msg, in);
                    }
                }
            }
            return fut;
        }
        catch (Exception e) {
            return new GridFinishedFuture<IgfsMessage>(e);
        }
    }

    @Nullable
    private IgniteInternalFuture<IgfsMessage> executeSynchronously(IgfsClientSession ses, IgfsIpcCommand cmd, IgfsMessage msg, DataInput in) throws Exception {
        IgfsMessage res = this.execute(ses, cmd, msg, in);
        return res == null ? null : new GridFinishedFuture<IgfsMessage>(res);
    }

    private IgfsMessage execute(IgfsClientSession ses, IgfsIpcCommand cmd, IgfsMessage msg, @Nullable DataInput in) throws Exception {
        switch (cmd) {
            case HANDSHAKE: {
                return this.processHandshakeRequest((IgfsHandshakeRequest)msg);
            }
            case STATUS: {
                return this.processStatusRequest();
            }
            case MAKE_DIRECTORIES: 
            case LIST_FILES: 
            case LIST_PATHS: 
            case EXISTS: 
            case INFO: 
            case PATH_SUMMARY: 
            case UPDATE: 
            case RENAME: 
            case DELETE: 
            case SET_TIMES: 
            case AFFINITY: 
            case OPEN_READ: 
            case OPEN_CREATE: 
            case OPEN_APPEND: {
                return this.processPathControlRequest(ses, cmd, msg);
            }
            case WRITE_BLOCK: 
            case CLOSE: 
            case READ_BLOCK: {
                return this.processStreamControlRequest(ses, cmd, msg, in);
            }
        }
        throw new IgniteCheckedException("Unsupported IPC command: " + (Object)((Object)cmd));
    }

    private IgfsMessage processHandshakeRequest(IgfsHandshakeRequest req) throws IgniteCheckedException {
        if (req.gridName() != null && !F.eq(this.ctx.gridName(), req.gridName())) {
            throw new IgniteCheckedException("Failed to perform handshake because existing Grid name differs from requested [requested=" + req.gridName() + ", existing=" + this.ctx.gridName() + ']');
        }
        if (req.igfsName() != null && !F.eq(this.igfs.name(), req.igfsName())) {
            throw new IgniteCheckedException("Failed to perform handshake because existing IGFS name differs from requested [requested=" + req.igfsName() + ", existing=" + this.igfs.name() + ']');
        }
        IgfsControlResponse res = new IgfsControlResponse();
        this.igfs.clientLogDirectory(req.logDirectory());
        IgfsHandshakeResponse handshake = new IgfsHandshakeResponse(this.igfs.name(), this.igfs.proxyPaths(), this.igfs.groupBlockSize(), this.igfs.globalSampling());
        res.handshake(handshake);
        return res;
    }

    private IgfsMessage processStatusRequest() throws IgniteCheckedException {
        IgfsStatus status = this.igfs.globalSpace();
        IgfsControlResponse res = new IgfsControlResponse();
        res.status(status);
        return res;
    }

    private IgfsMessage processPathControlRequest(final IgfsClientSession ses, final IgfsIpcCommand cmd, IgfsMessage msg) throws IgniteCheckedException {
        final IgfsPathControlRequest req = (IgfsPathControlRequest)msg;
        if (this.log.isDebugEnabled()) {
            this.log.debug("Processing path control request [igfsName=" + this.igfs.name() + ", req=" + req + ']');
        }
        final IgfsControlResponse res = new IgfsControlResponse();
        String userName = req.userName();
        assert (userName != null);
        try {
            IgfsUserContext.doAs(userName, new IgniteOutClosure<Object>(){

                @Override
                public Void apply() {
                    switch (cmd) {
                        case EXISTS: {
                            res.response(IgfsIpcHandler.this.igfs.exists(req.path()));
                            break;
                        }
                        case INFO: {
                            res.response(IgfsIpcHandler.this.igfs.info(req.path()));
                            break;
                        }
                        case PATH_SUMMARY: {
                            res.response(IgfsIpcHandler.this.igfs.summary(req.path()));
                            break;
                        }
                        case UPDATE: {
                            res.response(IgfsIpcHandler.this.igfs.update(req.path(), req.properties()));
                            break;
                        }
                        case RENAME: {
                            IgfsIpcHandler.this.igfs.rename(req.path(), req.destinationPath());
                            res.response(true);
                            break;
                        }
                        case DELETE: {
                            res.response(IgfsIpcHandler.this.igfs.delete(req.path(), req.flag()));
                            break;
                        }
                        case MAKE_DIRECTORIES: {
                            IgfsIpcHandler.this.igfs.mkdirs(req.path(), req.properties());
                            res.response(true);
                            break;
                        }
                        case LIST_PATHS: {
                            res.paths(IgfsIpcHandler.this.igfs.listPaths(req.path()));
                            break;
                        }
                        case LIST_FILES: {
                            res.files(IgfsIpcHandler.this.igfs.listFiles(req.path()));
                            break;
                        }
                        case SET_TIMES: {
                            IgfsIpcHandler.this.igfs.setTimes(req.path(), req.accessTime(), req.modificationTime());
                            res.response(true);
                            break;
                        }
                        case AFFINITY: {
                            res.locations(IgfsIpcHandler.this.igfs.affinity(req.path(), req.start(), req.length()));
                            break;
                        }
                        case OPEN_READ: {
                            IgfsInputStream igfsIn = !req.flag() ? IgfsIpcHandler.this.igfs.open(req.path(), IgfsIpcHandler.this.bufSize) : IgfsIpcHandler.this.igfs.open(req.path(), IgfsIpcHandler.this.bufSize, req.sequentialReadsBeforePrefetch());
                            long streamId = IgfsIpcHandler.this.registerResource(ses, igfsIn);
                            if (IgfsIpcHandler.this.log.isDebugEnabled()) {
                                IgfsIpcHandler.this.log.debug("Opened IGFS input stream for file read [igfsName=" + IgfsIpcHandler.this.igfs.name() + ", path=" + req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
                            }
                            res.response(new IgfsInputStreamDescriptor(streamId, igfsIn.length()));
                            break;
                        }
                        case OPEN_CREATE: {
                            long streamId = IgfsIpcHandler.this.registerResource(ses, IgfsIpcHandler.this.igfs.create(req.path(), IgfsIpcHandler.this.bufSize, req.flag(), IgfsIpcHandler.this.affinityKey(req), req.replication(), req.blockSize(), req.properties()));
                            if (IgfsIpcHandler.this.log.isDebugEnabled()) {
                                IgfsIpcHandler.this.log.debug("Opened IGFS output stream for file create [igfsName=" + IgfsIpcHandler.this.igfs.name() + ", path=" + req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
                            }
                            res.response(streamId);
                            break;
                        }
                        case OPEN_APPEND: {
                            long streamId = IgfsIpcHandler.this.registerResource(ses, IgfsIpcHandler.this.igfs.append(req.path(), IgfsIpcHandler.this.bufSize, req.flag(), req.properties()));
                            if (IgfsIpcHandler.this.log.isDebugEnabled()) {
                                IgfsIpcHandler.this.log.debug("Opened IGFS output stream for file append [igfsName=" + IgfsIpcHandler.this.igfs.name() + ", path=" + req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
                            }
                            res.response(streamId);
                            break;
                        }
                        default: {
                            assert (false) : "Unhandled path control request command: " + (Object)((Object)cmd);
                            break;
                        }
                    }
                    return null;
                }
            });
        }
        catch (IgniteException e) {
            throw new IgniteCheckedException(e);
        }
        if (this.log.isDebugEnabled()) {
            this.log.debug("Finished processing path control request [igfsName=" + this.igfs.name() + ", req=" + req + ", res=" + res + ']');
        }
        return res;
    }

    private IgfsMessage processStreamControlRequest(IgfsClientSession ses, IgfsIpcCommand cmd, IgfsMessage msg, DataInput in) throws IgniteCheckedException, IOException {
        IgfsStreamControlRequest req = (IgfsStreamControlRequest)msg;
        Long rsrcId = req.streamId();
        IgfsControlResponse resp = new IgfsControlResponse();
        switch (cmd) {
            case CLOSE: {
                Closeable res = this.resource(ses, rsrcId);
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Requested to close resource [igfsName=" + this.igfs.name() + ", rsrcId=" + rsrcId + ", res=" + res + ']');
                }
                if (res == null) {
                    throw new IgniteCheckedException("Resource to close not found: " + rsrcId);
                }
                try {
                    res.close();
                }
                catch (IOException e) {
                    IgfsOutOfSpaceException space = X.cause(e, IgfsOutOfSpaceException.class);
                    if (space != null) {
                        throw space;
                    }
                    throw e;
                }
                boolean success = ses.unregisterResource(rsrcId, res);
                assert (success) : "Failed to unregister resource [igfsName=" + this.igfs.name() + ", rsrcId=" + rsrcId + ", res=" + res + ']';
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Closed IGFS stream [igfsName=" + this.igfs.name() + ", streamId=" + rsrcId + ", ses=" + ses + ']');
                }
                resp.response(true);
                break;
            }
            case READ_BLOCK: {
                long pos = req.position();
                int size = req.length();
                IgfsInputStreamImpl igfsIn = (IgfsInputStreamImpl)this.resource(ses, rsrcId);
                if (igfsIn == null) {
                    throw new IgniteCheckedException("Input stream not found (already closed?): " + rsrcId);
                }
                byte[][] chunks = igfsIn.readChunks(pos, size);
                resp.response(chunks);
                int len = 0;
                if (chunks.length > 0) {
                    len += chunks[0].length;
                }
                if (chunks.length > 1) {
                    len += chunks[chunks.length - 1].length;
                }
                if (chunks.length > 2) {
                    len += chunks[1].length * (chunks.length - 2);
                }
                resp.length(len);
                break;
            }
            case WRITE_BLOCK: {
                IgfsOutputStream out = (IgfsOutputStream)this.resource(ses, rsrcId);
                if (out == null) {
                    throw new IgniteCheckedException("Output stream not found (already closed?): " + rsrcId);
                }
                int writeLen = req.length();
                try {
                    out.transferFrom(in, writeLen);
                    if (errWrite) {
                        throw new IOException("Failed to write data to server (test).");
                    }
                    return null;
                }
                catch (IOException e) {
                    resp.error(rsrcId, e.getMessage());
                    break;
                }
            }
            default: {
                assert (false);
                break;
            }
        }
        return resp;
    }

    @Nullable
    private IgniteUuid affinityKey(IgfsPathControlRequest req) {
        if (!req.colocate()) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Will not generate affinity key for path control request [igfsName=" + this.igfs.name() + ", req=" + req + ']');
            }
            return null;
        }
        IgniteUuid key = this.igfs.nextAffinityKey();
        if (this.log.isDebugEnabled()) {
            this.log.debug("Generated affinity key for path control request [igfsName=" + this.igfs.name() + ", req=" + req + ", key=" + key + ']');
        }
        return key;
    }

    private long registerResource(IgfsClientSession ses, Closeable rsrc) {
        long rsrcId = this.rsrcIdGen.getAndIncrement();
        boolean registered = ses.registerResource(rsrcId, rsrc);
        assert (registered) : "Failed to register resource (duplicate id?): " + rsrcId;
        return rsrcId;
    }

    @Nullable
    private Closeable resource(IgfsClientSession ses, Long rsrcId) {
        return (Closeable)ses.resource(rsrcId);
    }
}

