package org.apache.tez.runtime.library.common.shuffle.impl;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import javax.crypto.SecretKey;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.security.token.Token;
import org.apache.tez.common.TezExecutors;
import org.apache.tez.common.TezSharedExecutor;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.ExecutionContext;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.shuffle.FetchResult;
import org.apache.tez.runtime.library.common.shuffle.FetchedInput;
import org.apache.tez.runtime.library.common.shuffle.FetchedInputAllocator;
import org.apache.tez.runtime.library.common.shuffle.FetchedInputCallback;
import org.apache.tez.runtime.library.common.shuffle.Fetcher;
import org.apache.tez.runtime.library.common.shuffle.InputHost;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/* loaded from: input_file:org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.class */
public class TestShuffleManager {
    /** Host name constant for the fetcher side of these tests. */
    private static final String FETCHER_HOST = "localhost";
    /** Port written into the service-provider metadata and into every data movement event payload. */
    private static final int PORT = 8080;
    /** Path component set on every data movement event payload built by this class. */
    private static final String PATH_COMPONENT = "attempttmp";
    /** Configuration handed to the shared executor and to every ShuffleManager under test; helpers and tests mutate it. */
    private final Configuration conf = new Configuration();
    /** Executor factory created in {@link #setup()} and shut down in {@link #cleanup()}. */
    private TezExecutors sharedExecutor;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager$ShuffleManagerForTest.class */
    /**
     * {@link ShuffleManager} subclass used by the tests in this file.
     *
     * <p>Every fetcher it constructs is a Mockito spy whose {@code callInternal()} is stubbed to
     * report each of the fetcher's source attempts as successfully fetched, handing the manager a
     * {@link TestFetchedInput} for each one. It also exposes two pieces of internal state
     * (completed-input count and fetcher-executor shutdown flag) for assertions.
     */
    public static class ShuffleManagerForTest extends ShuffleManager {
        /**
         * Forwards all arguments, unchanged and in the same order, to the {@link ShuffleManager}
         * constructor.
         *
         * @throws IOException if the superclass constructor throws it
         */
        public ShuffleManagerForTest(InputContext inputContext, Configuration configuration, int i, int i2, boolean z, int i3, CompressionCodec compressionCodec, FetchedInputAllocator fetchedInputAllocator) throws IOException {
            super(inputContext, configuration, i, i2, z, i3, compressionCodec, fetchedInputAllocator);
        }

        /**
         * Builds the real fetcher via the superclass, wraps it in a spy, and stubs
         * {@code callInternal()} so that it calls {@code fetchSucceeded} for every source attempt
         * and returns a mocked {@link FetchResult}.
         *
         * @return the stubbed spy
         */
        Fetcher constructFetcherForHost(InputHost inputHost, Configuration configuration) {
            final Fetcher fetcher = (Fetcher) Mockito.spy(super.constructFetcherForHost(inputHost, configuration));
            final FetchResult fetchResult = (FetchResult) Mockito.mock(FetchResult.class);
            try {
                ((Fetcher) Mockito.doAnswer(new Answer<FetchResult>() {
                    // Implements Answer#answer. The method must carry exactly this name; under any
                    // other name the anonymous class does not implement the interface.
                    public FetchResult answer(InvocationOnMock invocationOnMock) throws Throwable {
                        for (InputAttemptIdentifier inputAttemptIdentifier : fetcher.getSrcAttempts()) {
                            ShuffleManagerForTest.this.fetchSucceeded(fetcher.getHost(), inputAttemptIdentifier, new TestFetchedInput(inputAttemptIdentifier), 0L, 0L, 0L);
                        }
                        return fetchResult;
                    }
                }).when(fetcher)).callInternal();
            } catch (Exception ignored) {
                // Deliberately ignored: callInternal() is invoked here only to register the stub on
                // the spy. The try/catch presumably exists because callInternal() declares a
                // checked exception.
            }
            return fetcher;
        }

