/*
 * Decompiled with CFR 0.152.
 */
package org.apache.sqoop.job.mr;

import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.common.ErrorCode;
import org.apache.sqoop.common.ImmutableContext;
import org.apache.sqoop.common.PrefixContext;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.idf.IntermediateDataFormat;
import org.apache.sqoop.connector.matcher.Matcher;
import org.apache.sqoop.connector.matcher.MatcherFactory;
import org.apache.sqoop.error.code.MRExecutionError;
import org.apache.sqoop.etl.io.DataReader;
import org.apache.sqoop.job.etl.Loader;
import org.apache.sqoop.job.etl.LoaderContext;
import org.apache.sqoop.job.io.SqoopWritable;
import org.apache.sqoop.job.mr.MRConfigurationUtils;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.submission.counter.SqoopCounters;
import org.apache.sqoop.utils.ClassUtils;

public class SqoopOutputFormatLoadExecutor {
    public static final Logger LOG = Logger.getLogger(SqoopOutputFormatLoadExecutor.class);
    private volatile boolean readerFinished = false;
    private volatile boolean writerFinished = false;
    private volatile IntermediateDataFormat<? extends Object> toDataFormat;
    private Matcher matcher;
    private JobContext context;
    private SqoopRecordWriter writer;
    private Future<?> consumerFuture;
    private Semaphore filled = new Semaphore(0, true);
    private Semaphore free = new Semaphore(1, true);
    private String loaderName;

    SqoopOutputFormatLoadExecutor(JobContext jobctx, String loaderName, IntermediateDataFormat<?> toDataFormat, Matcher matcher) {
        this.context = jobctx;
        this.loaderName = loaderName;
        this.matcher = matcher;
        this.toDataFormat = toDataFormat;
        this.writer = new SqoopRecordWriter();
    }

    public SqoopOutputFormatLoadExecutor(JobContext jobctx) {
        this.context = jobctx;
        this.loaderName = this.context.getConfiguration().get("org.apache.sqoop.job.etl.loader");
        this.writer = new SqoopRecordWriter();
        this.matcher = MatcherFactory.getMatcher((Schema)MRConfigurationUtils.getConnectorSchema(Direction.FROM, this.context.getConfiguration()), (Schema)MRConfigurationUtils.getConnectorSchema(Direction.TO, this.context.getConfiguration()));
        this.toDataFormat = (IntermediateDataFormat)ClassUtils.instantiate((String)this.context.getConfiguration().get("org.apache.sqoop.execution.to.intermediate.format"), (Object[])new Object[0]);
        this.toDataFormat.setSchema(this.matcher.getToSchema());
    }

