package com.cloudera.sqoop.mapreduce;

import com.cloudera.sqoop.io.NamedFifo;
import com.cloudera.sqoop.manager.MySQLUtils;
import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
import com.cloudera.sqoop.util.AsyncSink;
import com.cloudera.sqoop.util.JdbcUrl;
import com.cloudera.sqoop.util.LoggingAsyncSink;
import com.cloudera.sqoop.util.NullAsyncSink;
import com.cloudera.sqoop.util.TaskId;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Mapper;

/* loaded from: input_file:com/cloudera/sqoop/mapreduce/MySQLExportMapper.class */
public class MySQLExportMapper<KEYIN, VALIN> extends Mapper<KEYIN, VALIN, NullWritable, NullWritable> {
    public static final Log LOG = LogFactory.getLog(MySQLExportMapper.class.getName());
    public static final String MYSQL_CHECKPOINT_BYTES_KEY = "sqoop.mysql.export.checkpoint.bytes";
    public static final long DEFAULT_CHECKPOINT_BYTES = 33554432;
    protected long checkpointDistInBytes;
    protected Configuration conf;
    protected File fifoFile;
    protected Process mysqlImportProcess;
    protected OutputStream importStream;
    protected AsyncSink outSink;
    protected AsyncSink errSink;
    protected File passwordFile;
    protected String mysqlCharSet;
    private long bytesWritten;