        /** Returns the cardinality of {@code completedInputSet}. */
        public int getNumOfCompletedInputs() {
            return this.completedInputSet.cardinality();
        }

        /** Returns whether {@code fetcherExecutor} reports that it has been shut down. */
        boolean isFetcherExecutorShutdown() {
            return this.fetcherExecutor.isShutdown();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Inert {@link FetchedInput} used as a placeholder for successfully "fetched" data.
     *
     * <p>It reports type {@code MEMORY} and size {@code -1}, exposes no streams, and treats every
     * lifecycle call as a no-op.
     */
    @VisibleForTesting
    public static class TestFetchedInput extends FetchedInput {
        /** Creates a placeholder for the given attempt, passing a null callback to the superclass. */
        public TestFetchedInput(InputAttemptIdentifier inputAttemptIdentifier) {
            super(inputAttemptIdentifier, (FetchedInputCallback) null);
        }

        // ---- descriptive accessors -------------------------------------------------------------

        /** Always {@link FetchedInput.Type#MEMORY}. */
        public FetchedInput.Type getType() {
            return FetchedInput.Type.MEMORY;
        }

        /** Always {@code -1}. */
        public long getSize() {
            return -1L;
        }

        // ---- stream accessors: there is no backing data ----------------------------------------

        /** Always {@code null}. */
        public InputStream getInputStream() throws IOException {
            return null;
        }

        /** Always {@code null}. */
        public OutputStream getOutputStream() throws IOException {
            return null;
        }

        // ---- lifecycle: nothing to commit, abort, or release -----------------------------------

        /** No-op. */
        public void commit() throws IOException {
        }

        /** No-op. */
        public void abort() throws IOException {
        }

        /** No-op. */
        public void free() {
        }
    }

    /** Creates a fresh shared executor, built from {@code conf}, before each test. */
    @Before
    public void setup() {
        sharedExecutor = new TezSharedExecutor(conf);
    }

    /** Shuts the shared executor down immediately after each test. */
    @After
    public void cleanup() {
        sharedExecutor.shutdownNow();
    }

    /**
     * Sends data movement events for three hosts, each in two batches (2 events, then 3 events
     * after a 500 ms pause), and checks that the manager eventually reports all 15 inputs as
     * completed and its fetcher executor as shut down.
     *
     * <p>For each host the first index passed to {@code createDataMovementEvent} restarts at 20,
     * while the second index keeps counting across hosts from 0 to 14.
     */
    @Test(timeout = 50000)
    public void testMultiplePartitions() throws Exception {
        final int hostCount = 3;
        final int firstBatchSize = 2;
        final int secondBatchSize = 3;
        final int expectedInputs = hostCount * (firstBatchSize + secondBatchSize);

        InputContext inputContext = createInputContext();
        ShuffleManagerForTest shuffleManager = createShuffleManager(inputContext, expectedInputs);
        ShuffleInputEventHandlerImpl handler = new ShuffleInputEventHandlerImpl(inputContext, shuffleManager, (FetchedInputAllocator) Mockito.mock(FetchedInputAllocator.class), (CompressionCodec) null, false, 0, false);
        shuffleManager.run();

        List<Event> events = new LinkedList<Event>();
        int runningIndex = 0; // second index argument; never reset between hosts
        for (int host = 0; host < hostCount; host++) {
            String hostName = "host" + host;
            int perHostIndex = 20; // first index argument; restarts for every host

            // First batch for this host.
            events.clear();
            for (int n = 0; n < firstBatchSize; n++) {
                events.add(createDataMovementEvent(hostName, perHostIndex++, runningIndex++));
            }
            handler.handleEvents(events);

            // Pause so the second batch arrives separately from the first.
            Thread.sleep(500L);

            // Second batch for the same host.
            events.clear();
            for (int n = 0; n < secondBatchSize; n++) {
                events.add(createDataMovementEvent(hostName, perHostIndex++, runningIndex++));
            }
            handler.handleEvents(events);
        }

        // Poll for up to 100 * 100 ms until the manager is done.
        int pollsLeft = 100;
        while (pollsLeft-- > 0
                && !(shuffleManager.isFetcherExecutorShutdown()
                        && expectedInputs == shuffleManager.getNumOfCompletedInputs())) {
            Thread.sleep(100L);
        }
        Assert.assertTrue(shuffleManager.isFetcherExecutorShutdown());
        Assert.assertEquals(expectedInputs, shuffleManager.getNumOfCompletedInputs());
    }

    /**
     * Builds a mocked {@link InputContext} for the tests.
     *
     * <p>The mock returns fresh counters, the source vertex name {@code "sourceVertex"}, an
     * execution context whose host name is {@link #FETCHER_HOST}, and service-provider metadata
     * containing {@link #PORT} serialized as an int. Calls to
     * {@code createTezFrameworkExecutorService} are delegated to {@code sharedExecutor}.
     *
     * @return the configured mock
     * @throws IOException if writing the port into the in-memory buffer fails
     */
    private InputContext createInputContext() throws IOException {
        DataOutputBuffer portBuffer = new DataOutputBuffer();
        portBuffer.writeInt(PORT);
        // Wrap only the bytes actually written, not the whole backing array.
        ByteBuffer shuffleMetaData = ByteBuffer.wrap(portBuffer.getData(), 0, portBuffer.getLength());
        portBuffer.close();

        ExecutionContext executionContext = (ExecutionContext) Mockito.mock(ExecutionContext.class);
        ((ExecutionContext) Mockito.doReturn(FETCHER_HOST).when(executionContext)).getHostName();

        InputContext inputContext = (InputContext) Mockito.mock(InputContext.class);
        ((InputContext) Mockito.doReturn(new TezCounters()).when(inputContext)).getCounters();
        ((InputContext) Mockito.doReturn("sourceVertex").when(inputContext)).getSourceVertexName();
        ((InputContext) Mockito.doReturn(shuffleMetaData).when(inputContext)).getServiceProviderMetaData(this.conf.get("tez.am.shuffle.auxiliary-service.id", "mapreduce_shuffle"));
        ((InputContext) Mockito.doReturn(executionContext).when(inputContext)).getExecutionContext();
        Mockito.when(inputContext.createTezFrameworkExecutorService(Matchers.anyInt(), Matchers.anyString())).thenAnswer(new Answer<ExecutorService>() {
            // Implements Answer#answer. The method must carry exactly this name; under any other
            // name the anonymous class does not implement the interface.
            public ExecutorService answer(InvocationOnMock invocationOnMock) throws Throwable {
                // Pass both stubbed-call arguments straight through to the shared executor.
                return TestShuffleManager.this.sharedExecutor.createExecutorService(((Integer) invocationOnMock.getArgumentAt(0, Integer.class)).intValue(), (String) invocationOnMock.getArgumentAt(1, String.class));
            }
        });
        return inputContext;
    }

    /**
     * Checks when the manager asks the input context for a framework executor service: never
     * with the configuration as first created, exactly once after
     * {@code tez.runtime.shuffle.fetcher.use-shared-pool} has been set to {@code true}.
     */
    @Test(timeout = 5000)
    public void testUseSharedExecutor() throws Exception {
        // Flag not set: constructing the manager must not request an executor from the context.
        InputContext contextWithoutFlag = createInputContext();
        createShuffleManager(contextWithoutFlag, 2);
        InputContext verifiedWithoutFlag = (InputContext) Mockito.verify(contextWithoutFlag, Mockito.times(0));
        verifiedWithoutFlag.createTezFrameworkExecutorService(Matchers.anyInt(), Matchers.anyString());

        // Flag set: constructing the manager must request an executor exactly once.
        InputContext contextWithFlag = createInputContext();
        this.conf.setBoolean("tez.runtime.shuffle.fetcher.use-shared-pool", true);
        createShuffleManager(contextWithFlag, 2);
        InputContext verifiedWithFlag = (InputContext) Mockito.verify(contextWithFlag);
        verifiedWithFlag.createTezFrameworkExecutorService(Matchers.anyInt(), Matchers.anyString());
    }

    /**
     * Runs a manager that receives no events on a background thread for four seconds, then
     * checks that {@code notifyProgress()} was called on the input context at least three times.
     */
    @Test(timeout = 20000)
    public void testProgressWithEmptyPendingHosts() throws Exception {
        final long idleMillis = 4000L;
        InputContext inputContext = createInputContext();
        final ShuffleManager shuffleManager = (ShuffleManager) Mockito.spy(createShuffleManager(inputContext, 1));

        Runnable runManager = new Runnable() {
            @Override
            public void run() {
                try {
                    shuffleManager.run();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
        Thread managerThread = new Thread(runManager);
        managerThread.start();

        // Let the manager idle with no pending hosts, then stop it before verifying.
        Thread.sleep(idleMillis);
        managerThread.interrupt();

        InputContext verified = (InputContext) Mockito.verify(inputContext, Mockito.atLeast(3));
        verified.notifyProgress();
    }

    /**
     * Verifies how fetch failures are reported through {@code InputContext.sendEvents}.
     *
     * <p>Sequence checked:
     * <ol>
     *   <li>One {@code fetchFailed} call: within a second, exactly one {@code sendEvents} call
     *       carrying a single {@link InputReadErrorEvent} with {@code numFailures == 1}.</li>
     *   <li>Two further {@code fetchFailed} calls for the same attempt: one second later there is
     *       still only one {@code sendEvents} call.</li>
     *   <li>Five seconds after that: a second {@code sendEvents} call carrying a single event
     *       with {@code numFailures == 2}.</li>
     * </ol>
     * The delay in step 3 presumably corresponds to the 5000 ms
     * {@code tez.runtime.shuffle.batch.wait} that {@code createShuffleManager} sets. TODO confirm.
     */
    @Test(timeout = 200000)
    public void testFetchFailed() throws Exception {
        InputContext inputContext = createInputContext();
        final ShuffleManager shuffleManager = (ShuffleManager) Mockito.spy(createShuffleManager(inputContext, 1));
        Thread managerThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    shuffleManager.run();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        InputAttemptIdentifier failedAttempt = new InputAttemptIdentifier(1, 1);
        managerThread.start();
        try {
            Thread.sleep(1000L);
            shuffleManager.fetchFailed("host1", failedAttempt, false);
            Thread.sleep(1000L);

            // Step 1: the first failure is reported on its own.
            ArgumentCaptor<List> firstCaptor = ArgumentCaptor.forClass(List.class);
            ((InputContext) Mockito.verify(inputContext, Mockito.times(1))).sendEvents((List) firstCaptor.capture());
            int firstCallCount = firstCaptor.getAllValues().size();
            Assert.assertEquals("Size was: " + firstCallCount, 1, firstCallCount);
            List<?> firstEvents = firstCaptor.getAllValues().get(0);
            Assert.assertEquals("Size was: " + firstEvents.size(), 1, firstEvents.size());
            InputReadErrorEvent firstError = (InputReadErrorEvent) firstEvents.get(0);
            Assert.assertEquals("Number of failures was: " + firstError.getNumFailures(), 1, firstError.getNumFailures());

            // Step 2: two more failures do not produce an event right away.
            shuffleManager.fetchFailed("host1", failedAttempt, false);
            shuffleManager.fetchFailed("host1", failedAttempt, false);
            Thread.sleep(1000L);
            ((InputContext) Mockito.verify(inputContext, Mockito.times(1))).sendEvents((List) Matchers.any());

            // Step 3: after a further wait both failures arrive combined in one event.
            Thread.sleep(5000L);
            ArgumentCaptor<List> secondCaptor = ArgumentCaptor.forClass(List.class);
            ((InputContext) Mockito.verify(inputContext, Mockito.times(2))).sendEvents((List) secondCaptor.capture());
            int totalCallCount = secondCaptor.getAllValues().size();
            Assert.assertEquals("Size was: " + totalCallCount, 2, totalCallCount);
            List<?> secondEvents = secondCaptor.getAllValues().get(1);
            Assert.assertEquals("Size was: " + secondEvents.size(), 1, secondEvents.size());
            InputReadErrorEvent secondError = (InputReadErrorEvent) secondEvents.get(0);
            Assert.assertEquals("Number of failures was: " + secondError.getNumFailures(), 2, secondError.getNumFailures());
        } finally {
            // Stop the manager thread even when an assertion above fails.
            managerThread.interrupt();
        }
    }

    /**
     * Prepares {@code inputContext} and {@code conf} and constructs a
     * {@link ShuffleManagerForTest}.
     *
     * <p>Side effects: stubs {@code getWorkDirs()} and {@code getServiceConsumerMetaData(...)} on
     * the given mock, and sets {@code tez.runtime.framework.local.dirs} and
     * {@code tez.runtime.shuffle.batch.wait} (5000) on the shared configuration.
     *
     * @param inputContext mocked context to stub and pass to the manager
     * @param i forwarded as the third constructor argument; the tests in this file use it as the
     *          number of inputs the manager should expect
     * @return the new manager
     * @throws IOException if serializing the token or constructing the manager fails
     */
    private ShuffleManagerForTest createShuffleManager(InputContext inputContext, int i) throws IOException {
        String[] workDirs = {new Path(".", "outDir").toString()};
        ((InputContext) Mockito.doReturn(workDirs).when(inputContext)).getWorkDirs();
        this.conf.setStrings("tez.runtime.framework.local.dirs", inputContext.getWorkDirs());
        this.conf.setInt("tez.runtime.shuffle.batch.wait", 5000);

        DataOutputBuffer tokenBuffer = new DataOutputBuffer();
        Token<JobTokenIdentifier> jobToken = new Token<JobTokenIdentifier>(new JobTokenIdentifier(), new JobTokenSecretManager((SecretKey) null));
        jobToken.write(tokenBuffer);
        // Expose only the bytes actually written; getData() returns the whole backing array,
        // which may be longer than getLength().
        ByteBuffer tokenBytes = ByteBuffer.wrap(tokenBuffer.getData(), 0, tokenBuffer.getLength());
        ((InputContext) Mockito.doReturn(tokenBytes).when(inputContext)).getServiceConsumerMetaData(this.conf.get("tez.am.shuffle.auxiliary-service.id", "mapreduce_shuffle"));

        return new ShuffleManagerForTest(inputContext, this.conf, i, 1024, false, -1, null, (FetchedInputAllocator) Mockito.mock(FetchedInputAllocator.class));
    }

    /**
     * Creates a data movement event whose payload names the given host together with the fixed
     * {@link #PORT} and {@link #PATH_COMPONENT}.
     *
     * @param str host name stored in the payload
     * @param i first index passed to {@code DataMovementEvent.create}
     * @param i2 second index passed to {@code DataMovementEvent.create}
     * @return the event; the third argument to {@code create} is always 0
     */
    private Event createDataMovementEvent(String str, int i, int i2) {
        ByteBuffer payload = ShuffleUserPayloads.DataMovementEventPayloadProto.newBuilder()
                .setHost(str)
                .setPort(PORT)
                .setPathComponent(PATH_COMPONENT)
                .build()
                .toByteString()
                .asReadOnlyByteBuffer();
        return DataMovementEvent.create(i, i2, 0, payload);
    }
}
