package org.apache.hadoop.hdfs;

import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
import org.apache.hadoop.io.IOUtils;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/hadoop/hdfs/TestReplaceDatanodeFailureReplication.class */
public class TestReplaceDatanodeFailureReplication {
    /** Logger shared by the test methods and by {@link SlowWriter}. */
    static final Log LOG = LogFactory.getLog(TestReplaceDatanodeFailureReplication.class);
    /** Parent directory for the test files: "/" + simple class name + "/". */
    static final String DIR = "/" + TestReplaceDatanodeFailureReplication.class.getSimpleName() + "/";
    /** Number of datanodes (and rack entries) used when the mini cluster is built. */
    static final short REPLICATION = 3;
    /** Rack name assigned to every datanode of the mini cluster. */
    private static final String RACK0 = "/rack0";

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hadoop/hdfs/TestReplaceDatanodeFailureReplication$SlowWriter.class */
    /**
     * Thread that slowly writes an incrementing counter to a single HDFS file.
     *
     * <p>The file is created in the constructor. Once started, the thread sleeps
     * {@code sleepms} milliseconds, then repeatedly writes the current counter value,
     * calls {@code hflush()}, and sleeps again until {@link #interruptRunning()} is
     * called.
     */
    public static class SlowWriter extends Thread {
        /** Path of the file this writer writes to. */
        private final Path filepath;
        /** Output stream for {@link #filepath}; opened in the constructor. */
        private final HdfsDataOutputStream out;
        /** Pause, in milliseconds, before the first write and after every write. */
        private final long sleepms;
        /** Loop flag; cleared by {@link #interruptRunning()}. */
        private volatile boolean running;

        /**
         * Creates the file and prepares the writer thread (does not start it).
         *
         * @param fs       file system used to create the file
         * @param filepath file to create and write to
         * @param sleepms  pause between writes, in milliseconds
         * @throws IOException if the file cannot be created
         */
        SlowWriter(DistributedFileSystem fs, Path filepath, long sleepms) throws IOException {
            super(SlowWriter.class.getSimpleName() + ":" + filepath);
            this.running = true;
            this.filepath = filepath;
            // Replication 3 has the same value as the enclosing class's REPLICATION constant.
            this.out = fs.create(filepath, (short) 3);
            this.sleepms = sleepms;
        }

        /**
         * Writes the counter values 0, 1, 2, ... one at a time, flushing after each
         * write, until the running flag is cleared or the thread is interrupted.
         *
         * <p>An {@link IOException} from the stream is rethrown wrapped in a
         * {@link RuntimeException} whose message is the thread name. The termination
         * message is logged on every exit path.
         */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            int i = 0;
            try {
                sleep(this.sleepms);
                while (this.running) {
                    TestReplaceDatanodeFailureReplication.LOG.info(getName() + " writes " + i);
                    this.out.write(i);
                    this.out.hflush();
                    sleep(this.sleepms);
                    // Only count the value once the write, flush and pause all completed.
                    i++;
                }
            } catch (InterruptedException e) {
                // Interruption is the normal way to stop the writer; just record it.
                TestReplaceDatanodeFailureReplication.LOG.info(getName() + " interrupted:" + e);
            } catch (IOException e) {
                throw new RuntimeException(getName(), e);
            } finally {
                TestReplaceDatanodeFailureReplication.LOG.info(getName() + " terminated: i=" + i);
            }
        }

        /** Clears the running flag and interrupts the thread so a pending sleep ends. */
        void interruptRunning() {
            this.running = false;
            interrupt();
        }