    private void initMySQLImportProcess() throws IOException {
        this.fifoFile = new File(TaskId.getLocalWorkPath(this.conf), this.conf.get(MySQLUtils.TABLE_NAME_KEY, "UNKNOWN_TABLE") + ".txt");
        String file = this.fifoFile.toString();
        try {
            new NamedFifo(this.fifoFile).create();
            ArrayList arrayList = new ArrayList();
            String str = this.conf.get(MySQLUtils.CONNECT_STRING_KEY);
            String databaseName = JdbcUrl.getDatabaseName(str);
            String hostName = JdbcUrl.getHostName(str);
            int port = JdbcUrl.getPort(str);
            if (null == databaseName) {
                throw new IOException("Could not determine database name");
            }
            arrayList.add(MySQLUtils.MYSQL_IMPORT_CMD);
            String str2 = this.conf.get(MySQLUtils.PASSWORD_KEY);
            if (null != str2 && str2.length() > 0) {
                this.passwordFile = new File(MySQLUtils.writePasswordFile(this.conf));
                arrayList.add("--defaults-file=" + this.passwordFile);
            }
            String str3 = this.conf.get(MySQLUtils.USERNAME_KEY);
            if (null != str3) {
                arrayList.add("--user=" + str3);
            }
            arrayList.add("--host=" + hostName);
            if (-1 != port) {
                arrayList.add("--port=" + Integer.toString(port));
            }
            arrayList.add("--compress");
            arrayList.add("--local");
            arrayList.add("--silent");
            String[] inputFieldNames = new DBConfiguration(this.conf).getInputFieldNames();
            if (null != inputFieldNames) {
                StringBuilder sb = new StringBuilder();
                boolean z = true;
                for (String str4 : inputFieldNames) {
                    if (!z) {
                        sb.append(",");
                    }
                    sb.append(str4);
                    z = false;
                }
                arrayList.add("--columns=" + sb.toString());
            }
            int i = this.conf.getInt(MySQLUtils.OUTPUT_FIELD_DELIM_KEY, 44);
            int i2 = this.conf.getInt(MySQLUtils.OUTPUT_RECORD_DELIM_KEY, 10);
            int i3 = this.conf.getInt(MySQLUtils.OUTPUT_ENCLOSED_BY_KEY, 0);
            int i4 = this.conf.getInt(MySQLUtils.OUTPUT_ESCAPED_BY_KEY, 0);
            boolean z2 = this.conf.getBoolean(MySQLUtils.OUTPUT_ENCLOSE_REQUIRED_KEY, false);
            arrayList.add("--fields-terminated-by=0x" + Integer.toString(i, 16));
            arrayList.add("--lines-terminated-by=0x" + Integer.toString(i2, 16));
            if (0 != i3) {
                if (z2) {
                    arrayList.add("--fields-enclosed-by=0x" + Integer.toString(i3, 16));
                } else {
                    arrayList.add("--fields-optionally-enclosed-by=0x" + Integer.toString(i3, 16));
                }
            }
            if (0 != i4) {
                arrayList.add("--escaped-by=0x" + Integer.toString(i4, 16));
            }
            arrayList.add(databaseName);
            arrayList.add(file);
            LOG.debug("Starting mysqlimport with arguments:");
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                LOG.debug("  " + ((String) it.next()));
            }
            this.mysqlImportProcess = Runtime.getRuntime().exec((String[]) arrayList.toArray(new String[0]));
            this.outSink = new NullAsyncSink();
            this.outSink.processStream(this.mysqlImportProcess.getInputStream());
            this.errSink = new LoggingAsyncSink(LOG);
            this.errSink.processStream(this.mysqlImportProcess.getErrorStream());
            this.importStream = new BufferedOutputStream(new FileOutputStream(this.fifoFile));
            this.bytesWritten = 0L;
        } catch (IOException e) {
            LOG.error("Could not mknod " + file);
            this.fifoFile = null;
            throw new IOException("Could not create FIFO to interface with mysqlimport", e);
        }
    }

    public void run(Mapper<KEYIN, VALIN, NullWritable, NullWritable>.Context context) throws IOException, InterruptedException {
        this.conf = context.getConfiguration();
        setup(context);
        initMySQLImportProcess();
        while (context.nextKeyValue()) {
            try {
                map(context.getCurrentKey(), context.getCurrentValue(), context);
            } catch (Throwable th) {
                closeExportHandles();
                throw th;
            }
        }
        cleanup(context);
        closeExportHandles();
    }

    private void closeExportHandles() throws IOException, InterruptedException {
        int i = 0;
        if (null != this.importStream) {
            LOG.debug("Closing import stream");
            this.importStream.close();
            this.importStream = null;
        }
        if (null != this.mysqlImportProcess) {
            LOG.info("Waiting for mysqlimport to complete");
            i = this.mysqlImportProcess.waitFor();
            LOG.info("mysqlimport closed connection");
            this.mysqlImportProcess = null;
        }
        if (null != this.passwordFile && this.passwordFile.exists()) {
            if (!this.passwordFile.delete()) {
                LOG.error("Could not remove mysql password file " + this.passwordFile);
                LOG.error("You should remove this file to protect your credentials.");
            }
            this.passwordFile = null;
        }
        if (null != this.outSink) {
            LOG.debug("Waiting for any additional stdout from mysqlimport");
            this.outSink.join();
            this.outSink = null;
        }
        if (null != this.errSink) {
            LOG.debug("Waiting for any additional stderr from mysqlimport");
            this.errSink.join();
            this.errSink = null;
        }
        if (this.fifoFile != null && this.fifoFile.exists()) {
            LOG.debug("Removing fifo file");
            if (!this.fifoFile.delete()) {
                LOG.error("Could not clean up named FIFO after completing mapper");
            }
            File parentFile = this.fifoFile.getParentFile();
            LOG.debug("Removing task attempt tmpdir");
            if (!parentFile.delete()) {
                LOG.error("Could not clean up task dir after completing mapper");
            }
            this.fifoFile = null;
        }
        if (0 != i) {
            throw new IOException("mysqlimport terminated with error code " + i);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void setup(Mapper<KEYIN, VALIN, NullWritable, NullWritable>.Context context) {
        this.conf = context.getConfiguration();
        this.mysqlCharSet = MySQLUtils.MYSQL_DEFAULT_CHARSET;
        this.checkpointDistInBytes = this.conf.getLong(MYSQL_CHECKPOINT_BYTES_KEY, DEFAULT_CHECKPOINT_BYTES);
        if (this.checkpointDistInBytes < 0) {
            LOG.warn("Invalid value for sqoop.mysql.export.checkpoint.bytes");
            this.checkpointDistInBytes = DEFAULT_CHECKPOINT_BYTES;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void writeRecord(String str, String str2) throws IOException, InterruptedException {
        byte[] bytes = str.getBytes(this.mysqlCharSet);
        this.importStream.write(bytes, 0, bytes.length);
        this.bytesWritten += bytes.length;
        if (null != str2) {
            byte[] bytes2 = str2.getBytes(this.mysqlCharSet);
            this.importStream.write(bytes2, 0, bytes2.length);
            this.bytesWritten += bytes2.length;
        }
        if (this.checkpointDistInBytes == 0 || this.bytesWritten <= this.checkpointDistInBytes) {
            return;
        }
        LOG.info("Checkpointing current export.");
        closeExportHandles();
        initMySQLImportProcess();
        this.bytesWritten = 0L;
    }
}
