/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.shortcircuit;

import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.HashMultimap;
import java.io.Closeable;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockReaderFactory;
import org.apache.hadoop.hdfs.BlockReaderTestUtil;
import org.apache.hadoop.hdfs.DFSInputStream;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.net.DomainPeer;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
import org.apache.hadoop.hdfs.shortcircuit.DfsClientShm;
import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplicaInfo;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Time;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

public class TestShortCircuitCache {
    static final Log LOG = LogFactory.getLog(TestShortCircuitCache.class);

    /**
     * Smoke test: a cache that never had a replica added to it can be
     * constructed and closed again without throwing.
     */
    @Test(timeout=60000L)
    public void testCreateAndDestroy() throws Exception {
        final ShortCircuitCache emptyCache =
            new ShortCircuitCache(10, 1L, 10, 1L, 1L, 10000L, 0);
        // Nothing is ever fetched; the test only covers the create/close cycle.
        emptyCache.close();
    }

    /**
     * Adds one replica to the cache and fetches the same block id two more
     * times: once while references are still held, and once after they have
     * been released. Both later fetches must be answered from the cache, so
     * the creator handed to them fails the test if it is ever invoked.
     */
    @Test(timeout=60000L)
    public void testAddAndRetrieve() throws Exception {
        final ShortCircuitCache cache = new ShortCircuitCache(10, 10000000L, 10, 10000000L, 1L, 10000L, 0);
        final TestFileDescriptorPair pair = new TestFileDescriptorPair();

        // Creator used for every fetch that is expected to hit the cache.
        final ShortCircuitCache.ShortCircuitReplicaCreator mustNotCreate = new ShortCircuitCache.ShortCircuitReplicaCreator(){
            @Override
            public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
                Assert.fail("expected to use existing entry.");
                return null;
            }
        };

        // First fetch: the entry does not exist yet, so the creator builds it.
        ShortCircuitReplicaInfo replicaInfo1 = cache.fetchOrCreate(new ExtendedBlockId(123L, "test_bp1"), new SimpleReplicaCreator(123, cache, pair));
        Preconditions.checkNotNull(replicaInfo1.getReplica());
        Preconditions.checkState(replicaInfo1.getInvalidTokenException() == null);
        // compareWith() only reports a verdict; it must be asserted to have any effect.
        Assert.assertTrue("first fetch returned unexpected streams",
            pair.compareWith(replicaInfo1.getReplica().getDataStream(), replicaInfo1.getReplica().getMetaStream()));

        // Second fetch while the first reference is still outstanding.
        ShortCircuitReplicaInfo replicaInfo2 = cache.fetchOrCreate(new ExtendedBlockId(123L, "test_bp1"), mustNotCreate);
        Preconditions.checkNotNull(replicaInfo2.getReplica());
        Preconditions.checkState(replicaInfo2.getInvalidTokenException() == null);
        Preconditions.checkState(replicaInfo1 == replicaInfo2);
        Assert.assertTrue("second fetch returned unexpected streams",
            pair.compareWith(replicaInfo2.getReplica().getDataStream(), replicaInfo2.getReplica().getMetaStream()));
        replicaInfo1.getReplica().unref();
        replicaInfo2.getReplica().unref();

        // Third fetch after both references were dropped: the entry must
        // still be served without calling the creator.
        ShortCircuitReplicaInfo replicaInfo3 = cache.fetchOrCreate(new ExtendedBlockId(123L, "test_bp1"), mustNotCreate);
        Preconditions.checkNotNull(replicaInfo3.getReplica());
        Preconditions.checkState(replicaInfo3.getInvalidTokenException() == null);
        replicaInfo3.getReplica().unref();

