/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.spi.swapspace.file;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.util.GridAtomicInitializer;
import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.spi.IgniteSpiAdapter;
import org.apache.ignite.spi.IgniteSpiCloseableIterator;
import org.apache.ignite.spi.IgniteSpiConfiguration;
import org.apache.ignite.spi.IgniteSpiConsistencyChecked;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport;
import org.apache.ignite.spi.IgniteSpiThread;
import org.apache.ignite.spi.swapspace.SwapContext;
import org.apache.ignite.spi.swapspace.SwapKey;
import org.apache.ignite.spi.swapspace.SwapSpaceSpi;
import org.apache.ignite.spi.swapspace.SwapSpaceSpiListener;
import org.apache.ignite.spi.swapspace.file.FileSwapArray;
import org.apache.ignite.spi.swapspace.file.FileSwapSpaceSpiMBean;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;

@IgniteSpiMultipleInstancesSupport(value=true)
@IgniteSpiConsistencyChecked(optional=false, checkClient=false)
public class FileSwapSpaceSpi
extends IgniteSpiAdapter
implements SwapSpaceSpi,
FileSwapSpaceSpiMBean {
    /** Default value of the base directory setting. */
    public static final String DFLT_BASE_DIR = "swapspace";
    /** Default maximum sparsity (see the compaction check in {@code Space.needCompact()}). */
    public static final float DFLT_MAX_SPARSITY = 0.5f;
    /** Default write buffer size (65536; presumably bytes, since it is compared with file lengths). */
    public static final int DFLT_BUF_SIZE = 65536;
    /** Default maximum write queue size (0x100000 = 1048576). */
    public static final int DFLT_QUE_SIZE = 0x100000;
    /** Name under which the {@code null} space is tracked; user spaces may not use it. */
    public static final String DFLT_SPACE_NAME = "gg-dflt-space";
    /** Known spaces keyed by their masked (never {@code null}) name. */
    private final ConcurrentMap<String, Space> spaces = new ConcurrentHashMap<String, Space>();
    /** Base directory setting; combined with grid name and node id when the SPI starts. */
    private String baseDir = "swapspace";
    /** Sparsity threshold above which a space asks its compactor thread to run. */
    private float maxSparsity = 0.5f;
    /** Listener notified about swap events; volatile because it is replaced and read without locking. */
    private volatile SwapSpaceSpiListener evictLsnr;
    /** Resolved swap directory; assigned in {@code spiStart}. */
    private File dir;
    /** Write buffer size handed to every space's write queue and to compaction. */
    private int writeBufSize = 65536;
    /** Maximum write queue size handed to every space's write queue. */
    private int maxWriteQueSize = 0x100000;
    /** Number of read stripes; {@code -1} means "derive from CPU count at start". */
    private int readStripesNum = -1;
    /** Injected logger. */
    @LoggerResource
    private IgniteLogger log;

    /**
     * Gets the base directory setting.
     *
     * @return Currently configured base directory.
     */
    @Override
    public String getBaseDirectory() {
        return baseDir;
    }

    /**
     * Sets the base directory setting. The value is checked for emptiness when the SPI starts,
     * not here.
     *
     * @param baseDir Base directory.
     */
    @IgniteSpiConfiguration(optional=true)
    public void setBaseDirectory(String baseDir) {
        this.baseDir = baseDir;
    }

    /**
     * Gets the maximum sparsity setting.
     *
     * @return Currently configured maximum sparsity.
     */
    @Override
    public float getMaximumSparsity() {
        return maxSparsity;
    }

    /**
     * Sets the maximum sparsity. The allowed range {@code [0, 1)} is enforced when the SPI starts,
     * not here.
     *
     * @param maxSparsity Maximum sparsity.
     */
    public void setMaximumSparsity(float maxSparsity) {
        this.maxSparsity = maxSparsity;
    }

    /**
     * Gets the write buffer size setting.
     *
     * @return Currently configured write buffer size.
     */
    @Override
    public int getWriteBufferSize() {
        return writeBufSize;
    }

    /**
     * Sets the write buffer size. No validation is performed here.
     *
     * @param writeBufSize Write buffer size.
     */
    public void setWriteBufferSize(int writeBufSize) {
        this.writeBufSize = writeBufSize;
    }

    /**
     * Gets the maximum write queue size setting.
     *
     * @return Currently configured maximum write queue size.
     */
    @Override
    public int getMaxWriteQueueSize() {
        return maxWriteQueSize;
    }

    /**
     * Sets the maximum write queue size. No validation is performed here.
     *
     * @param maxWriteQueSize Maximum write queue size.
     */
    public void setMaxWriteQueueSize(int maxWriteQueSize) {
        this.maxWriteQueSize = maxWriteQueSize;
    }

    /**
     * Gets the read stripes number setting.
     *
     * @return Configured number of read stripes, or {@code -1} if it has not been set or derived yet.
     */
    @Override
    public int getReadStripesNumber() {
        return readStripesNum;
    }

    /**
     * Sets the number of read stripes.
     *
     * @param readStripesNum Either {@code -1} (derive the value from the CPU count when the SPI starts)
     *      or a positive power of two.
     */
    public void setReadStripesNumber(int readStripesNum) {
        // The bit trick alone also lets 0 and Integer.MIN_VALUE through, so positivity is checked explicitly.
        boolean positivePowerOfTwo = readStripesNum > 0 && (readStripesNum & (readStripesNum - 1)) == 0;

        A.ensure(readStripesNum == -1 || positivePowerOfTwo, "readStripesNum must be positive and power of two");

        this.readStripesNum = readStripesNum;
    }

    /**
     * Validates the configuration, derives the read stripes number if it was left at {@code -1},
     * registers the MBean and resolves the swap directory
     * ({@code baseDir/gridName/nodeId} under the Ignite work directory).
     *
     * @param gridName Grid name; used as a path segment and for MBean registration.
     * @throws IgniteSpiException If the swap directory cannot be resolved.
     */
    @Override
    public void spiStart(@Nullable String gridName) throws IgniteSpiException {
        assertParameter(!F.isEmpty(baseDir), "!F.isEmpty(baseDir)");
        assertParameter(maxSparsity >= 0.0f && maxSparsity < 1.0f, "maxSparsity >= 0 && maxSparsity < 1");

        // The bit trick alone also accepts 0 and Integer.MIN_VALUE, so positivity is checked explicitly.
        boolean positivePowerOfTwo = readStripesNum > 0 && (readStripesNum & (readStripesNum - 1)) == 0;

        assertParameter(readStripesNum == -1 || positivePowerOfTwo, "readStripesNum must be positive and power of two.");

        if (readStripesNum == -1) {
            int cpuCnt = Runtime.getRuntime().availableProcessors();

            // Largest power of two that does not exceed the CPU count.
            int readStripesNum0 = Integer.highestOneBit(cpuCnt);

            assert (readStripesNum0 > 0 && (readStripesNum0 & readStripesNum0 - 1) == 0);

            readStripesNum = readStripesNum0;
        }

        startStopwatch();
        registerMBean(gridName, this, FileSwapSpaceSpiMBean.class);

        String path = baseDir + File.separator + gridName + File.separator + ignite.configuration().getNodeId();

        try {
            dir = U.resolveWorkDirectory(ignite.configuration().getWorkDirectory(), path, true);
        }
        catch (IgniteCheckedException e) {
            throw new IgniteSpiException(e);
        }

        if (log.isDebugEnabled()) {
            log.debug(startInfo());
        }
    }

    /**
     * Unregisters the MBean, stops every known space and removes the swap directory.
     *
     * @throws IgniteSpiException If initialization of a space fails before it is stopped.
     */
    @Override
    public void spiStop() throws IgniteSpiException {
        unregisterMBean();

        for (Space s : spaces.values()) {
            // stop() touches the swap files and worker threads that initialize() creates.
            s.initialize();

            try {
                s.stop();
            }
            catch (IgniteInterruptedCheckedException ex) {
                U.error(log, "Interrupted.", ex);
            }
        }

        if (dir != null && dir.exists()) {
            boolean deleted = U.delete(dir);

            if (!deleted) {
                U.warn(log, "Failed to delete swap directory: " + dir.getAbsolutePath());
            }
        }

        if (log.isDebugEnabled()) {
            log.debug(stopInfo());
        }
    }

    /**
     * Destroys the given space and notifies the listener with event type 74.
     *
     * @param spaceName Space name, {@code null} for the default space.
     * @throws IgniteSpiException Declared by the SPI contract.
     */
    @Override
    public void clear(@Nullable String spaceName) throws IgniteSpiException {
        // NOTE(review): 74 is presumably an inlined "swap space cleared" event constant - confirm.
        destruct(spaceName);

        notifyListener(74, spaceName);
    }

    /**
     * Returns the size reported by the given space.
     *
     * @param spaceName Space name, {@code null} for the default space.
     * @return Space size, or {@code 0} if the space does not exist.
     * @throws IgniteSpiException If the space lookup fails.
     */
    @Override
    public long size(@Nullable String spaceName) throws IgniteSpiException {
        Space found = this.space(spaceName, false);

        return found == null ? 0L : found.size();
    }

    /**
     * Returns the number of entries in the given space.
     *
     * @param spaceName Space name, {@code null} for the default space.
     * @return Entry count, or {@code 0} if the space does not exist.
     * @throws IgniteSpiException If the space lookup fails.
     */
    @Override
    public long count(@Nullable String spaceName) throws IgniteSpiException {
        Space found = this.space(spaceName, false);

        return found == null ? 0L : found.count();
    }

    /**
     * Returns the number of entries in the given partitions of a space.
     *
     * @param spaceName Space name, {@code null} for the default space.
     * @param parts Partitions to count.
     * @return Entry count over the partitions, or {@code 0} if the space does not exist.
     * @throws IgniteSpiException If the space lookup fails.
     */
    @Override
    public long count(@Nullable String spaceName, Set<Integer> parts) throws IgniteSpiException {
        Space found = this.space(spaceName, false);

        return found == null ? 0L : found.count(parts);
    }

    /**
     * Reads the value stored under the key and notifies the listener with event type 71.
     * No notification is sent if the space does not exist.
     *
     * @param spaceName Space name, {@code null} for the default space.
     * @param key Key to read.
     * @param ctx Swap context (only checked for non-null here).
     * @return Stored bytes, or {@code null} if the space or the entry is missing.
     * @throws IgniteSpiException If reading fails.
     */
    @Override
    @Nullable
    public byte[] read(@Nullable String spaceName, SwapKey key, SwapContext ctx) throws IgniteSpiException {
        assert (key != null);
        assert (ctx != null);

        Space target = this.space(spaceName, false);

        if (target != null) {
            byte[] bytes = target.read(key);

            notifyListener(71, spaceName);

            return bytes;
        }

        return null;
    }

    /**
     * Reads the values for several keys. {@code null} keys are skipped; a listener notification
     * (event type 71) is sent once per non-null key, whether or not a value was found.
     *
     * @param spaceName Space name, {@code null} for the default space.
     * @param keys Keys to read.
     * @param ctx Swap context (only checked for non-null here).
     * @return Map of found values; empty if the space does not exist.
     * @throws IgniteSpiException If reading fails.
     */
    @Override
    public Map<SwapKey, byte[]> readAll(@Nullable String spaceName, Iterable<SwapKey> keys, SwapContext ctx) throws IgniteSpiException {
        assert (keys != null);
        assert (ctx != null);

        Space target = this.space(spaceName, false);

        if (target == null) {
            return Collections.emptyMap();
        }

        Map<SwapKey, byte[]> found = new HashMap<SwapKey, byte[]>();

        for (SwapKey k : keys) {
            if (k != null) {
                byte[] bytes = target.read(k);

                if (bytes != null) {
                    found.put(k, bytes);
                }

                notifyListener(71, spaceName);
            }
        }

        return found;
    }

    /**
     * Removes the entry for the key. If a closure is given, it is always invoked: with the removed
     * bytes, or with {@code null} when the space or the entry does not exist. The listener is
     * notified (event type 73) only when the space exists.
     *
     * @param spaceName Space name, {@code null} for the default space.
     * @param key Key to remove.
     * @param c Optional closure receiving the removed value; the value is read only if it is non-null.
     * @param ctx Swap context (only checked for non-null here).
     * @throws IgniteSpiException If removal fails.
     */
    @Override
    public void remove(@Nullable String spaceName, SwapKey key, @Nullable IgniteInClosure<byte[]> c, SwapContext ctx) throws IgniteSpiException {
        assert (key != null);
        assert (ctx != null);

        Space space = this.space(spaceName, false);

        // Assigned on every path so that the closure call below is well-defined when the space is missing.
        byte[] val = space == null ? null : space.remove(key, c != null);

        if (c != null) {
            c.apply(val);
        }

        if (space != null) {
            this.notifyListener(73, spaceName);
        }
    }

    /**
     * Removes several entries. {@code null} keys are skipped. For every other key the optional
     * closure receives the key and its removed value (possibly {@code null}), and the listener is
     * notified with event type 73. Nothing happens if the space does not exist.
     *
     * @param spaceName Space name, {@code null} for the default space.
     * @param keys Keys to remove.
     * @param c Optional closure; values are read only if it is non-null.
     * @param ctx Swap context (only checked for non-null here).
     * @throws IgniteSpiException If removal fails.
     */
    @Override
    public void removeAll(@Nullable String spaceName, Collection<SwapKey> keys, @Nullable IgniteBiInClosure<SwapKey, byte[]> c, SwapContext ctx) throws IgniteSpiException {
        assert (keys != null);
        assert (ctx != null);

        Space target = this.space(spaceName, false);

        if (target != null) {
            boolean needVal = c != null;

            for (SwapKey k : keys) {
                if (k != null) {
                    byte[] removed = target.remove(k, needVal);

                    if (needVal) {
                        c.apply(k, removed);
                    }

                    notifyListener(73, spaceName);
                }
            }
        }
    }

    /**
     * Stores a value under the key, creating the space on demand, then notifies the listener
     * with event type 72.
     *
     * @param spaceName Space name, {@code null} for the default space.
     * @param key Key.
     * @param val Value; {@code null} is passed through to the space, which treats it as a removal.
     * @param ctx Swap context (only checked for non-null here).
     * @throws IgniteSpiException If the space cannot be created or the store fails.
     */
    @Override
    public void store(@Nullable String spaceName, SwapKey key, @Nullable byte[] val, SwapContext ctx) throws IgniteSpiException {
        assert (key != null);
        assert (ctx != null);

        Space target = this.space(spaceName, true);

        assert (target != null);

        target.store(key, val);

        notifyListener(72, spaceName);
    }

    /**
     * Stores several key-value pairs, creating the space on demand. Pairs with a {@code null} key
     * are skipped; the listener is notified (event type 72) once per stored pair.
     *
     * @param spaceName Space name, {@code null} for the default space.
     * @param pairs Pairs to store.
     * @param ctx Swap context (only checked for non-null here).
     * @throws IgniteSpiException If the space cannot be created or a store fails.
     */
    @Override
    public void storeAll(@Nullable String spaceName, Map<SwapKey, byte[]> pairs, SwapContext ctx) throws IgniteSpiException {
        assert (pairs != null);
        assert (ctx != null);

        Space target = this.space(spaceName, true);

        assert (target != null);

        for (Map.Entry<SwapKey, byte[]> e : pairs.entrySet()) {
            SwapKey k = e.getKey();

            if (k != null) {
                target.store(k, e.getValue());

                notifyListener(72, spaceName);
            }
        }
    }

    /**
     * Replaces the listener that receives swap events from this SPI.
     *
     * @param evictLsnr New listener, or {@code null} to stop notifications.
     */
    @Override
    public void setListener(@Nullable SwapSpaceSpiListener evictLsnr) {
        this.evictLsnr = evictLsnr;
    }

    /**
     * Returns the partition ids known to the given space.
     *
     * @param spaceName Space name, {@code null} for the default space.
     * @return Partition ids, or {@code null} if the space does not exist.
     * @throws IgniteSpiException If the space lookup fails.
     */
    @Override
    @Nullable
    public Collection<Integer> partitions(@Nullable String spaceName) throws IgniteSpiException {
        Space found = this.space(spaceName, false);

        return found == null ? null : found.partitions();
    }

    /**
     * Returns an iterator over the keys of the given space. Removal through the iterator is
     * delegated to the underlying entries iterator.
     *
     * @param spaceName Space name, {@code null} for the default space.
     * @param ctx Swap context (not used here).
     * @return Key iterator, or {@code null} if the space does not exist.
     * @throws IgniteSpiException If the space lookup fails.
     */
    @Override
    @Nullable
    public <K> IgniteSpiCloseableIterator<K> keyIterator(@Nullable String spaceName, SwapContext ctx) throws IgniteSpiException {
        Space space = this.space(spaceName, false);

        if (space == null) {
            return null;
        }

        final Iterator<Map.Entry<SwapKey, byte[]>> iter = space.entriesIterator();

        return new GridCloseableIteratorAdapter<K>(){

            @Override
            protected boolean onHasNext() {
                return iter.hasNext();
            }

            @SuppressWarnings("unchecked")
            @Override
            protected K onNext() {
                // The caller chooses K; the stored key object is handed back as-is.
                return (K)iter.next().getKey().key();
            }

            @Override
            protected void onRemove() {
                iter.remove();
            }
        };
    }

    /**
     * Returns an iterator over all entries of the given space as (key bytes, value bytes) pairs.
     *
     * @param spaceName Space name, {@code null} for the default space.
     * @return Raw entry iterator, or {@code null} if the space does not exist.
     * @throws IgniteSpiException If the space lookup fails.
     */
    @Override
    @Nullable
    public IgniteSpiCloseableIterator<Map.Entry<byte[], byte[]>> rawIterator(@Nullable String spaceName) throws IgniteSpiException {
        Space found = this.space(spaceName, false);

        return found == null ? null : this.rawIterator(found.entriesIterator());
    }

    /**
     * Returns an iterator over the entries of one partition as (key bytes, value bytes) pairs.
     *
     * @param spaceName Space name, {@code null} for the default space.
     * @param part Partition id.
     * @return Raw entry iterator, or {@code null} if the space does not exist.
     * @throws IgniteSpiException If the space lookup fails.
     */
    @Override
    @Nullable
    public IgniteSpiCloseableIterator<Map.Entry<byte[], byte[]>> rawIterator(@Nullable String spaceName, int part) throws IgniteSpiException {
        Space found = this.space(spaceName, false);

        return found == null ? null : this.rawIterator(found.entriesIterator(part));
    }

    /**
     * Adapts an iterator of (swap key, value bytes) entries to an iterator of
     * (key bytes, value bytes) entries. Key bytes are obtained through {@code keyBytes(SwapKey)}.
     *
     * @param iter Source iterator; {@code hasNext}, {@code next} and {@code remove} are delegated to it.
     * @return Closeable adapter over {@code iter}.
     */
    private IgniteSpiCloseableIterator<Map.Entry<byte[], byte[]>> rawIterator(final Iterator<Map.Entry<SwapKey, byte[]>> iter) {
        return new GridCloseableIteratorAdapter<Map.Entry<byte[], byte[]>>(){

            @Override
            protected Map.Entry<byte[], byte[]> onNext() {
                Map.Entry<SwapKey, byte[]> x = iter.next();

                return new T2<byte[], byte[]>(FileSwapSpaceSpi.this.keyBytes(x.getKey()), x.getValue());
            }

            @Override
            protected boolean onHasNext() {
                return iter.hasNext();
            }

            @Override
            protected void onRemove() {
                iter.remove();
            }
        };
    }

    /**
     * Returns the serialized form of the key. If the key carries no bytes yet, the key object is
     * marshalled with the configured marshaller and the result is stored back into the key.
     *
     * @param key Swap key.
     * @return Key bytes.
     * @throws IgniteSpiException If marshalling fails.
     */
    private byte[] keyBytes(SwapKey key) throws IgniteSpiException {
        assert (key != null);

        byte[] cached = key.keyBytes();

        if (cached != null) {
            return cached;
        }

        byte[] marshalled;

        try {
            marshalled = U.marshal(ignite.configuration().getMarshaller(), key.key());
        }
        catch (IgniteCheckedException e) {
            throw new IgniteSpiException("Failed to marshal key: " + key.key(), e);
        }

        key.keyBytes(marshalled);

        return marshalled;
    }

    /**
     * Forwards a swap event to the registered listener, if there is one. Key and value arguments
     * of the listener call are always {@code null}.
     *
     * @param evtType Event type code.
     * @param spaceName Space name the event refers to.
     */
    private void notifyListener(int evtType, @Nullable String spaceName) {
        // Single volatile read so the null check and the call see the same listener.
        SwapSpaceSpiListener current = evictLsnr;

        if (current == null) {
            return;
        }

        current.onSwapEvent(evtType, spaceName, null, null);
    }

    /**
     * Looks up a space by name and makes sure it is initialized before it is returned.
     *
     * @param name Space name, {@code null} for the default space.
     * @param create Whether to create (after name validation) and register the space if it is missing.
     * @return The space, or {@code null} if it is missing and {@code create} is {@code false}.
     * @throws IgniteSpiException If the name is invalid or initialization fails.
     */
    @Nullable
    private Space space(@Nullable String name, boolean create) throws IgniteSpiException {
        String masked = FileSwapSpaceSpi.maskNull(name);

        assert (masked != null);

        Space found = spaces.get(masked);

        if (found == null) {
            if (!create) {
                return null;
            }

            validateName(name);

            Space created = new Space(masked, log);

            // Another thread may have registered the same space first; its instance wins.
            Space raced = spaces.putIfAbsent(masked, created);

            found = raced != null ? raced : created;
        }

        found.initialize();

        return found;
    }

    /**
     * Unregisters the space and stops it. An interruption during stop is logged, not rethrown.
     *
     * @param spaceName Space name, {@code null} for the default space.
     */
    private void destruct(@Nullable String spaceName) {
        Space removed = spaces.remove(FileSwapSpaceSpi.maskNull(spaceName));

        if (removed == null) {
            return;
        }

        try {
            removed.stop();
        }
        catch (IgniteInterruptedCheckedException ex) {
            U.error(log, "Interrupted.", ex);
        }
    }

    /**
     * Maps the {@code null} space name to the reserved default name.
     *
     * @param spaceName Space name, possibly {@code null}.
     * @return {@code spaceName} itself, or {@link #DFLT_SPACE_NAME} if it is {@code null}.
     */
    private static String maskNull(String spaceName) {
        if (spaceName == null) {
            return DFLT_SPACE_NAME;
        }

        return spaceName;
    }

    /**
     * Checks a user-supplied space name. {@code null} (the default space) is always accepted;
     * other names must be non-empty, must not equal the reserved default name (case-insensitive)
     * and must not contain a forward or backward slash.
     *
     * @param name Space name to check.
     * @throws IgniteSpiException If the name is rejected.
     */
    private void validateName(@Nullable String name) throws IgniteSpiException {
        if (name != null) {
            if (name.isEmpty()) {
                throw new IgniteSpiException("Space name cannot be empty: " + name);
            }

            if (DFLT_SPACE_NAME.equalsIgnoreCase(name)) {
                throw new IgniteSpiException("Space name is reserved for default space: " + name);
            }

            boolean hasSeparator = name.indexOf('/') >= 0 || name.indexOf('\\') >= 0;

            if (hasSeparator) {
                throw new IgniteSpiException("Space name contains invalid characters: " + name);
            }
        }
    }

    /**
     * Returns a string representation built by {@code S.toString} for this class.
     *
     * @return String representation of this SPI.
     */
    @Override
    public String toString() {
        return S.toString(FileSwapSpaceSpi.class, this);
    }

    private class Space {
        /** Masked space name; also used as the base of the swap file names and thread names. */
        private final String name;
        /** Guards the one-off setup performed in {@code initialize()}. */
        private final GridAtomicInitializer<Void> initializer = new GridAtomicInitializer();
        /** Swap file "<name>.left"; created in {@code initialize()}. */
        private SwapFile left;
        /** Swap file "<name>.right"; created in {@code initialize()}. */
        private SwapFile right;
        /** Queue of values waiting to be written by the writer thread. */
        private final SwapValuesQueue que;
        /** Entries grouped by partition id. */
        private final ConcurrentMap<Integer, ConcurrentMap<SwapKey, SwapValue>> parts = new ConcurrentHashMap8<Integer, ConcurrentMap<SwapKey, SwapValue>>();
        /** Sum of the lengths of the values currently held in this space. */
        private final AtomicLong size = new AtomicLong();
        /** Number of entries currently held in this space. */
        private final AtomicLong cnt = new AtomicLong();
        /** Current write target: {@code 1} selects the right file, any other value the left one; negated by the compactor. */
        private int sign = 1;
        /** Thread draining {@code que} into the current write target. */
        private Thread writer;
        /** Thread compacting the file that is not the current write target. */
        private Thread compactor;

        private Space(String name, IgniteLogger log) {
            assert (name != null);
            this.name = name;
            this.que = new SwapValuesQueue(FileSwapSpaceSpi.this.writeBufSize, FileSwapSpaceSpi.this.maxWriteQueSize, log);
        }

        /**
         * Creates the two swap files and starts the writer and compactor threads. Returns
         * immediately if the initializer already reports success.
         * NOTE(review): presumably {@code GridAtomicInitializer} runs the callable at most once
         * across threads - confirm against its implementation.
         *
         * @throws IgniteSpiException If the initializer reports a failure.
         */
        public void initialize() throws IgniteSpiException {
            if (this.initializer.succeeded()) {
                return;
            }
            assert (FileSwapSpaceSpi.this.dir.exists());
            assert (FileSwapSpaceSpi.this.dir.isDirectory());
            try {
                this.initializer.init(new Callable<Void>(){

                    @Override
                    public Void call() throws Exception {
                        Space.this.left = new SwapFile(new File(FileSwapSpaceSpi.this.dir, Space.this.name + ".left"), FileSwapSpaceSpi.this.readStripesNum);
                        Space.this.right = new SwapFile(new File(FileSwapSpaceSpi.this.dir, Space.this.name + ".right"), FileSwapSpaceSpi.this.readStripesNum);
                        // Monitor shared by writer and compactor: protects 'sign' and serializes file writes.
                        final Object mux = new Object();
                        Space.this.writer = new IgniteSpiThread(FileSwapSpaceSpi.this.gridName, "Swap writer: " + Space.this.name, FileSwapSpaceSpi.this.log){

                            /*
                             * Takes batches from the queue and appends each one to the current
                             * write target (right file when sign == 1, left file otherwise).
                             * Any write failure is rethrown as IgniteException from this thread.
                             */
                            @Override
                            protected void body() throws InterruptedException {
                                while (!this.isInterrupted()) {
                                    SwapValues vals = Space.this.que.take();
                                    Object object = mux;
                                    synchronized (object) {
                                        SwapFile f = Space.this.sign == 1 ? Space.this.right : Space.this.left;
                                        try {
                                            f.write(vals, Space.this.sign);
                                        }
                                        catch (Exception e) {
                                            throw new IgniteException(e);
                                        }
                                    }
                                }
                            }
                        };
                        Space.this.compactor = new IgniteSpiThread(FileSwapSpaceSpi.this.gridName, "Swap compactor: " + Space.this.name, FileSwapSpaceSpi.this.log){

                            /*
                             * Parks until compaction is needed, then repeatedly pulls a batch of
                             * values out of the file being compacted (c) and appends it to the
                             * write target (w). When a pass yields no values, the roles of the
                             * two files are swapped by negating 'sign'.
                             */
                            @Override
                            protected void body() throws InterruptedException {
                                SwapFile w = null;
                                SwapFile c = null;
                                ArrayDeque<SwapValue> vals = null;
                                while (!this.isInterrupted()) {
                                    Object object;
                                    // Wait for removeFromFile() to unpark us; leave on interruption.
                                    while (!Space.this.needCompact()) {
                                        LockSupport.park();
                                        if (!this.isInterrupted()) continue;
                                        return;
                                    }
                                    ByteBuffer buf = null;
                                    if (vals == null) {
                                        // First pass: nothing to compact yet, the empty deque forces the role swap below,
                                        // which is also where w and c get their first values.
                                        vals = new ArrayDeque<SwapValue>();
                                    } else {
                                        vals.clear();
                                        try {
                                            buf = c.compact(vals, FileSwapSpaceSpi.this.writeBufSize);
                                        }
                                        catch (IOException e) {
                                            throw new IgniteException(e);
                                        }
                                    }
                                    if (vals.isEmpty()) {
                                        object = mux;
                                        synchronized (object) {
                                            // Flip the write target; the writer thread picks its file from 'sign' under the same monitor.
                                            Space.this.sign = -Space.this.sign;
                                            if (Space.this.sign == 1) {
                                                w = Space.this.right;
                                                c = Space.this.left;
                                            } else {
                                                w = Space.this.left;
                                                c = Space.this.right;
                                            }
                                            continue;
                                        }
                                    }
                                    assert (buf != null && buf.remaining() != 0);
                                    object = mux;
                                    synchronized (object) {
                                        try {
                                            w.write(vals, buf, Space.this.sign);
                                        }
                                        catch (Exception e) {
                                            throw new IgniteException(e);
                                        }
                                    }
                                }
                            }
                        };
                        Space.this.writer.start();
                        Space.this.compactor.start();
                        return null;
                    }
                });
            }
            catch (IgniteCheckedException e) {
                throw new IgniteSpiException(e);
            }
        }

        /**
         * Returns the combined length of both swap files.
         *
         * @return Sum of the left and right file lengths.
         */
        public long size() {
            long leftLen = left.length();
            long rightLen = right.length();

            return leftLen + rightLen;
        }

        /**
         * Returns the current entry counter of this space.
         *
         * @return Number of entries.
         */
        public long count() {
            return cnt.get();
        }

        /**
         * Sums the sizes of the given partitions; partitions unknown to this space contribute nothing.
         *
         * @param parts Partition ids.
         * @return Total number of entries in the listed partitions.
         */
        public long count(Set<Integer> parts) {
            long total = 0L;

            for (Integer partId : parts) {
                ConcurrentMap<SwapKey, SwapValue> partMap = partition(partId, false);

                if (partMap != null) {
                    total += (long)partMap.size();
                }
            }

            return total;
        }

        /**
         * Interrupts both worker threads, waits for them to finish and deletes both swap files.
         * Relies on the files having been created by {@code initialize()}.
         *
         * @throws IgniteInterruptedCheckedException If waiting for a worker thread is interrupted.
         */
        public void stop() throws IgniteInterruptedCheckedException {
            // Signal both workers first so that they wind down in parallel.
            U.interrupt(writer);
            U.interrupt(compactor);

            U.join(writer);
            U.join(compactor);

            left.delete();
            right.delete();
        }

        /**
         * Stores a value, or removes the entry when the value is {@code null}. A stored value is
         * registered in the partition map, the size/count counters are adjusted, any replaced
         * value is released from its file, and the new value is queued for writing.
         *
         * @param key Key.
         * @param val New value, or {@code null} to remove the entry.
         * @throws IgniteSpiException If queuing the value fails.
         */
        public void store(SwapKey key, @Nullable byte[] val) throws IgniteSpiException {
            assert (key != null);

            ConcurrentMap<SwapKey, SwapValue> partMap = partition(key.partition(), true);

            assert (partMap != null);

            if (val != null) {
                SwapValue fresh = new SwapValue(val);
                SwapValue prev = partMap.put(key, fresh);

                if (prev == null) {
                    size.addAndGet(val.length);
                    cnt.incrementAndGet();
                }
                else {
                    size.addAndGet(val.length - prev.len);
                    removeFromFile(prev);
                }

                que.add(fresh);

                return;
            }

            SwapValue removed = partMap.remove(key);

            if (removed != null) {
                removeFromFile(removed);
                size.addAndGet(-removed.len);
                cnt.decrementAndGet();
            }
        }

        /**
         * Reads the value stored under the key.
         *
         * @param key Key.
         * @return Value bytes, or {@code null} if the partition or the entry is missing.
         * @throws IgniteSpiException If the value cannot be read.
         */
        @Nullable
        public byte[] read(SwapKey key) throws IgniteSpiException {
            assert (key != null);

            ConcurrentMap<SwapKey, SwapValue> partMap = partition(key.partition(), false);

            SwapValue found = partMap == null ? null : partMap.get(key);

            return found == null ? null : found.value(this);
        }

        /**
         * Removes the entry for the key, updates the counters and releases the value from its file.
         *
         * @param key Key.
         * @param read Whether the removed bytes should be loaded and returned.
         * @return Removed bytes if {@code read} is set and the entry existed, otherwise {@code null}.
         * @throws IgniteSpiException If the value cannot be read.
         */
        @Nullable
        public byte[] remove(SwapKey key, boolean read) throws IgniteSpiException {
            assert (key != null);

            ConcurrentMap<SwapKey, SwapValue> partMap = partition(key.partition(), false);

            SwapValue removed = partMap == null ? null : partMap.remove(key);

            if (removed == null) {
                return null;
            }

            size.addAndGet(-removed.len);
            cnt.decrementAndGet();

            byte[] result = null;

            if (read) {
                // Must happen before removeFromFile(), which marks the value as removed.
                result = removed.value(this);

                assert (result != null);
            }

            removeFromFile(removed);

            return result;
        }

        /**
         * Marks the value as removed by swapping its index to {@code Integer.MIN_VALUE}, drops it
         * from the file it was registered in (positive index: right file, negative: left file,
         * zero: no file yet) and wakes the compactor if compaction is now needed.
         *
         * @param val Value to release; must not already be marked as removed.
         */
        private void removeFromFile(SwapValue val) {
            int prevIdx;

            // Retry until our CAS wins against concurrent index updates.
            for (;;) {
                prevIdx = val.idx;

                assert (prevIdx != Integer.MIN_VALUE);

                if (val.casIdx(prevIdx, Integer.MIN_VALUE)) {
                    break;
                }
            }

            if (prevIdx != 0) {
                SwapFile owner = prevIdx > 0 ? right : left;

                owner.tryRemove(Math.abs(prevIdx), val);
            }

            if (needCompact()) {
                LockSupport.unpark(compactor);
            }
        }

        /**
         * Tells whether compaction should run: the files together must be longer than the write
         * buffer size, and the fraction of file length not covered by live values must exceed
         * the configured maximum sparsity.
         *
         * @return {@code true} if compaction is needed.
         */
        private boolean needCompact() {
            long fileLen = size();

            if (fileLen <= (long)FileSwapSpaceSpi.this.writeBufSize) {
                return false;
            }

            float sparsity = (float)(fileLen - size.get()) / (float)fileLen;

            return sparsity > FileSwapSpaceSpi.this.maxSparsity;
        }

        /**
         * Returns the ids of the partitions known to this space, as the key set of the partition map.
         *
         * @return Partition ids.
         */
        public Collection<Integer> partitions() {
            return parts.keySet();
        }

        /**
         * Returns the entry map of a partition, optionally creating it.
         *
         * @param part Partition id.
         * @param create Whether to create and register the map if it is missing.
         * @return Partition map, or {@code null} if it is missing and {@code create} is {@code false}.
         */
        @Nullable
        private ConcurrentMap<SwapKey, SwapValue> partition(int part, boolean create) {
            ConcurrentMap<SwapKey, SwapValue> map = this.parts.get(part);

            if (map == null && create) {
                map = new ConcurrentHashMap<SwapKey, SwapValue>();

                // If another thread registered the partition first, use its map.
                ConcurrentMap<SwapKey, SwapValue> old = this.parts.putIfAbsent(part, map);

                if (old != null) {
                    map = old;
                }
            }

            return map;
        }

        /**
         * Returns an iterator over the (key, value bytes) entries of one partition.
         *
         * @param part Partition id.
         * @return Entry iterator; empty if the partition is unknown.
         */
        public Iterator<Map.Entry<SwapKey, byte[]>> entriesIterator(int part) {
            ConcurrentMap<SwapKey, SwapValue> partMap = this.partition(part, false);

            if (partMap == null) {
                // Explicit type witness: inference does not reach through the chained iterator() call.
                return Collections.<Map.Entry<SwapKey, byte[]>>emptySet().iterator();
            }

            return this.transform(partMap.entrySet().iterator());
        }

        /**
         * Returns an iterator over the (key, value bytes) entries of all partitions, visiting the
         * partition maps one after another.
         *
         * @return Entry iterator over the whole space.
         */
        public Iterator<Map.Entry<SwapKey, byte[]>> entriesIterator() {
            final Iterator<ConcurrentMap<SwapKey, SwapValue>> iter = this.parts.values().iterator();

            return this.transform(F.concat(new Iterator<Iterator<Map.Entry<SwapKey, SwapValue>>>(){

                @Override
                public boolean hasNext() {
                    return iter.hasNext();
                }

                @Override
                public Iterator<Map.Entry<SwapKey, SwapValue>> next() {
                    return iter.next().entrySet().iterator();
                }

                @Override
                public void remove() {
                    // Whole partitions are never dropped through this iterator.
                    throw new UnsupportedOperationException();
                }
            }));
        }

        /**
         * Wraps an iterator of (key, swap value) entries into an iterator of (key, value bytes)
         * entries. Entries whose bytes resolve to {@code null} are skipped. The wrapper looks one
         * element ahead, so the first element is resolved as soon as the wrapper is created.
         * {@code remove()} removes the last returned key from this space.
         *
         * @param iter Source iterator.
         * @return Transforming iterator.
         */
        private Iterator<Map.Entry<SwapKey, byte[]>> transform(final Iterator<Map.Entry<SwapKey, SwapValue>> iter) {
            return new Iterator<Map.Entry<SwapKey, byte[]>>(){
                /** Look-ahead element, {@code null} when the source is exhausted. */
                private Map.Entry<SwapKey, byte[]> next;
                /** Element most recently returned by {@code next()}, cleared by {@code remove()}. */
                private Map.Entry<SwapKey, byte[]> last;
                {
                    this.advance();
                }

                @Override
                public boolean hasNext() {
                    return this.next != null;
                }

                /** Moves to the next source entry whose bytes are available. */
                private void advance() {
                    while (iter.hasNext()) {
                        byte[] bytes;
                        Map.Entry<SwapKey, SwapValue> entry = iter.next();
                        try {
                            bytes = entry.getValue().value(Space.this);
                        }
                        catch (IgniteSpiException e) {
                            throw new IgniteException(e);
                        }
                        if (bytes == null) continue;
                        this.next = new T2<SwapKey, byte[]>(entry.getKey(), bytes);
                        break;
                    }
                }

                @Override
                public Map.Entry<SwapKey, byte[]> next() {
                    Map.Entry<SwapKey, byte[]> res = this.next;
                    if (res == null) {
                        throw new NoSuchElementException();
                    }
                    this.next = null;
                    this.advance();
                    this.last = res;
                    return res;
                }

                @Override
                public void remove() {
                    if (this.last == null) {
                        throw new IllegalStateException();
                    }
                    try {
                        // Qualified on purpose: an unqualified call would resolve to this iterator's own remove().
                        Space.this.remove(this.last.getKey(), false);
                    }
                    catch (IgniteSpiException e) {
                        throw new IgniteException(e);
                    }
                    finally {
                        this.last = null;
                    }
                }
            };
        }
    }

    static class SwapFile {
        /** Minimum trunk size (0xA00000 = 10485760); not referenced in the visible part of this class. */
        private static final long MIN_TRUNK_SIZE = 0xA00000L;
        /** Underlying file. */
        private final File file;
        /** Random-access handle opened in "rw" mode; owns the write channel. */
        private final RandomAccessFile raf;
        /** Channel used for positional writes. */
        private final FileChannel writeCh;
        /** Striped channel used for reads; replaced by {@code reopenReadChannel()}. */
        volatile StripedFileChannel readCh;
        /** Logical file length: the position at which the next batch is written. */
        private volatile long len;
        /** Registry of the values stored in this file; its indices are kept (signed) in {@code SwapValue.idx}. */
        private final FileSwapArray<SwapValue> arr = new FileSwapArray();

        SwapFile(File file, int readerStripes) throws IOException {
            assert (file != null);
            file.delete();
            if (!file.createNewFile()) {
                throw new IllegalStateException("Failed to create file: " + file.getAbsolutePath());
            }
            this.file = file;
            this.raf = new RandomAccessFile(file, "rw");
            this.writeCh = this.raf.getChannel();
            this.readCh = new StripedFileChannel(file, readerStripes);
        }

        /**
         * Closes the current read channel and replaces it with a new one over the same file,
         * using the same number of stripes.
         *
         * @throws FileNotFoundException If the new read channel cannot be opened.
         */
        void reopenReadChannel() throws FileNotFoundException {
            readCh.close();

            // The stripe count is taken from the channel that has just been closed.
            readCh = new StripedFileChannel(file, readCh.chs.length);
        }

        /**
         * Registers the given values in this file and appends the prepared buffer at the current
         * end of the file. Afterwards every value is assigned a consecutive position, starting at
         * the old end of the file and advancing by the value's length.
         * NOTE(review): {@code buf} is assumed to hold the bytes of all {@code vals} back to back
         * in iteration order - confirm against the caller that fills it.
         *
         * @param vals Values whose bytes are contained in {@code buf}.
         * @param buf Buffer to write; nothing is written if it has no remaining bytes.
         * @param sign Sign applied to the array index stored in each value.
         * @throws Exception If the channel write fails, or {@code IllegalStateException} on a short write.
         */
        public void write(Iterable<SwapValue> vals, ByteBuffer buf, int sign) throws Exception {
            for (SwapValue val : vals) {
                int idx;
                int oldIdx = val.idx;
                // Skip values already marked as removed; otherwise register the value and publish its new signed index.
                if (oldIdx == Integer.MIN_VALUE || val.casIdx(oldIdx, sign * (idx = this.arr.add(val)))) continue;
                // CAS lost: the only expected competitor is a removal, so undo the registration.
                assert (val.idx == Integer.MIN_VALUE);
                boolean res = this.tryRemove(idx, val);
                assert (res);
            }
            int size = buf.remaining();
            if (size == 0) {
                return;
            }
            // The logical length is advanced before the bytes are actually written.
            long pos = this.len;
            this.len = pos + (long)size;
            long res = this.writeCh.write(buf, pos);
            if (res != (long)size) {
                throw new IllegalStateException(res + " != " + size);
            }
            for (SwapValue val : vals) {
                // NOTE(review): presumably records the file position and releases the in-memory bytes - confirm in SwapValue.set.
                val.set(pos, null);
                pos += (long)val.len;
            }
        }

        public void write(SwapValues vals, int sign) throws Exception {
            ByteBuffer buf = ByteBuffer.allocateDirect(vals.size);
            int len = vals.vals.length;
            for (int i = 0; i < len; ++i) {
                SwapValue val = vals.vals[i];
                if (val.idx == Integer.MIN_VALUE) {
                    ((SwapValues)vals).vals[i] = null;
                    continue;
                }
                int idx = this.arr.add(val);
                if (!val.casIdx(0, sign * idx)) {
                    assert (val.idx == Integer.MIN_VALUE);
                    this.tryRemove(idx, val);
                    ((SwapValues)vals).vals[i] = null;
                    continue;
                }
                buf.put(val.value(null));
            }
            buf.flip();
            int size = buf.remaining();
            if (size == 0) {
                return;
            }
            long pos = this.len;
            this.len = pos + (long)size;
            long res = this.writeCh.write(buf, pos);
            if (res != (long)size) {
                throw new IllegalStateException(res + " != " + size);
            }
            for (SwapValue val : vals.vals) {
                if (val == null) continue;
                val.set(pos, null);
                pos += (long)val.len;
            }
        }

        public String path() {
            return this.file.getAbsolutePath();
        }

        public long length() {
            return this.len;
        }

        public boolean delete() {
            U.closeQuiet(this.raf);
            this.readCh.close();
            return U.delete(this.file);
        }

        public boolean tryRemove(int idx, SwapValue exp) {
            assert (idx > 0) : idx;
            FileSwapArray.Slot<SwapValue> s = this.arr.slot(idx);
            return s != null && s.cas(exp, null);
        }

        public ByteBuffer compact(ArrayDeque<SwapValue> vals, int bufSize) throws IOException, InterruptedException {
            assert (vals.isEmpty());
            Compact c = new Compact(vals, bufSize);
            c.doCompact();
            return c.result();
        }

        private class Compact {
            private final ArrayDeque<SwapValue> vals;
            private final int bufSize;
            private byte[] bytes;
            private ByteBuffer buf;
            private long beg = -1L;
            private long end = -1L;
            private int compacted;

            private Compact(ArrayDeque<SwapValue> vals, int bufSize) {
                assert (vals.isEmpty());
                this.vals = vals;
                this.bufSize = bufSize;
            }

            private void readAndCompact() throws IOException {
                assert (this.beg != -1L);
                if (this.buf == null) {
                    this.bytes = new byte[this.bufSize];
                    this.buf = ByteBuffer.wrap(this.bytes);
                }
                int pos = this.buf.position();
                int lim = (int)(this.end - this.beg + (long)pos);
                assert (pos >= 0);
                assert (pos < lim) : pos + " " + lim;
                assert (lim <= this.buf.capacity());
                this.buf.limit(lim);
                int res = SwapFile.this.writeCh.read(this.buf, this.beg);
                assert (res == lim - pos);
                int prevEnd = pos;
                long delta = this.beg - (long)pos;
                for (int j = this.vals.size(); j > this.compacted; --j) {
                    SwapValue val = this.vals.pollFirst();
                    int valPos = (int)(val.pos - delta);
                    if (prevEnd != valPos) {
                        assert (prevEnd < valPos) : prevEnd + " " + valPos;
                        U.arrayCopy(this.bytes, valPos, this.bytes, prevEnd, val.len);
                    }
                    prevEnd += val.len;
                    this.vals.addLast(val);
                }
                assert (prevEnd > 0) : prevEnd;
                this.buf.position(prevEnd);
                this.end = -1L;
                this.compacted = this.vals.size();
            }

            private void doCompact() throws IOException {
                int idx = SwapFile.this.arr.size();
                while (--idx > 0) {
                    long size;
                    FileSwapArray.Slot<SwapValue> s = SwapFile.this.arr.slot(idx);
                    assert (s != null);
                    SwapValue v = (SwapValue)s.get();
                    if (v == null || v.idx == Integer.MIN_VALUE) continue;
                    if (this.end == -1L) {
                        this.end = v.pos + (long)v.len;
                    }
                    if ((long)(this.buf == null ? this.bufSize : this.buf.remaining()) < (size = this.end - v.pos)) {
                        if (this.vals.isEmpty()) {
                            assert (this.bytes == null && this.buf == null);
                            this.bytes = new byte[(int)size];
                            this.buf = ByteBuffer.wrap(this.bytes);
                        } else {
                            if (this.compacted == this.vals.size()) break;
                            this.readAndCompact();
                            ++idx;
                            continue;
                        }
                    }
                    this.beg = v.pos;
                    this.vals.addFirst(v);
                    s.cas(v, null);
                }
                if (this.vals.isEmpty()) {
                    SwapFile.this.arr.truncate(1);
                    SwapFile.this.writeCh.truncate(0L);
                    SwapFile.this.len = 0L;
                    SwapFile.this.reopenReadChannel();
                    return;
                }
                if (this.compacted != this.vals.size()) {
                    this.readAndCompact();
                }
                int pos = 0;
                for (SwapValue val : this.vals) {
                    val.set(pos, this.bytes);
                    pos += val.len;
                }
                this.buf.flip();
                assert (this.buf.limit() == pos) : this.buf.limit() + " " + pos;
                SwapFile.this.arr.truncate(idx + 1);
                if (SwapFile.this.len - this.beg > 0xA00000L) {
                    SwapFile.this.writeCh.truncate(this.beg);
                    SwapFile.this.len = this.beg;
                }
            }

            public ByteBuffer result() {
                return this.buf;
            }
        }
    }

    /**
     * A fixed set of read-only channels opened on the same file. Reads are spread over the
     * channels in round-robin order.
     */
    private static class StripedFileChannel {
        /** Counter of read calls, used to pick the next stripe. */
        private final AtomicInteger enter = new AtomicInteger();

        /** Read-only file handles, one per stripe. */
        private final RandomAccessFile[] rafs;

        /** Channels of {@link #rafs}, index-aligned. */
        private final FileChannel[] chs;

        /**
         * Opens {@code stripes} read-only handles on the file. If any of them cannot be opened,
         * the handles opened so far are closed before the exception propagates.
         *
         * @param f File to read.
         * @param stripes Number of channels; must be a positive power of two.
         * @throws FileNotFoundException If the file cannot be opened for reading.
         */
        StripedFileChannel(File f, int stripes) throws FileNotFoundException {
            assert (stripes > 0 && (stripes & stripes - 1) == 0) : "stripes must be positive and power of two.";
            this.rafs = new RandomAccessFile[stripes];
            this.chs = new FileChannel[stripes];
            boolean opened = false;
            try {
                for (int i = 0; i < stripes; ++i) {
                    RandomAccessFile raf = new RandomAccessFile(f, "r");
                    this.rafs[i] = raf;
                    this.chs[i] = raf.getChannel();
                }
                opened = true;
            }
            finally {
                if (!opened) {
                    // Release the descriptors of the stripes that were opened before the failure.
                    for (RandomAccessFile raf : this.rafs) {
                        if (raf != null) {
                            U.closeQuiet(raf);
                        }
                    }
                }
            }
        }

        /**
         * Reads from the file at an absolute position using the next stripe.
         *
         * @param buf Destination buffer.
         * @param pos Absolute file position to read from.
         * @return Value returned by {@link FileChannel#read(ByteBuffer, long)}.
         * @throws IOException If the read fails.
         */
        int read(ByteBuffer buf, long pos) throws IOException {
            // The stripe count is a power of two, so masking also stays valid after counter overflow.
            int i = this.enter.getAndIncrement() & this.chs.length - 1;
            return this.chs[i].read(buf, pos);
        }

        /**
         * Closes all stripes via {@code U.closeQuiet}.
         */
        void close() {
            for (RandomAccessFile raf : this.rafs) {
                U.closeQuiet(raf);
            }
        }
    }

    /**
     * Immutable pairing of a batch of values with a byte size for that batch.
     * {@code SwapFile.write(SwapValues, int)} allocates its buffer with {@link #size} bytes and
     * nulls out array entries that it does not write.
     */
    static class SwapValues {
        /** Byte size associated with the batch. */
        private final int size;

        /** The batch itself; the array is shared with the creator, not copied. */
        private final SwapValue[] vals;

        /**
         * @param vals Batch of values.
         * @param size Byte size associated with the batch.
         */
        SwapValues(final SwapValue[] vals, final int size) {
            this.size = size;
            this.vals = vals;
        }
    }

    /**
     * Bounded blocking queue of values, measured in bytes rather than in elements. Producers
     * block in {@link #add(SwapValue)} while the queue is full; the consumer blocks in
     * {@link #take()} until at least {@code minTakeSize} bytes are queued.
     */
    private static class SwapValuesQueue {
        /** Queued values in insertion order. */
        private final ArrayDeque<SwapValue> deq = new ArrayDeque<SwapValue>();

        /** Guards all mutable state of the queue. */
        private final ReentrantLock lock = new ReentrantLock();

        /** Signalled when producers may be able to add. */
        private final Condition mayAdd = this.lock.newCondition();

        /** Signalled when enough bytes are queued for a take. */
        private final Condition mayTake = this.lock.newCondition();

        /** Total length in bytes of the queued values. */
        private int size;

        /** Minimum number of queued bytes required for {@link #take()} to return. */
        private final int minTakeSize;

        /** Maximum number of queued bytes (except for a single oversized value). */
        private final int maxSize;

        /** Logger for the oversize warning. */
        private final IgniteLogger log;

        /** Whether the oversize warning has already been logged. */
        private boolean queueSizeWarn;

        /**
         * @param minTakeSize Minimum number of queued bytes required for a take.
         * @param maxSize Maximum number of queued bytes.
         * @param log Logger.
         */
        private SwapValuesQueue(int minTakeSize, int maxSize, IgniteLogger log) {
            this.minTakeSize = minTakeSize;
            this.maxSize = maxSize;
            this.log = log;
        }

        /**
         * Adds a value, blocking while there is no room for it. A value longer than
         * {@code maxSize} can never fit; it is admitted once fewer than {@code minTakeSize}
         * bytes are queued, and a warning is logged the first time this happens.
         *
         * @param val Value to add.
         * @throws IgniteSpiException If the thread was interrupted while waiting; the thread's
         *      interrupt status is restored before the exception is thrown.
         */
        public void add(SwapValue val) throws IgniteSpiException {
            this.lock.lock();
            try {
                boolean largeVal = val.len > this.maxSize;
                if (largeVal) {
                    if (!this.queueSizeWarn) {
                        U.warn(this.log, "Trying to save in swap entry which have size more than write queue size. You may wish to increase 'maxWriteQueueSize' in FileSwapSpaceSpi configuration [queueMaxSize=" + this.maxSize + ", valSize=" + val.len + ']');
                        this.queueSizeWarn = true;
                    }
                    while (this.size >= this.minTakeSize) {
                        this.mayAdd.await();
                    }
                } else {
                    while (this.size + val.len > this.maxSize) {
                        this.mayAdd.await();
                    }
                }
                this.size += val.len;
                this.deq.addLast(val);
                if (this.size >= this.minTakeSize) {
                    this.mayTake.signalAll();
                }
            }
            catch (InterruptedException e) {
                // Keep the interruption visible to the caller's thread; wrapping alone would lose it.
                Thread.currentThread().interrupt();
                throw new IgniteSpiException(e);
            }
            finally {
                this.lock.unlock();
            }
        }

        /**
         * Blocks until at least {@code minTakeSize} bytes are queued, then removes values from
         * the head of the queue until their combined length reaches {@code minTakeSize}.
         *
         * @return The removed values together with their combined length.
         * @throws InterruptedException If the thread is interrupted while waiting.
         */
        public SwapValues take() throws InterruptedException {
            this.lock.lock();
            try {
                while (this.size < this.minTakeSize) {
                    this.mayTake.await();
                }
                // First pass: find how many head values make up the batch.
                int size = 0;
                int cnt = 0;
                for (SwapValue val : this.deq) {
                    ++cnt;
                    size += val.len;
                    if (size >= this.minTakeSize) {
                        break;
                    }
                }
                // Second pass: actually remove them.
                SwapValue[] vals = new SwapValue[cnt];
                for (int i = 0; i < cnt; ++i) {
                    vals[i] = this.deq.pollFirst();
                }
                this.size -= size;
                if (this.size < this.maxSize) {
                    this.mayAdd.signalAll();
                }
                return new SwapValues(vals, size);
            }
            finally {
                this.lock.unlock();
            }
        }
    }

    /**
     * A swapped value: either held in memory ({@link #val} is set) or located in a swap file at
     * {@link #pos} with length {@link #len}.
     */
    static class SwapValue {
        /** {@code idx} of a value that has not been registered in a file yet. */
        private static final int NEW = 0;

        /** {@code idx} of a deleted value. */
        private static final int DELETED = Integer.MIN_VALUE;

        /** Atomic updater for {@link #idx}. */
        private static final AtomicIntegerFieldUpdater<SwapValue> idxUpdater = AtomicIntegerFieldUpdater.newUpdater(SwapValue.class, "idx");

        /** In-memory bytes: the value itself, or a larger shared array containing it at {@link #pos}; {@code null} when only on disk. */
        private byte[] val;

        /** Length of the value in bytes. */
        private final int len;

        /** Position of the value in the file, or its offset inside a shared {@link #val} array. */
        private long pos = -1L;

        /** {@link #NEW}, {@link #DELETED}, or a slot index whose sign selects the file to read from. */
        private volatile int idx;

        /**
         * @param val Value bytes, not {@code null}.
         */
        SwapValue(byte[] val) {
            assert (val != null);
            this.val = val;
            this.len = val.length;
        }

        /**
         * Returns the bytes of this value, reading them from the swap file if they are not in memory.
         *
         * @param space Space whose {@code left} (negative {@code idx}) or {@code right} file is
         *      read; only dereferenced when the value is not in memory and not deleted.
         * @return Value bytes, or {@code null} if the value is deleted or could not be read completely.
         * @throws IgniteSpiException If reading from the file fails.
         */
        @Nullable
        public synchronized byte[] value(Space space) throws IgniteSpiException {
            byte[] cached = this.val;
            if (cached != null) {
                if (cached.length == this.len) {
                    return cached;
                }
                // The value lives inside a larger shared array: hand out a private copy of its range.
                int off = (int)this.pos;
                return Arrays.copyOfRange(cached, off, off + this.len);
            }
            int curIdx = this.idx;
            assert (curIdx != NEW);
            if (curIdx == DELETED) {
                return null;
            }
            StripedFileChannel ch = curIdx < 0 ? space.left.readCh : space.right.readCh;
            // Re-check after the channel was picked: the value may have been deleted meanwhile.
            if (this.idx == DELETED) {
                return null;
            }
            return this.readValue(ch);
        }

        /**
         * Reads {@link #len} bytes at {@link #pos} from the given channel.
         *
         * @param ch Channel to read from.
         * @return Bytes read, or {@code null} if fewer than {@code len} bytes were read (including
         *      the case when the channel was closed asynchronously during the read).
         * @throws IgniteSpiException If the thread was interrupted, the channel was already
         *      closed, or another I/O error occurred.
         */
        @Nullable
        byte[] readValue(StripedFileChannel ch) throws IgniteSpiException {
            byte[] data = new byte[this.len];
            int read = 0;
            try {
                read = ch.read(ByteBuffer.wrap(data), this.pos);
            }
            catch (ClosedByInterruptException e) {
                throw new IgniteSpiException("Operation was interrupted.", e);
            }
            catch (AsynchronousCloseException ignore) {
                // Tolerated only for deleted values; falls through with nothing read.
                assert (this.idx == DELETED);
            }
            catch (ClosedChannelException e) {
                throw new IgniteSpiException("File channel was unexpectedly closed.", e);
            }
            catch (IOException e) {
                throw new IgniteSpiException("Failed to read value.", e);
            }
            return read < this.len ? null : data;
        }

        /**
         * Updates the location and the in-memory bytes of this value.
         *
         * @param pos New position; {@code -1} keeps the current one.
         * @param val New in-memory bytes, may be {@code null}.
         */
        public synchronized void set(long pos, byte[] val) {
            this.val = val;
            if (pos != -1L) {
                this.pos = pos;
            }
        }

        /**
         * Atomically replaces {@code idx}.
         *
         * @param exp Expected current index.
         * @param idx New index.
         * @return {@code true} if the index was replaced.
         */
        public boolean casIdx(int exp, int idx) {
            return idxUpdater.compareAndSet(this, exp, idx);
        }

        /**
         * @return Current index.
         */
        int idx() {
            return this.idx;
        }

        /** {@inheritDoc} */
        @Override
        public String toString() {
            return this.pos + " " + this.len;
        }
    }
}