        /**
         * Waits for the thread to finish and then closes the output stream through
         * {@code IOUtils.closeStream}.
         *
         * @throws InterruptedException if the calling thread is interrupted while joining
         */
        void joinAndClose() throws InterruptedException {
            TestReplaceDatanodeFailureReplication.LOG.info(getName() + " join and close");
            join();
            IOUtils.closeStream(this.out);
        }
    }

    /**
     * Runs the write-and-verify scenario with a minimum replication of 2 and 10 slow
     * writers, stopping the datanode at index 1 once.
     */
    @Test
    public void testLastDatanodeFailureInPipeline() throws Exception {
        final int minReplication = 2;
        final int stoppedDatanodeIndex = 1;
        final int writerCount = 10;
        final boolean stopSecondDatanode = false;
        testWriteFileAndVerifyAfterDNStop(
                minReplication, stoppedDatanodeIndex, writerCount, stopSecondDatanode);
    }

    /**
     * Runs the write-and-verify scenario with a minimum replication of 2 and 10 slow
     * writers, stopping the datanode at index 0 once.
     */
    @Test
    public void testFirstDatanodeFailureInPipeline() throws Exception {
        final int minReplication = 2;
        final int stoppedDatanodeIndex = 0;
        final int writerCount = 10;
        final boolean stopSecondDatanode = false;
        testWriteFileAndVerifyAfterDNStop(
                minReplication, stoppedDatanodeIndex, writerCount, stopSecondDatanode);
    }

    /**
     * Runs the write-and-verify scenario with a minimum replication of 1 and a single
     * slow writer, calling {@code stopDataNode(1)} twice.
     */
    @Test
    public void testWithOnlyFirstDatanodeIsAlive() throws Exception {
        final int minReplication = 1;
        final int stoppedDatanodeIndex = 1;
        final int writerCount = 1;
        final boolean stopSecondDatanode = true;
        testWriteFileAndVerifyAfterDNStop(
                minReplication, stoppedDatanodeIndex, writerCount, stopSecondDatanode);
    }

    /**
     * Runs the write-and-verify scenario with a minimum replication of 1 and a single
     * slow writer, calling {@code stopDataNode(0)} twice.
     */
    @Test
    public void testWithOnlyLastDatanodeIsAlive() throws Exception {
        final int minReplication = 1;
        final int stoppedDatanodeIndex = 0;
        final int writerCount = 1;
        final boolean stopSecondDatanode = true;
        testWriteFileAndVerifyAfterDNStop(
                minReplication, stoppedDatanodeIndex, writerCount, stopSecondDatanode);
    }

    /**
     * Expects the write to fail when datanodes are stopped on a cluster configured
     * with a replace-datanode-on-failure minimum replication of 2.
     *
     * <p>One slow writer is started, {@code stopDataNode(0)} is called twice, and after
     * a 20 second wait {@code getCurrentBlockReplication()} on the writer's stream must
     * throw an {@link IOException}. The file content written so far is then verified.
     */
    @Test
    public void testLessNumberOfLiveDatanodesThanWriteReplaceDatanodeOnFailureRF() throws Exception {
        final MiniDFSCluster cluster = setupCluster(2);
        try {
            final DistributedFileSystem fs = cluster.getFileSystem();
            final Path dir = new Path(DIR);

            final SlowWriter[] slowWriters = new SlowWriter[1];
            for (int i = 1; i <= slowWriters.length; i++) {
                // Each writer pauses i * 200 ms between writes.
                slowWriters[i - 1] = new SlowWriter(fs, new Path(dir, "file" + i), i * 200L);
            }
            for (SlowWriter writer : slowWriters) {
                writer.start();
            }

            // Give the writer time to write something before datanodes go away.
            sleepSeconds(1);

            // NOTE(review): index 0 is stopped twice; presumably the remaining datanodes are
            // re-indexed after the first stop so a second, different node goes down -- confirm
            // against MiniDFSCluster.stopDataNode.
            cluster.stopDataNode(0);
            cluster.stopDataNode(0);

            // Let the writer keep going long enough to hit the broken pipeline.
            sleepSeconds(20);

            for (SlowWriter writer : slowWriters) {
                try {
                    writer.out.getCurrentBlockReplication();
                    Assert.fail("Must throw exception as failed to add a new datanode for write pipeline, minimum failure replication");
                } catch (IOException expected) {
                    // Expected: this is the failure the test is looking for.
                }
                writer.interruptRunning();
            }
            for (SlowWriter writer : slowWriters) {
                writer.joinAndClose();
            }

            verifyFileContent(fs, slowWriters);
        } finally {
            // Single shutdown on every exit path.
            if (cluster != null) {
                cluster.shutdown();
            }
        }
    }

    /**
     * Builds a mini DFS cluster with {@code REPLICATION} datanodes, all on
     * {@code RACK0}, using the ALWAYS replace-datanode-on-failure policy.
     *
     * @param i value stored under
     *          {@code dfs.client.block.write.replace-datanode-on-failure.min-replication}
     * @return the built cluster
     * @throws IOException if the cluster cannot be built
     */
    private MiniDFSCluster setupCluster(int i) throws IOException {
        final int minReplication = i;

        // One rack entry per datanode, every entry naming the same rack.
        final String[] racks = new String[REPLICATION];
        Arrays.fill(racks, RACK0);

        final HdfsConfiguration conf = new HdfsConfiguration();
        conf.setInt("dfs.client.block.write.replace-datanode-on-failure.min-replication", minReplication);
        ReplaceDatanodeOnFailure.write(ReplaceDatanodeOnFailure.Policy.ALWAYS, false, conf);

        return new MiniDFSCluster.Builder(conf)
                .racks(racks)
                .numDataNodes(REPLICATION)
                .build();
    }

    /**
     * Starts slow writers, stops one or two datanodes while they are writing, and then
     * checks each writer's current block replication and the written file content.
     *
     * @param failRF       minimum replication passed to {@code setupCluster}; also the
     *                     value each writer's current block replication must equal
     * @param dnIndex      index passed to {@code stopDataNode}
     * @param slowWrites   number of slow writers to start
     * @param failPipeline when {@code true}, {@code stopDataNode(dnIndex)} is called a
     *                     second time
     * @throws IOException          on file system or cluster failures
     * @throws InterruptedException if a wait or join is interrupted
     * @throws TimeoutException     declared for {@code waitFirstBRCompleted}
     */
    private void testWriteFileAndVerifyAfterDNStop(int failRF, int dnIndex, int slowWrites, boolean failPipeline)
            throws IOException, InterruptedException, TimeoutException {
        final MiniDFSCluster cluster = setupCluster(failRF);
        try {
            final DistributedFileSystem fs = cluster.getFileSystem();
            final Path dir = new Path(DIR);

            final SlowWriter[] slowWriters = new SlowWriter[slowWrites];
            for (int i = 1; i <= slowWriters.length; i++) {
                // Writers run at different speeds: writer i pauses i * 200 ms between writes.
                slowWriters[i - 1] = new SlowWriter(fs, new Path(dir, "file" + i), i * 200L);
            }
            for (SlowWriter writer : slowWriters) {
                writer.start();
            }

            // Let the writers make some progress; this wait is unrelated to REPLICATION.
            sleepSeconds(3);

            cluster.stopDataNode(dnIndex);
            if (failPipeline) {
                // NOTE(review): same index again; presumably targets a different node once the
                // first one has been removed -- confirm against MiniDFSCluster.stopDataNode.
                cluster.stopDataNode(dnIndex);
            }

            // Give the writers time to continue after the datanode stop.
            sleepSeconds(5);
            // NOTE(review): presumably waits for the first block report with a 10000 ms
            // timeout -- confirm against MiniDFSCluster.waitFirstBRCompleted.
            cluster.waitFirstBRCompleted(0, 10000);

            for (SlowWriter writer : slowWriters) {
                Assert.assertEquals(failRF, writer.out.getCurrentBlockReplication());
                writer.interruptRunning();
            }
            for (SlowWriter writer : slowWriters) {
                writer.joinAndClose();
            }

            verifyFileContent(fs, slowWriters);
        } finally {
            // Single shutdown on every exit path.
            if (cluster != null) {
                cluster.shutdown();
            }
        }
    }

    /**
     * Reads back the file of every slow writer and asserts that its bytes are the
     * sequence 0, 1, 2, ... that {@code SlowWriter.run()} writes.
     *
     * @param fs          file system to read the files from
     * @param slowWriters writers whose files are verified; an empty array verifies nothing
     * @throws IOException if a file cannot be inspected, opened or read
     */
    private void verifyFileContent(DistributedFileSystem fs, SlowWriter[] slowWriters) throws IOException {
        LOG.info("Verify the file");
        for (SlowWriter writer : slowWriters) {
            LOG.info(writer.filepath + ": length=" + fs.getFileStatus(writer.filepath).getLen());
            FSDataInputStream in = null;
            try {
                in = fs.open(writer.filepath);
                int position = 0;
                for (int actual = in.read(); actual != -1; actual = in.read()) {
                    // write(int) stores only the low-order byte and read() returns 0-255,
                    // so compare against the counter's low byte.
                    Assert.assertEquals(position & 0xFF, actual);
                    position++;
                }
            } finally {
                IOUtils.closeStream(in);
            }
        }
    }

    /**
     * Logs the wait and sleeps for the given number of seconds.
     *
     * @param seconds number of seconds to sleep
     * @throws InterruptedException if the sleeping thread is interrupted
     */
    static void sleepSeconds(int seconds) throws InterruptedException {
        LOG.info("Wait " + seconds + " seconds");
        // Multiply as long so large second counts cannot overflow int arithmetic.
        Thread.sleep(seconds * 1000L);
    }
}