        pair.close();
        cache.close();
    }

    /**
     * Adds a replica, releases it, and then polls the cache until a fetch for
     * the same block id falls through to the creator, which shows that the
     * cached entry is no longer being served.
     */
    @Test(timeout=60000L)
    public void testExpiry() throws Exception {
        final ShortCircuitCache cache = new ShortCircuitCache(2, 1L, 1, 10000000L, 1L, 10000000L, 0);
        final TestFileDescriptorPair pair = new TestFileDescriptorPair();
        ShortCircuitReplicaInfo replicaInfo1 = cache.fetchOrCreate(new ExtendedBlockId(123L, "test_bp1"), new SimpleReplicaCreator(123, cache, pair));
        Preconditions.checkNotNull(replicaInfo1.getReplica());
        Preconditions.checkState(replicaInfo1.getInvalidTokenException() == null);
        // compareWith() only reports a verdict; it must be asserted to have any effect.
        Assert.assertTrue("fetch returned unexpected streams",
            pair.compareWith(replicaInfo1.getReplica().getDataStream(), replicaInfo1.getReplica().getMetaStream()));
        replicaInfo1.getReplica().unref();

        final MutableBoolean triedToCreate = new MutableBoolean(false);
        final ShortCircuitCache.ShortCircuitReplicaCreator recordingCreator = new ShortCircuitCache.ShortCircuitReplicaCreator(){
            @Override
            public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
                triedToCreate.setValue(true);
                return null;
            }
        };
        do {
            Thread.sleep(10L);
            ShortCircuitReplicaInfo replicaInfo2 = cache.fetchOrCreate(new ExtendedBlockId(123L, "test_bp1"), recordingCreator);
            // While the old entry is still handed out, give the reference
            // straight back so it does not stay pinned by this test.
            if (replicaInfo2 != null && replicaInfo2.getReplica() != null) {
                replicaInfo2.getReplica().unref();
            }
        } while (triedToCreate.isFalse());

        // Release the temporary directory as testAddAndRetrieve does.
        pair.close();
        cache.close();
    }

    /**
     * Inserts three replicas (block ids 0, 1 and 2), releases all of them, and
     * then checks that blocks 1 and 2 are still served from the cache while a
     * fetch for block 0 has to go through its creator.
     */
    @Test(timeout=60000L)
    public void testEviction() throws Exception {
        final ShortCircuitCache cache = new ShortCircuitCache(2, 10000000L, 1, 10000000L, 1L, 10000L, 0);
        final TestFileDescriptorPair[] pairs = new TestFileDescriptorPair[]{new TestFileDescriptorPair(), new TestFileDescriptorPair(), new TestFileDescriptorPair()};
        final ShortCircuitReplicaInfo[] replicaInfos = new ShortCircuitReplicaInfo[pairs.length];

        // Populate the cache with one replica per descriptor pair.
        for (int i = 0; i < pairs.length; ++i) {
            replicaInfos[i] = cache.fetchOrCreate(new ExtendedBlockId(i, "test_bp1"), new SimpleReplicaCreator(i, cache, pairs[i]));
            Preconditions.checkNotNull(replicaInfos[i].getReplica());
            Preconditions.checkState(replicaInfos[i].getInvalidTokenException() == null);
            // compareWith() only reports a verdict; it must be asserted to have any effect.
            Assert.assertTrue("unexpected streams for block " + i,
                pairs[i].compareWith(replicaInfos[i].getReplica().getDataStream(), replicaInfos[i].getReplica().getMetaStream()));
        }

        // Drop every reference held by the test.
        for (int i = 0; i < pairs.length; ++i) {
            replicaInfos[i].getReplica().unref();
        }

        // Blocks 1 and 2 must still be present, so their creator may not run.
        for (int i = 1; i < pairs.length; ++i) {
            final int blockIndex = i;
            replicaInfos[i] = cache.fetchOrCreate(new ExtendedBlockId(i, "test_bp1"), new ShortCircuitCache.ShortCircuitReplicaCreator(){
                @Override
                public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
                    Assert.fail("expected to use existing entry for " + blockIndex);
                    return null;
                }
            });
            Preconditions.checkNotNull(replicaInfos[i].getReplica());
            Preconditions.checkState(replicaInfos[i].getInvalidTokenException() == null);
            Assert.assertTrue("unexpected streams for block " + i,
                pairs[i].compareWith(replicaInfos[i].getReplica().getDataStream(), replicaInfos[i].getReplica().getMetaStream()));
        }

        // Block 0 must be gone: the creator has to be consulted, and because
        // it returns null no replica comes back.
        final MutableBoolean calledCreate = new MutableBoolean(false);
        replicaInfos[0] = cache.fetchOrCreate(new ExtendedBlockId(0L, "test_bp1"), new ShortCircuitCache.ShortCircuitReplicaCreator(){
            @Override
            public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
                calledCreate.setValue(true);
                return null;
            }
        });
        Preconditions.checkState(replicaInfos[0].getReplica() == null);
        Assert.assertTrue(calledCreate.isTrue());

        // Clean up: release the re-fetched replicas, then the descriptors.
        for (int i = 1; i < pairs.length; ++i) {
            replicaInfos[i].getReplica().unref();
        }
        for (int i = 0; i < pairs.length; ++i) {
            pairs[i].close();
        }
        cache.close();
    }

    /**
     * Creates two replicas whose creation-time argument differs by one hour
     * per block index, waits until a fetch for block 0 no longer returns a
     * replica, and then checks that block 1 is still served from the cache.
     */
    @Test(timeout=60000L)
    public void testTimeBasedStaleness() throws Exception {
        final ShortCircuitCache cache = new ShortCircuitCache(2, 10000000L, 1, 10000000L, 1L, 10L, 0);
        final TestFileDescriptorPair[] pairs = new TestFileDescriptorPair[]{new TestFileDescriptorPair(), new TestFileDescriptorPair()};
        final ShortCircuitReplicaInfo[] replicaInfos = new ShortCircuitReplicaInfo[pairs.length];
        final long hourInMs = 3600000L;

        for (int i = 0; i < pairs.length; ++i) {
            final int blockIndex = i;
            final ExtendedBlockId key = new ExtendedBlockId(i, "test_bp1");
            replicaInfos[i] = cache.fetchOrCreate(key, new ShortCircuitCache.ShortCircuitReplicaCreator(){
                @Override
                public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
                    try {
                        // Block 0 is stamped with "now", block 1 one hour ahead.
                        return new ShortCircuitReplicaInfo(new ShortCircuitReplica(key, pairs[blockIndex].getFileInputStreams()[0], pairs[blockIndex].getFileInputStreams()[1], cache, Time.monotonicNow() + blockIndex * hourInMs, null));
                    }
                    catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                }
            });
            Preconditions.checkNotNull(replicaInfos[i].getReplica());
            Preconditions.checkState(replicaInfos[i].getInvalidTokenException() == null);
            // compareWith() only reports a verdict; it must be asserted to have any effect.
            Assert.assertTrue("unexpected streams for block " + i,
                pairs[i].compareWith(replicaInfos[i].getReplica().getDataStream(), replicaInfos[i].getReplica().getMetaStream()));
        }

        // Poll until block 0 is no longer handed out. The creator returns
        // null, so a missing replica means the cached entry was not used.
        GenericTestUtils.waitFor(new Supplier<Boolean>(){
            @Override
            public Boolean get() {
                ShortCircuitReplicaInfo info = cache.fetchOrCreate(new ExtendedBlockId(0L, "test_bp1"), new ShortCircuitCache.ShortCircuitReplicaCreator(){
                    @Override
                    public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
                        return null;
                    }
                });
                if (info.getReplica() != null) {
                    info.getReplica().unref();
                    return false;
                }
                return true;
            }
        }, 500, 60000);

        // Block 1 must still be cached.
        ShortCircuitReplicaInfo info = cache.fetchOrCreate(new ExtendedBlockId(1L, "test_bp1"), new ShortCircuitCache.ShortCircuitReplicaCreator(){
            @Override
            public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
                Assert.fail("second replica went stale, despite 1 hour staleness time.");
                return null;
            }
        });
        info.getReplica().unref();

        // Clean up.
        for (int i = 1; i < pairs.length; ++i) {
            replicaInfos[i].getReplica().unref();
        }
        // Release the temporary directories as testEviction does.
        for (int i = 0; i < pairs.length; ++i) {
            pairs[i].close();
        }
        cache.close();
    }

    /**
     * Builds the client configuration shared by the cluster-based tests.
     *
     * Besides filling in the returned configuration this method has global
     * side effects: it sets {@code DFSInputStream.tcpReadsDisabledForTesting},
     * calls {@code DomainSocket.disableBindPathValidation()}, and applies a
     * JUnit assumption that {@code DomainSocket.getLoadingFailureReason()} is
     * null.
     *
     * @param testName used as the client context name and as the file name of
     *                 the domain socket inside {@code sockDir}
     * @param sockDir  directory in which the domain socket path is placed
     * @return the populated configuration
     */
    private static Configuration createShortCircuitConf(String testName, TemporarySocketDirectory sockDir) {
        final String socketPath = new File(sockDir.getDir(), testName).getAbsolutePath();

        final Configuration conf = new Configuration();
        conf.set("dfs.client.context", testName);
        conf.setLong("dfs.blocksize", 4096L);
        conf.set("dfs.domain.socket.path", socketPath);
        conf.setBoolean("dfs.client.read.shortcircuit", true);
        conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
        conf.setBoolean("dfs.client.domain.socket.data.traffic", false);

        DFSInputStream.tcpReadsDisabledForTesting = true;
        DomainSocket.disableBindPathValidation();
        Assume.assumeThat(DomainSocket.getLoadingFailureReason(), CoreMatchers.equalTo(null));
        return conf;
    }

    /**
     * Connects to the domain socket path stored in the configuration under
     * {@code dfs.domain.socket.path} and wraps the socket in a peer.
     *
     * @param conf configuration carrying the domain socket path
     * @return a peer backed by the newly connected socket
     * @throws IOException if the connection attempt fails
     */
    private static DomainPeer getDomainPeerToDn(Configuration conf) throws IOException {
        final String socketPath = conf.get("dfs.domain.socket.path");
        return new DomainPeer(DomainSocket.connect(socketPath));
    }

    /**
     * Allocates a shared-memory slot through the client's cache, checks what
     * the client shm manager reports for the datanode, then schedules the slot
     * for release and waits until the manager reports no segments for it.
     *
     * The cluster and the socket directory are released in {@code finally}
     * blocks so a failing assertion cannot leak them.
     */
    @Test(timeout=60000L)
    public void testAllocShm() throws Exception {
        BlockReaderTestUtil.enableShortCircuitShmTracing();
        final TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
        try {
            final Configuration conf = TestShortCircuitCache.createShortCircuitConf("testAllocShm", sockDir);
            final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
            try {
                cluster.waitActive();
                DistributedFileSystem fs = cluster.getFileSystem();
                final ShortCircuitCache cache = fs.getClient().getClientContext().getShortCircuitCache();

                // Nothing has been allocated yet, so no datanode is listed.
                cache.getDfsClientShmManager().visit(new DfsClientShmManager.Visitor(){
                    @Override
                    public void visit(HashMap<DatanodeInfo, DfsClientShmManager.PerDatanodeVisitorInfo> info) throws IOException {
                        Assert.assertEquals(0, info.size());
                    }
                });

                DomainPeer peer = TestShortCircuitCache.getDomainPeerToDn(conf);
                MutableBoolean usedPeer = new MutableBoolean(false);
                ExtendedBlockId blockId = new ExtendedBlockId(123L, "xyz");
                final DatanodeInfo datanode = new DatanodeInfo(cluster.getDataNodes().get(0).getDatanodeId());
                final ShortCircuitShm.Slot slot = cache.allocShmSlot(datanode, peer, usedPeer, blockId, "testAllocShm_client");
                Assert.assertNotNull(slot);
                Assert.assertTrue(usedPeer.booleanValue());

                // Exactly one, not yet full, segment is expected for the datanode.
                cache.getDfsClientShmManager().visit(new DfsClientShmManager.Visitor(){
                    @Override
                    public void visit(HashMap<DatanodeInfo, DfsClientShmManager.PerDatanodeVisitorInfo> info) throws IOException {
                        Assert.assertEquals(1, info.size());
                        DfsClientShmManager.PerDatanodeVisitorInfo vinfo = info.get(datanode);
                        Assert.assertFalse(vinfo.disabled);
                        Assert.assertEquals(0, vinfo.full.size());
                        Assert.assertEquals(1, vinfo.notFull.size());
                    }
                });

                cache.scheduleSlotReleaser(slot);

                // Poll until both segment collections of the datanode are empty.
                GenericTestUtils.waitFor(new Supplier<Boolean>(){
                    @Override
                    public Boolean get() {
                        final MutableBoolean done = new MutableBoolean(false);
                        try {
                            cache.getDfsClientShmManager().visit(new DfsClientShmManager.Visitor(){
                                @Override
                                public void visit(HashMap<DatanodeInfo, DfsClientShmManager.PerDatanodeVisitorInfo> info) throws IOException {
                                    done.setValue(info.get(datanode).full.isEmpty() && info.get(datanode).notFull.isEmpty());
                                }
                            });
                        }
                        catch (IOException e) {
                            LOG.error("error running visitor", e);
                        }
                        return done.booleanValue();
                    }
                }, 10, 60000);
            } finally {
                cluster.shutdown();
            }
        } finally {
            sockDir.close();
        }
    }

    /**
     * Reads one byte of a freshly written file so that a replica for its first
     * block ends up in the client's cache, checks that the replica's slot is
     * valid, shuts the datanode down, and checks that the slot is then
     * reported as invalid.
     *
     * The input stream, the cluster and the socket directory are released in
     * {@code finally} blocks so a failing assertion cannot leak them.
     */
    @Test(timeout=60000L)
    public void testShmBasedStaleness() throws Exception {
        BlockReaderTestUtil.enableShortCircuitShmTracing();
        final TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
        try {
            final Configuration conf = TestShortCircuitCache.createShortCircuitConf("testShmBasedStaleness", sockDir);
            final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
            try {
                cluster.waitActive();
                DistributedFileSystem fs = cluster.getFileSystem();
                ShortCircuitCache cache = fs.getClient().getClientContext().getShortCircuitCache();
                final Path testFile = new Path("/test_file");
                final int testFileLen = 8193;
                final long seed = 1027565L;
                DFSTestUtil.createFile(fs, testFile, testFileLen, (short)1, seed);

                final FSDataInputStream fis = fs.open(testFile);
                try {
                    int first = fis.read();
                    final ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, testFile);
                    Assert.assertTrue(first != -1);

                    // While the datanode is up the slot must be valid.
                    cache.accept(new ShortCircuitCache.CacheVisitor(){
                        @Override
                        public void visit(int numOutstandingMmaps, Map<ExtendedBlockId, ShortCircuitReplica> replicas, Map<ExtendedBlockId, SecretManager.InvalidToken> failedLoads, Map<Long, ShortCircuitReplica> evictable, Map<Long, ShortCircuitReplica> evictableMmapped) {
                            ShortCircuitReplica replica = replicas.get(ExtendedBlockId.fromExtendedBlock(block));
                            Assert.assertNotNull(replica);
                            Assert.assertTrue(replica.getSlot().isValid());
                        }
                    });

                    cluster.getDataNodes().get(0).shutdown();

                    // After the shutdown the replica is still listed, but its
                    // slot must no longer be valid.
                    cache.accept(new ShortCircuitCache.CacheVisitor(){
                        @Override
                        public void visit(int numOutstandingMmaps, Map<ExtendedBlockId, ShortCircuitReplica> replicas, Map<ExtendedBlockId, SecretManager.InvalidToken> failedLoads, Map<Long, ShortCircuitReplica> evictable, Map<Long, ShortCircuitReplica> evictableMmapped) {
                            ShortCircuitReplica replica = replicas.get(ExtendedBlockId.fromExtendedBlock(block));
                            Assert.assertNotNull(replica);
                            Assert.assertFalse(replica.getSlot().isValid());
                        }
                    });
                } finally {
                    // Best-effort close; a failure here must not hide the test result.
                    IOUtils.cleanup(LOG, fis);
                }
            } finally {
                cluster.shutdown();
            }
        } finally {
            sockDir.close();
        }
    }

    /**
     * Writes and reads back a file, checks that the client shm manager shows
     * one connected, not-full segment for the datanode, deletes the file, and
     * then waits until no slot in that segment reports itself as valid.
     *
     * The cluster and the socket directory are released in {@code finally}
     * blocks so a failing assertion cannot leak them.
     */
    @Test(timeout=60000L)
    public void testUnlinkingReplicasInFileDescriptorCache() throws Exception {
        BlockReaderTestUtil.enableShortCircuitShmTracing();
        final TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
        try {
            final Configuration conf = TestShortCircuitCache.createShortCircuitConf("testUnlinkingReplicasInFileDescriptorCache", sockDir);
            conf.setLong("dfs.client.read.shortcircuit.streams.cache.expiry.ms", 1000000000L);
            final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
            try {
                cluster.waitActive();
                DistributedFileSystem fs = cluster.getFileSystem();
                final ShortCircuitCache cache = fs.getClient().getClientContext().getShortCircuitCache();

                // Nothing has been read yet, so no datanode is listed.
                cache.getDfsClientShmManager().visit(new DfsClientShmManager.Visitor(){
                    @Override
                    public void visit(HashMap<DatanodeInfo, DfsClientShmManager.PerDatanodeVisitorInfo> info) throws IOException {
                        Assert.assertEquals(0, info.size());
                    }
                });

                final Path testPath = new Path("/test_file");
                final int testFileLen = 8193;
                final long seed = 1027552L;
                DFSTestUtil.createFile(fs, testPath, testFileLen, (short)1, seed);
                byte[] contents = DFSTestUtil.readFileBuffer(fs, testPath);
                byte[] expected = DFSTestUtil.calculateFileContentsFromSeed(seed, testFileLen);
                Assert.assertTrue(Arrays.equals(contents, expected));

                final DatanodeInfo datanode = new DatanodeInfo(cluster.getDataNodes().get(0).getDatanodeId());
                cache.getDfsClientShmManager().visit(new DfsClientShmManager.Visitor(){
                    @Override
                    public void visit(HashMap<DatanodeInfo, DfsClientShmManager.PerDatanodeVisitorInfo> info) throws IOException {
                        Assert.assertTrue(info.get(datanode).full.isEmpty());
                        Assert.assertFalse(info.get(datanode).disabled);
                        Assert.assertEquals(1, info.get(datanode).notFull.values().size());
                        DfsClientShm shm = info.get(datanode).notFull.values().iterator().next();
                        Assert.assertFalse(shm.isDisconnected());
                    }
                });

                fs.delete(testPath, false);

                // Poll until every slot of the single segment is invalid.
                GenericTestUtils.waitFor(new Supplier<Boolean>(){
                    @Override
                    public Boolean get() {
                        // Starts out true on every poll; any slot that is
                        // still valid flips it back to false.
                        final MutableBoolean done = new MutableBoolean(true);
                        try {
                            cache.getDfsClientShmManager().visit(new DfsClientShmManager.Visitor(){
                                @Override
                                public void visit(HashMap<DatanodeInfo, DfsClientShmManager.PerDatanodeVisitorInfo> info) throws IOException {
                                    Assert.assertTrue(info.get(datanode).full.isEmpty());
                                    Assert.assertFalse(info.get(datanode).disabled);
                                    Assert.assertEquals(1, info.get(datanode).notFull.values().size());
                                    DfsClientShm shm = info.get(datanode).notFull.values().iterator().next();
                                    ShortCircuitShm.SlotIterator iter = shm.slotIterator();
                                    while (iter.hasNext()) {
                                        ShortCircuitShm.Slot slot = iter.next();
                                        if (slot.isValid()) {
                                            done.setValue(false);
                                        }
                                    }
                                }
                            });
                        }
                        catch (IOException e) {
                            LOG.error("error running visitor", e);
                        }
                        return done.booleanValue();
                    }
                }, 10, 60000);
            } finally {
                cluster.shutdown();
            }
        } finally {
            sockDir.close();
        }
    }

    /**
     * Polls the registry every 100 ms, for up to 10 seconds, until it reports
     * exactly the expected number of segments and slots.
     *
     * @param expectedSegments segment count the registry has to report
     * @param expectedSlots    slot count the registry has to report
     * @param registry         registry to inspect
     * @throws InterruptedException propagated from the wait helper
     * @throws TimeoutException     propagated from the wait helper
     */
    private static void checkNumberOfSegmentsAndSlots(final int expectedSegments, final int expectedSlots, final ShortCircuitRegistry registry) throws InterruptedException, TimeoutException {
        // Stateless visitor, so a single instance can serve every poll.
        final ShortCircuitRegistry.Visitor countChecker = new ShortCircuitRegistry.Visitor(){
            @Override
            public boolean accept(HashMap<ShortCircuitShm.ShmId, ShortCircuitRegistry.RegisteredShm> segments, HashMultimap<ExtendedBlockId, ShortCircuitShm.Slot> slots) {
                if (segments.size() != expectedSegments) {
                    return false;
                }
                return slots.size() == expectedSlots;
            }
        };
        final Supplier<Boolean> countsMatch = new Supplier<Boolean>(){
            @Override
            public Boolean get() {
                return registry.visit(countChecker);
            }
        };
        GenericTestUtils.waitFor(countsMatch, 100, 10000);
    }

    /**
     * Reads one file successfully, installs a client-side failure injector
     * that throws from {@code injectRequestFileDescriptorsFailure()}, attempts
     * to read a second file, and then checks that the datanode's registry ends
     * up with exactly one segment and one slot.
     *
     * The cluster and the socket directory are released in {@code finally}
     * blocks so a failing assertion cannot leak them.
     */
    @Test(timeout=60000L)
    public void testDataXceiverCleansUpSlotsOnFailure() throws Exception {
        BlockReaderTestUtil.enableShortCircuitShmTracing();
        final TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
        try {
            final Configuration conf = TestShortCircuitCache.createShortCircuitConf("testDataXceiverCleansUpSlotsOnFailure", sockDir);
            conf.setLong("dfs.client.read.shortcircuit.streams.cache.expiry.ms", 1000000000L);
            final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
            try {
                cluster.waitActive();
                DistributedFileSystem fs = cluster.getFileSystem();
                final Path testPath1 = new Path("/test_file1");
                final Path testPath2 = new Path("/test_file2");
                final int testFileLen = 4096;
                final long seed = 1027553L;
                DFSTestUtil.createFile(fs, testPath1, testFileLen, (short)1, seed);
                DFSTestUtil.createFile(fs, testPath2, testFileLen, (short)1, seed);

                // The first read happens before any failure is injected.
                DFSTestUtil.readFileBuffer(fs, testPath1);

                fs.getClient().getConf().brfFailureInjector = new TestCleanupFailureInjector();
                try {
                    DFSTestUtil.readFileBuffer(fs, testPath2);
                    // Without this, an unexpectedly successful read would
                    // pass silently. The resulting AssertionError is caught
                    // below and rejected by the message check.
                    Assert.fail("expected readFileBuffer to fail, but it succeeded.");
                }
                catch (Throwable t) {
                    GenericTestUtils.assertExceptionContains("TCP reads were disabled for testing, but we failed to do a non-TCP read.", t);
                }

                TestShortCircuitCache.checkNumberOfSegmentsAndSlots(1, 1, cluster.getDataNodes().get(0).getShortCircuitRegistry());
            } finally {
                cluster.shutdown();
            }
        } finally {
            sockDir.close();
        }
    }

    /**
     * Installs a mocked {@code DataNodeFaultInjector} whose
     * {@code sendShortCircuitShmResponse()} throws, checks that a read fails
     * and leaves no segments or slots registered, then restores the previous
     * injector and checks that a second read registers one segment and one
     * slot.
     *
     * The injector is a static field, so it is restored in a {@code finally}
     * block; otherwise a failing assertion would leave the throwing mock
     * installed for later tests. The cluster and the socket directory are
     * released in {@code finally} blocks as well.
     */
    @Test(timeout=60000L)
    public void testDataXceiverHandlesRequestShortCircuitShmFailure() throws Exception {
        BlockReaderTestUtil.enableShortCircuitShmTracing();
        final TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
        try {
            final Configuration conf = TestShortCircuitCache.createShortCircuitConf("testDataXceiverHandlesRequestShortCircuitShmFailure", sockDir);
            conf.setLong("dfs.client.read.shortcircuit.streams.cache.expiry.ms", 1000000000L);
            final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
            try {
                cluster.waitActive();
                DistributedFileSystem fs = cluster.getFileSystem();
                final Path testPath1 = new Path("/test_file1");
                DFSTestUtil.createFile(fs, testPath1, 4096L, (short)1, 1027553L);

                LOG.info("Setting failure injector and performing a read which should fail...");
                DataNodeFaultInjector failureInjector = Mockito.mock(DataNodeFaultInjector.class);
                Mockito.doAnswer(new Answer<Void>(){
                    @Override
                    public Void answer(InvocationOnMock invocation) throws Throwable {
                        throw new IOException("injected error into sendShmResponse");
                    }
                }).when(failureInjector).sendShortCircuitShmResponse();

                final DataNodeFaultInjector prevInjector = DataNodeFaultInjector.instance;
                DataNodeFaultInjector.instance = failureInjector;
                try {
                    try {
                        DFSTestUtil.readFileBuffer(fs, testPath1);
                        Assert.fail("expected readFileBuffer to fail, but it succeeded.");
                    }
                    catch (Throwable t) {
                        GenericTestUtils.assertExceptionContains("TCP reads were disabled for testing, but we failed to do a non-TCP read.", t);
                    }
                    TestShortCircuitCache.checkNumberOfSegmentsAndSlots(0, 0, cluster.getDataNodes().get(0).getShortCircuitRegistry());
                } finally {
                    // Always put the original injector back, even on failure.
                    DataNodeFaultInjector.instance = prevInjector;
                }

                LOG.info("Clearing failure injector and performing another read...");
                fs.getClient().getClientContext().getDomainSocketFactory().clearPathMap();
                DFSTestUtil.readFileBuffer(fs, testPath1);
                TestShortCircuitCache.checkNumberOfSegmentsAndSlots(1, 1, cluster.getDataNodes().get(0).getShortCircuitRegistry());
            } finally {
                cluster.shutdown();
            }
        } finally {
            sockDir.close();
        }
    }

    /**
     * Installs a client-side failure injector whose
     * {@code getSupportsReceiptVerification()} returns false, reads two files,
     * and checks that the datanode's registry ends up with one segment and
     * two slots.
     *
     * The cluster and the socket directory are released in {@code finally}
     * blocks so a failing assertion cannot leak them.
     */
    @Test(timeout=60000L)
    public void testPreReceiptVerificationDfsClientCanDoScr() throws Exception {
        BlockReaderTestUtil.enableShortCircuitShmTracing();
        final TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
        try {
            final Configuration conf = TestShortCircuitCache.createShortCircuitConf("testPreReceiptVerificationDfsClientCanDoScr", sockDir);
            conf.setLong("dfs.client.read.shortcircuit.streams.cache.expiry.ms", 1000000000L);
            final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
            try {
                cluster.waitActive();
                DistributedFileSystem fs = cluster.getFileSystem();
                fs.getClient().getConf().brfFailureInjector = new TestPreReceiptVerificationFailureInjector();

                final int testFileLen = 4096;
                final long seed = 1027554L;
                final Path testPath1 = new Path("/test_file1");
                DFSTestUtil.createFile(fs, testPath1, testFileLen, (short)1, seed);
                final Path testPath2 = new Path("/test_file2");
                DFSTestUtil.createFile(fs, testPath2, testFileLen, (short)1, seed);

                DFSTestUtil.readFileBuffer(fs, testPath1);
                DFSTestUtil.readFileBuffer(fs, testPath2);
                TestShortCircuitCache.checkNumberOfSegmentsAndSlots(1, 2, cluster.getDataNodes().get(0).getShortCircuitRegistry());
            } finally {
                cluster.shutdown();
            }
        } finally {
            sockDir.close();
        }
    }

    public static class TestPreReceiptVerificationFailureInjector
    extends BlockReaderFactory.FailureInjector {
        public boolean getSupportsReceiptVerification() {
            return false;
        }
    }

    public static class TestCleanupFailureInjector
    extends BlockReaderFactory.FailureInjector {
        public void injectRequestFileDescriptorsFailure() throws IOException {
            throw new IOException("injected I/O error");
        }
    }

    private static class SimpleReplicaCreator
    implements ShortCircuitCache.ShortCircuitReplicaCreator {
        private final int blockId;
        private final ShortCircuitCache cache;
        private final TestFileDescriptorPair pair;

        SimpleReplicaCreator(int blockId, ShortCircuitCache cache, TestFileDescriptorPair pair) {
            this.blockId = blockId;
            this.cache = cache;
            this.pair = pair;
        }

        public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
            try {
                ExtendedBlockId key = new ExtendedBlockId((long)this.blockId, "test_bp1");
                return new ShortCircuitReplicaInfo(new ShortCircuitReplica(key, this.pair.getFileInputStreams()[0], this.pair.getFileInputStreams()[1], this.cache, Time.monotonicNow(), null));
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * A pair of open input streams over two small files created in a private
     * temporary directory: index 0 is a one-byte data file, index 1 is a file
     * holding only a block metadata header.
     */
    private static class TestFileDescriptorPair {
        final TemporarySocketDirectory dir = new TemporarySocketDirectory();
        final FileInputStream[] fis = new FileInputStream[2];

        /**
         * Writes both files and opens an input stream on each of them.
         * Every output stream is closed exactly once, including when a write
         * fails. If construction fails part-way, the streams opened so far
         * and the temporary directory are released before the exception
         * propagates.
         *
         * @throws IOException if a file cannot be written or opened
         */
        public TestFileDescriptorPair() throws IOException {
            boolean ready = false;
            try {
                for (int i = 0; i < fis.length; ++i) {
                    final File file = new File(dir.getDir(), "file" + i);
                    final FileOutputStream fos = new FileOutputStream(file);
                    try {
                        if (i == 0) {
                            // Data file: a single byte of content.
                            fos.write(1);
                        } else {
                            // Meta file: just the header, with a NULL checksum.
                            BlockMetadataHeader header = new BlockMetadataHeader(1, DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 4));
                            DataOutputStream dos = new DataOutputStream(fos);
                            BlockMetadataHeader.writeHeader(dos, header);
                            dos.flush();
                        }
                    } finally {
                        fos.close();
                    }
                    fis[i] = new FileInputStream(file);
                }
                ready = true;
            } finally {
                if (!ready) {
                    try {
                        close();
                    }
                    catch (IOException cleanupFailure) {
                        // Log only, so the original failure is not masked.
                        LOG.warn("failed to clean up partially built TestFileDescriptorPair", cleanupFailure);
                    }
                }
            }
        }

        /** @return the two streams; index 0 is the data file, index 1 the meta file */
        public FileInputStream[] getFileInputStreams() {
            return this.fis;
        }

        /**
         * Closes both streams through {@code IOUtils.cleanup} and then closes
         * the temporary directory.
         */
        public void close() throws IOException {
            IOUtils.cleanup(LOG, fis);
            this.dir.close();
        }

        /**
         * Identity check against the streams owned by this pair.
         *
         * @return true only if {@code data} is this pair's stream 0 and
         *         {@code meta} is this pair's stream 1 (same object instances)
         */
        public boolean compareWith(FileInputStream data, FileInputStream meta) {
            return data == this.fis[0] && meta == this.fis[1];
        }
    }
}

