/*
 * Decompiled with CFR 0.152.
 */
package com.mongodb.hadoop.output;

import com.mongodb.BasicDBObject;
import com.mongodb.BulkUpdateRequestBuilder;
import com.mongodb.BulkWriteOperation;
import com.mongodb.BulkWriteRequestBuilder;
import com.mongodb.DBCollection;
import com.mongodb.MongoException;
import com.mongodb.hadoop.io.BSONWritable;
import com.mongodb.hadoop.io.MongoUpdateWritable;
import com.mongodb.hadoop.util.CompatUtils;
import com.mongodb.hadoop.util.MongoConfigUtil;
import java.io.DataInput;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class MongoOutputCommitter
extends OutputCommitter {
    public static final String TEMP_DIR_NAME = "_MONGO_OUT_TEMP";
    private static final Log LOG = LogFactory.getLog(MongoOutputCommitter.class);
    private DBCollection collection;

    public void setupJob(JobContext jobContext) {
        LOG.info((Object)"Setting up job.");
    }

    public void setupTask(TaskAttemptContext taskContext) {
        LOG.info((Object)"Setting up task.");
    }

    public boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException {
        return this.needsTaskCommit(CompatUtils.getTaskAttemptContext(taskContext));
    }

    public void commitTask(TaskAttemptContext taskContext) throws IOException {
        this.commitTask(CompatUtils.getTaskAttemptContext(taskContext));
    }

    public void abortTask(TaskAttemptContext taskContext) throws IOException {
        this.abortTask(CompatUtils.getTaskAttemptContext(taskContext));
    }

    public boolean needsTaskCommit(CompatUtils.TaskAttemptContext taskContext) throws IOException {
        try {
            FileSystem fs = FileSystem.get((Configuration)taskContext.getConfiguration());
            return fs.exists(MongoOutputCommitter.getTaskAttemptPath(taskContext));
        }
        catch (IOException e) {
            LOG.error((Object)"Could not open filesystem", (Throwable)e);
            throw e;
        }
    }

    public void commitTask(CompatUtils.TaskAttemptContext taskContext) throws IOException {
        long fileLen;
        LOG.info((Object)"Committing task.");
        this.collection = MongoConfigUtil.getOutputCollection(taskContext.getConfiguration());
        Path tempFilePath = MongoOutputCommitter.getTaskAttemptPath(taskContext);
        LOG.info((Object)("Committing from temporary file: " + tempFilePath.toString()));
        long filePos = 0L;
        FSDataInputStream inputStream = null;
        try {
            FileSystem fs = FileSystem.get((Configuration)taskContext.getConfiguration());
            inputStream = fs.open(tempFilePath);
            fileLen = fs.getFileStatus(tempFilePath).getLen();
        }
        catch (IOException e) {
            LOG.error((Object)"Could not open temporary file for committing", (Throwable)e);
            this.cleanupAfterCommit(inputStream, taskContext);
            throw e;
        }
        int maxDocs = MongoConfigUtil.getBatchSize(taskContext.getConfiguration());
        int curBatchSize = 0;
        BulkWriteOperation bulkOp = MongoConfigUtil.isBulkOrdered(taskContext.getConfiguration()) ? this.collection.initializeOrderedBulkOperation() : this.collection.initializeUnorderedBulkOperation();
        BSONWritable bw = new BSONWritable();
        MongoUpdateWritable muw = new MongoUpdateWritable();
        while (filePos < fileLen) {
            try {
                int mwType = inputStream.readInt();
                if (0 == mwType) {
                    bw.readFields((DataInput)inputStream);
                    bulkOp.insert(new BasicDBObject(bw.getDoc().toMap()));
                } else if (1 == mwType) {
                    muw.readFields((DataInput)inputStream);
                    BasicDBObject query = new BasicDBObject(muw.getQuery().toMap());
                    BasicDBObject modifiers = new BasicDBObject(muw.getModifiers().toMap());
                    BulkWriteRequestBuilder writeBuilder = bulkOp.find(query);
                    if (muw.isReplace()) {
                        writeBuilder.replaceOne(modifiers);
                    } else if (muw.isUpsert()) {
                        BulkUpdateRequestBuilder updateBuilder = writeBuilder.upsert();
                        if (muw.isMultiUpdate()) {
                            updateBuilder.update(modifiers);
                        } else {
                            updateBuilder.updateOne(modifiers);
                        }
                    } else if (muw.isMultiUpdate()) {
                        writeBuilder.update(modifiers);
                    } else {
                        writeBuilder.updateOne(modifiers);
                    }
                } else {
                    throw new IOException("Unrecognized type: " + mwType);
                }
                filePos = inputStream.getPos();
                if (++curBatchSize < maxDocs && filePos < fileLen) continue;
                try {
                    bulkOp.execute();
                }
                catch (MongoException e) {
                    LOG.error((Object)"Could not write to MongoDB", (Throwable)e);
                    throw e;
                }
                bulkOp = this.collection.initializeOrderedBulkOperation();
                curBatchSize = 0;
                taskContext.progress();
            }
            catch (IOException e) {
                LOG.error((Object)"Error reading from temporary file", (Throwable)e);
                throw e;
            }
        }
        this.cleanupAfterCommit(inputStream, taskContext);
    }

    public void abortTask(CompatUtils.TaskAttemptContext taskContext) throws IOException {
        LOG.info((Object)"Aborting task.");
        this.cleanupResources(taskContext);
    }

    private void cleanupAfterCommit(FSDataInputStream inputStream, CompatUtils.TaskAttemptContext context) throws IOException {
        if (inputStream != null) {
            try {
                inputStream.close();
            }
            catch (IOException e) {
                LOG.error((Object)"Could not close input stream", (Throwable)e);
                throw e;
            }
        }
        this.cleanupResources(context);
    }

    private void cleanupResources(CompatUtils.TaskAttemptContext taskContext) throws IOException {
        Path currentPath = MongoOutputCommitter.getTaskAttemptPath(taskContext);
        Path tempDirectory = MongoOutputCommitter.getTempDirectory(taskContext.getConfiguration());
        FileSystem fs = FileSystem.get((Configuration)taskContext.getConfiguration());
        while (!currentPath.equals((Object)tempDirectory)) {
            try {
                fs.delete(currentPath, true);
            }
            catch (IOException e) {
                LOG.error((Object)("Could not delete temporary file: " + currentPath), (Throwable)e);
                throw e;
            }
            currentPath = currentPath.getParent();
        }
        if (this.collection != null) {
            MongoConfigUtil.close(this.collection.getDB().getMongo());
        }
    }

    private static Path getTempDirectory(Configuration config) {
        String basePath = config.get("mapreduce.task.tmp.dir", config.get("mapred.child.tmp", config.get("hadoop.tmp.dir", "/tmp")));
        return new Path(basePath);
    }

    public static Path getTaskAttemptPath(CompatUtils.TaskAttemptContext context) {
        Configuration config = context.getConfiguration();
        return new Path(String.format("%s/%s/%s/_out", MongoOutputCommitter.getTempDirectory(config), context.getTaskAttemptID().toString(), TEMP_DIR_NAME));
    }
}