    public RecordWriter<SqoopWritable, NullWritable> getRecordWriter() {
        this.consumerFuture = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("OutputFormatLoader-consumer").build()).submit(new ConsumerThread(this.context));
        return this.writer;
    }

    private void checkIfConsumerThrew() {
        if (this.readerFinished) {
            this.waitForConsumer();
        }
    }

    private void waitForConsumer() {
        try {
            this.consumerFuture.get();
        }
        catch (ExecutionException ex) {
            Throwable t = ex.getCause();
            if (t instanceof SqoopException) {
                throw (SqoopException)t;
            }
            Throwables.propagate((Throwable)t);
        }
        catch (Exception ex) {
            throw new SqoopException((ErrorCode)MRExecutionError.MAPRED_EXEC_0019, (Throwable)ex);
        }
    }

    private class ConsumerThread
    implements Runnable {
        private final JobContext jobctx;

        public ConsumerThread(JobContext context) {
            this.jobctx = context;
        }

        @Override
        public void run() {
            LOG.info((Object)"SqoopOutputFormatLoadExecutor consumer thread is starting");
            try {
                SqoopOutputFormatDataReader reader = new SqoopOutputFormatDataReader();
                Configuration conf = SqoopOutputFormatLoadExecutor.this.context.getConfiguration();
                Loader loader = (Loader)ClassUtils.instantiate((String)SqoopOutputFormatLoadExecutor.this.loaderName, (Object[])new Object[0]);
                PrefixContext subContext = new PrefixContext(conf, "org.apache.sqoop.job.connector.to.context.");
                Object connectorLinkConfig = MRConfigurationUtils.getConnectorLinkConfig(Direction.TO, conf);
                Object connectorToJobConfig = MRConfigurationUtils.getConnectorJobConfig(Direction.TO, conf);
                LoaderContext loaderContext = new LoaderContext((ImmutableContext)subContext, (DataReader)reader, SqoopOutputFormatLoadExecutor.this.matcher.getToSchema());
                LOG.info((Object)("Running loader class " + SqoopOutputFormatLoadExecutor.this.loaderName));
                loader.load(loaderContext, connectorLinkConfig, connectorToJobConfig);
                LOG.info((Object)"Loader has finished");
                ((TaskAttemptContext)this.jobctx).getCounter((Enum)SqoopCounters.ROWS_WRITTEN).increment(loader.getRowsWritten());
            }
            catch (Throwable t) {
                SqoopOutputFormatLoadExecutor.this.readerFinished = true;
                LOG.error((Object)"Error while loading data out of MR job.", t);
                SqoopOutputFormatLoadExecutor.this.free.release();
                throw new SqoopException((ErrorCode)MRExecutionError.MAPRED_EXEC_0018, t);
            }
            if (!SqoopOutputFormatLoadExecutor.this.writerFinished) {
                SqoopOutputFormatLoadExecutor.this.readerFinished = true;
                LOG.error((Object)"Reader terminated, but writer is still running!");
                SqoopOutputFormatLoadExecutor.this.free.release();
                throw new SqoopException((ErrorCode)MRExecutionError.MAPRED_EXEC_0019);
            }
            SqoopOutputFormatLoadExecutor.this.readerFinished = true;
        }
    }

    private class SqoopOutputFormatDataReader
    extends DataReader {
        private SqoopOutputFormatDataReader() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public Object[] readArrayRecord() throws InterruptedException {
            this.acquireSema();
            if (SqoopOutputFormatLoadExecutor.this.writerFinished) {
                return null;
            }
            try {
                Object[] objectArray = SqoopOutputFormatLoadExecutor.this.toDataFormat.getObjectData();
                return objectArray;
            }
            finally {
                this.releaseSema();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public String readTextRecord() throws InterruptedException {
            this.acquireSema();
            if (SqoopOutputFormatLoadExecutor.this.writerFinished) {
                return null;
            }
            try {
                String string = SqoopOutputFormatLoadExecutor.this.toDataFormat.getCSVTextData();
                return string;
            }
            finally {
                this.releaseSema();
            }
        }

        public Object readContent() throws InterruptedException {
            this.acquireSema();
            if (SqoopOutputFormatLoadExecutor.this.writerFinished) {
                return null;
            }
            try {
                Object object = SqoopOutputFormatLoadExecutor.this.toDataFormat.getData();
                return object;
            }
            catch (Throwable t) {
                SqoopOutputFormatLoadExecutor.this.readerFinished = true;
                LOG.error((Object)"Caught exception e while getting content ", t);
                throw new SqoopException((ErrorCode)MRExecutionError.MAPRED_EXEC_0018, t);
            }
            finally {
                this.releaseSema();
            }
        }

        private void acquireSema() throws InterruptedException {
            try {
                SqoopOutputFormatLoadExecutor.this.filled.acquire();
            }
            catch (InterruptedException ex) {
                LOG.error((Object)"Interrupted while waiting for data to be available from mapper", (Throwable)ex);
                throw ex;
            }
        }

        private void releaseSema() {
            SqoopOutputFormatLoadExecutor.this.free.release();
        }
    }

    private class SqoopRecordWriter
    extends RecordWriter<SqoopWritable, NullWritable> {
        private SqoopRecordWriter() {
        }

        public void write(SqoopWritable key, NullWritable value) throws InterruptedException {
            SqoopOutputFormatLoadExecutor.this.free.acquire();
            SqoopOutputFormatLoadExecutor.this.checkIfConsumerThrew();
            SqoopOutputFormatLoadExecutor.this.toDataFormat.setCSVTextData(key.toString());
            SqoopOutputFormatLoadExecutor.this.filled.release();
        }

        public void close(TaskAttemptContext context) throws InterruptedException, IOException {
            LOG.info((Object)"SqoopOutputFormatLoadExecutor::SqoopRecordWriter is about to be closed");
            SqoopOutputFormatLoadExecutor.this.free.acquire();
            SqoopOutputFormatLoadExecutor.this.writerFinished = true;
            SqoopOutputFormatLoadExecutor.this.filled.release();
            SqoopOutputFormatLoadExecutor.this.waitForConsumer();
            LOG.info((Object)"SqoopOutputFormatLoadExecutor::SqoopRecordWriter is closed");
        }
    }
}

