package com.marklogic.contentpump;

import com.marklogic.contentpump.utilities.ReflectionUtil;
import com.marklogic.mapreduce.utilities.InternalUtilities;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.StatusReporter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.ReflectionUtils;

/* loaded from: input_file:com/marklogic/contentpump/MultithreadedMapper.class */
public class MultithreadedMapper<K1, V1, K2, V2> extends Mapper<K1, V1, K2, V2> {
    private static final Log LOG = LogFactory.getLog(MultithreadedMapper.class);
    private Class<? extends BaseMapper<K1, V1, K2, V2>> mapClass;
    private Mapper<K1, V1, K2, V2>.Context outer;
    private List<MultithreadedMapper<K1, V1, K2, V2>.MapRunner> runnerList = new ArrayList();
    private List<Future<?>> runnerFutureList = new ArrayList();
    private int threadCount = 0;
    private ThreadPoolExecutor threadPool;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:com/marklogic/contentpump/MultithreadedMapper$MapRunner.class */
    public class MapRunner extends Thread {
        private BaseMapper<K1, V1, K2, V2> mapper;
        private Mapper<K1, V1, K2, V2>.Context subcontext;
        private Throwable throwable;
        private RecordWriter<K2, V2> writer;
        AtomicBoolean shutdown = new AtomicBoolean(false);
        AtomicBoolean isShutdownDone = new AtomicBoolean(false);

        MapRunner() throws IOException, ClassNotFoundException {
            this.mapper = (BaseMapper) ReflectionUtils.newInstance(MultithreadedMapper.this.mapClass, MultithreadedMapper.this.outer.getConfiguration());
            try {
                this.writer = ((OutputFormat) ReflectionUtils.newInstance(MultithreadedMapper.this.outer.getOutputFormatClass(), MultithreadedMapper.this.outer.getConfiguration())).getRecordWriter(MultithreadedMapper.this.outer);
                this.subcontext = ReflectionUtil.createMapperContext(this.mapper, MultithreadedMapper.this.outer.getConfiguration(), MultithreadedMapper.this.outer.getTaskAttemptID(), new SubMapRecordReader(), this.writer, MultithreadedMapper.this.outer.getOutputCommitter(), new SubMapStatusReporter(), MultithreadedMapper.this.outer.getInputSplit());
            } catch (Exception e) {
                throw new IOException("Error creating mapper context", e);
            }
        }

        public BaseMapper<K1, V1, K2, V2> getMapper() {
            return this.mapper;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                this.mapper.runThreadSafe(MultithreadedMapper.this.outer, this.subcontext, this);
            } catch (Throwable th) {
                if (MultithreadedMapper.LOG.isDebugEnabled()) {
                    MultithreadedMapper.LOG.debug("Error running task:" + th);
                    th.printStackTrace();
                }
            }
            try {
                this.writer.close(this.subcontext);
            } catch (Throwable th2) {
                MultithreadedMapper.LOG.error("Error closing writer: " + th2.getMessage());
                if (MultithreadedMapper.LOG.isDebugEnabled()) {
                    MultithreadedMapper.LOG.debug(th2);
                }
            }
            this.isShutdownDone.set(true);
        }

        public void setShutdown(boolean z) {
            this.shutdown.set(z);
        }

        public boolean getShutdown() {
            return this.shutdown.get();
        }

        public boolean getIsShutdownDone() {
            return this.isShutdownDone.get();
        }
    }

    /* loaded from: input_file:com/marklogic/contentpump/MultithreadedMapper$SubMapRecordReader.class */
    private class SubMapRecordReader extends RecordReader<K1, V1> {
        private K1 key;
        private V1 value;

        private SubMapRecordReader() {
        }

        public void close() throws IOException {
        }

        public float getProgress() throws IOException, InterruptedException {
            return 0.0f;
        }

        public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        }

        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (!MultithreadedMapper.this.outer.nextKeyValue()) {
                return false;
            }
            if (MultithreadedMapper.this.outer.getCurrentKey() == null) {
                return true;
            }
            this.key = (K1) ReflectionUtils.newInstance(MultithreadedMapper.this.outer.getCurrentKey().getClass(), MultithreadedMapper.this.outer.getConfiguration());
            this.key = (K1) ReflectionUtils.copy(MultithreadedMapper.this.outer.getConfiguration(), MultithreadedMapper.this.outer.getCurrentKey(), this.key);
            Object currentValue = MultithreadedMapper.this.outer.getCurrentValue();
            if (currentValue == null) {
                return true;
            }
            this.value = (V1) ReflectionUtils.newInstance(currentValue.getClass(), MultithreadedMapper.this.outer.getConfiguration());
            this.value = (V1) ReflectionUtils.copy(MultithreadedMapper.this.outer.getConfiguration(), MultithreadedMapper.this.outer.getCurrentValue(), this.value);
            return true;
        }

        public K1 getCurrentKey() {
            return this.key;
        }

        public V1 getCurrentValue() {
            return this.value;
        }
    }

    /* loaded from: input_file:com/marklogic/contentpump/MultithreadedMapper$SubMapStatusReporter.class */
    private class SubMapStatusReporter extends StatusReporter {
        private SubMapStatusReporter() {
        }

        public Counter getCounter(Enum<?> r4) {
            return MultithreadedMapper.this.outer.getCounter(r4);
        }

        public Counter getCounter(String str, String str2) {
            return MultithreadedMapper.this.outer.getCounter(str, str2);
        }

        public void progress() {
            MultithreadedMapper.this.outer.progress();
        }

        public void setStatus(String str) {
            MultithreadedMapper.this.outer.setStatus(str);
        }

        public float getProgress() {
            try {
                Method method = MultithreadedMapper.this.outer.getClass().getMethod("getProgress", new Class[0]);
                if (method != null) {
                    return ((Float) method.invoke(MultithreadedMapper.this.outer, new Object[0])).floatValue();
                }
                return 0.0f;
            } catch (Exception e) {
                MultithreadedMapper.LOG.error("Error getting progress", e);
                return 0.0f;
            }
        }
    }

    public int getThreadCount(Mapper<K1, V1, K2, V2>.Context context) {
        return this.threadCount > 0 ? this.threadCount : getNumberOfThreads(context);
    }

    public void setThreadCount(int i) {
        this.threadCount = i;
    }

    public void setThreadPool(ThreadPoolExecutor threadPoolExecutor) {
        this.threadPool = threadPoolExecutor;
    }

    public static int getNumberOfThreads(JobContext jobContext) {
        return jobContext.getConfiguration().getInt(ConfigConstants.CONF_THREADS_PER_SPLIT, 4);
    }

    public static void setNumberOfThreads(Job job, int i) {
        job.getConfiguration().setInt(ConfigConstants.CONF_THREADS_PER_SPLIT, i);
    }

    public static void setNumberOfThreads(Configuration configuration, int i) {
        configuration.setInt(ConfigConstants.CONF_THREADS_PER_SPLIT, i);
    }

    public static <K1, V1, K2, V2> Class<BaseMapper<K1, V1, K2, V2>> getMapperClass(JobContext jobContext) {
        return jobContext.getConfiguration().getClass(ConfigConstants.CONF_MULTITHREADEDMAPPER_CLASS, BaseMapper.class);
    }

    public static <K1, V1, K2, V2> void setMapperClass(Configuration configuration, Class<? extends BaseMapper<?, ?, ?, ?>> cls) {
        if (MultithreadedMapper.class.isAssignableFrom(cls)) {
            throw new IllegalArgumentException("Can't have recursive MultithreadedMapper instances.");
        }
        configuration.setClass(ConfigConstants.CONF_MULTITHREADEDMAPPER_CLASS, cls, Mapper.class);
    }

    public void createRunners(int i) throws IOException, InterruptedException, ClassNotFoundException {
        synchronized (this.threadPool) {
            for (int i2 = 0; i2 < i; i2++) {
                MultithreadedMapper<K1, V1, K2, V2>.MapRunner mapRunner = new MapRunner();
                synchronized (this.runnerFutureList) {
                    this.runnerList.add(mapRunner);
                    if (this.threadPool.isShutdown()) {
                        throw new InterruptedException("Thread Pool has been shut down");
                    }
                    this.runnerFutureList.add(this.threadPool.submit(mapRunner));
                }
            }
            this.threadPool.notify();
        }
    }

    public void stopRunners(int i) {
        for (int i2 = 0; i2 < i; i2++) {
            this.runnerList.get((this.runnerList.size() - i2) - 1).setShutdown(true);
        }
        while (true) {
            boolean z = true;
            synchronized (this.runnerList) {
                for (int i3 = 0; i3 < i; i3++) {
                    z &= this.runnerList.get((this.runnerList.size() - i3) - 1).getIsShutdownDone();
                }
            }
            if (z) {
                break;
            } else {
                try {
                    InternalUtilities.sleep(500L);
                } catch (Exception e) {
                }
            }
        }
        synchronized (this.runnerFutureList) {
            for (int i4 = 0; i4 < i; i4++) {
                this.runnerList.remove(this.runnerList.size() - 1);
                this.runnerFutureList.remove(this.runnerFutureList.size() - 1);
            }
        }
    }

    public void run(Mapper<K1, V1, K2, V2>.Context context) throws IOException, InterruptedException {
        boolean z;
        this.outer = context;
        int threadCount = getThreadCount(context);
        this.mapClass = getMapperClass(context);
        int i = threadCount - 1;
        try {
            if (this.threadPool != null) {
                createRunners(i);
                new MapRunner().run();
                do {
                    z = true;
                    synchronized (this.runnerFutureList) {
                        Iterator<Future<?>> it = this.runnerFutureList.iterator();
                        while (it.hasNext()) {
                            z &= it.next().isDone();
                        }
                    }
                } while (!z);
                synchronized (this.runnerFutureList) {
                    Iterator<Future<?>> it2 = this.runnerFutureList.iterator();
                    while (it2.hasNext()) {
                        it2.next().get();
                    }
                }
            } else {
                for (int i2 = 0; i2 < i; i2++) {
                    MultithreadedMapper<K1, V1, K2, V2>.MapRunner mapRunner = new MapRunner();
                    mapRunner.start();
                    this.runnerList.add(i2, mapRunner);
                }
                new MapRunner().run();
                for (int i3 = 0; i3 < i; i3++) {
                    MultithreadedMapper<K1, V1, K2, V2>.MapRunner mapRunner2 = this.runnerList.get(i3);
                    mapRunner2.join();
                    Throwable th = ((MapRunner) mapRunner2).throwable;
                    if (th != null) {
                        if (th instanceof IOException) {
                            throw ((IOException) th);
                        }
                        if (!(th instanceof InterruptedException)) {
                            throw new RuntimeException(th);
                        }
                        throw ((InterruptedException) th);
                    }
                }
            }
        } catch (ClassNotFoundException e) {
            LOG.error("MapRunner class not found", e);
        } catch (ExecutionException e2) {
            LOG.error("Error waiting for MapRunner threads to complete", e2);
        }
    }
}
