/*
 * Decompiled with CFR 0.152.
 */
package com.marklogic.mapreduce;

import com.marklogic.mapreduce.KeyValueInputFormat;
import com.marklogic.mapreduce.MarkLogicConstants;
import com.marklogic.mapreduce.MarkLogicInputSplit;
import com.marklogic.mapreduce.functions.LexiconFunction;
import com.marklogic.mapreduce.utilities.ForestHost;
import com.marklogic.mapreduce.utilities.InternalUtilities;
import com.marklogic.mapreduce.utilities.RestrictedHostsUtil;
import com.marklogic.xcc.AdhocQuery;
import com.marklogic.xcc.ContentSource;
import com.marklogic.xcc.Request;
import com.marklogic.xcc.RequestOptions;
import com.marklogic.xcc.ResultItem;
import com.marklogic.xcc.ResultSequence;
import com.marklogic.xcc.Session;
import com.marklogic.xcc.exceptions.RequestException;
import com.marklogic.xcc.exceptions.ServerConnectionException;
import com.marklogic.xcc.exceptions.XccConfigException;
import com.marklogic.xcc.types.ItemType;
import com.marklogic.xcc.types.XSInteger;
import com.marklogic.xcc.types.XSString;
import java.io.IOException;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.util.ReflectionUtils;

public abstract class MarkLogicInputFormat<KEYIN, VALUEIN>
extends InputFormat<KEYIN, VALUEIN>
implements MarkLogicConstants {
    /** Logger shared by this input format. */
    public static final Log LOG = LogFactory.getLog(MarkLogicInputFormat.class);
    /** Document selector emitted by appendDocumentSelector when none is configured. */
    static final String DEFAULT_DOCUMENT_SELECTOR = "fn:collection()";
    /** Query text emitted when no filter query or lexicon function applies. */
    static final String DEFAULT_CTS_QUERY = "()";
    /** Job configuration; assigned at the start of getSplits. */
    Configuration jobConf = null;
    /** Value of "mapreduce.marklogic.input.documentselector" (may be null); assigned by appendDocumentSelector. */
    String docSelector;
    /** Host name returned by the server for xdmp:host-name(xdmp:host()); assigned in getSplits. */
    String localHost = null;
    /** True when "mapreduce.marklogic.mode" equals "local"; assigned in getSplits. */
    boolean localMode = false;

    /**
     * Appends XQuery namespace declarations built from the
     * "mapreduce.marklogic.input.namespace" setting.
     *
     * <p>The configured values are consumed as alternating alias / URI
     * entries; each pair becomes {@code declare namespace alias="uri";}.
     * Consecutive declarations are separated by a newline. Nothing is
     * appended when the setting is absent or empty.
     *
     * @param buf buffer receiving the declarations
     */
    private void appendNsBindings(StringBuilder buf) {
        Collection<String> bindings = this.jobConf.getStringCollection("mapreduce.marklogic.input.namespace");
        if (bindings == null || bindings.isEmpty()) {
            return;
        }
        boolean expectAlias = true;
        for (Iterator<String> it = bindings.iterator(); it.hasNext();) {
            String token = it.next();
            if (expectAlias) {
                buf.append("declare namespace ").append(token).append("=\"");
            } else {
                buf.append(token).append("\";");
                // A newline goes only between complete declarations.
                if (it.hasNext()) {
                    buf.append('\n');
                }
            }
            expectAlias = !expectAlias;
        }
    }

    /**
     * Appends the document selector expression to the split query.
     *
     * <p>Reads "mapreduce.marklogic.input.documentselector", stores the value
     * (possibly null) in {@link #docSelector}, and appends it; when it is not
     * configured, {@link #DEFAULT_DOCUMENT_SELECTOR} is appended instead.
     *
     * @param buf buffer receiving the selector
     */
    private void appendDocumentSelector(StringBuilder buf) {
        String configured = this.jobConf.get("mapreduce.marklogic.input.documentselector");
        this.docSelector = configured;
        buf.append(configured != null ? configured : DEFAULT_DOCUMENT_SELECTOR);
    }

    /**
     * Appends the query argument of the split query.
     *
     * <ul>
     * <li>If "mapreduce.marklogic.input.filter.query" is set, it is wrapped in
     *     a double-quoted {@code cts:query(xdmp:unquote('...')/*)} expression,
     *     with every double quote in the configured text replaced by
     *     {@code &#34;}.</li>
     * <li>Otherwise, if no document selector is configured and a class is
     *     configured under "mapreduce.marklogic.input.lexiconfunctionclass",
     *     that class is instantiated and its lexicon query is appended in
     *     single quotes.</li>
     * <li>In every other case {@link #DEFAULT_CTS_QUERY} is appended in
     *     single quotes.</li>
     * </ul>
     *
     * <p>Relies on {@link #docSelector}, so the document selector must have
     * been appended first.
     *
     * @param buf buffer receiving the query text
     */
    protected void appendQuery(StringBuilder buf) {
        String filterQuery = this.jobConf.get("mapreduce.marklogic.input.filter.query");
        if (filterQuery != null) {
            buf.append("\"cts:query(xdmp:unquote('");
            buf.append(filterQuery.replace("\"", "&#34;"));
            buf.append("')/*)\"");
            return;
        }

        // A lexicon function is only consulted when no document selector is configured.
        LexiconFunction lexicon = null;
        if (this.docSelector == null) {
            Class<? extends LexiconFunction> lexiconClass = this.jobConf.getClass("mapreduce.marklogic.input.lexiconfunctionclass", null, LexiconFunction.class);
            if (lexiconClass != null) {
                lexicon = ReflectionUtils.newInstance(lexiconClass, this.jobConf);
            }
        }

        buf.append("'");
        buf.append(lexicon != null ? lexicon.getLexiconQuery() : DEFAULT_CTS_QUERY);
        buf.append("'");
    }

    /**
     * Appends the {@code "REDACT"} marker and, when redaction rules are
     * supplied, an {@code rdt:rule-validate((...))} call that lists each
     * entry as a double-quoted string.
     *
     * @param buf              buffer receiving the query fragment
     * @param redactionRuleCol configured redaction rule entries, or null when
     *                         none are configured (only the marker is emitted)
     */
    private void appendRedactionRuleValidateQuery(StringBuilder buf, String[] redactionRuleCol) {
        buf.append("\"REDACT\"");
        if (redactionRuleCol == null) {
            return;
        }
        buf.append(",\n").append("rdt:rule-validate((");
        String separator = "";
        for (String rule : redactionRuleCol) {
            buf.append(separator).append("\"").append(rule).append("\"");
            separator = ", ";
        }
        buf.append("))");
    }

    /**
     * Appends an XQuery fragment that looks up the zero-arity function
     * {@code hadoop:get-splits-with-replica} and calls it when it exists,
     * yielding the empty sequence otherwise.
     *
     * @param buf buffer receiving the fragment
     */
    private void appendReplicaQuery(StringBuilder buf) {
        buf.append("let $repf := fn:function-lookup(")
           .append("xs:QName('hadoop:get-splits-with-replica'),0)\n")
           .append("return if (exists($repf)) then $repf() else ()\n");
    }

    /**
     * Reads the split query result and fills in the forest splits.
     *
     * <p>The result is consumed strictly in order, in three sections:
     * <ol>
     * <li>Repeating triples of forest id (integer), record count (integer)
     *     and host name (string), one {@link ForestSplit} per triple. The
     *     section ends at the string {@code "REDACT"} or at the end of the
     *     result; any other string in the forest-id position is an error.</li>
     * <li>String items, each added to {@code ruleUris}, ending at an integer
     *     {@code 0} or at the end of the result.</li>
     * <li>Only when host restriction is off: per forest, a forest id, a host
     *     name, then (replica forest, replica host) pairs ending at an integer
     *     {@code 0}; the whole section ends at a further integer {@code 0}.
     *     The replicas are attached to the split with the matching forest
     *     id.</li>
     * </ol>
     *
     * <p>When "mapreduce.marklogic.input.restricthosts" is true, every forest
     * host is registered with a {@link RestrictedHostsUtil} and each split's
     * host is then replaced by the host that utility returns. Otherwise, in
     * local mode a host equal to {@link #localHost} is replaced by
     * {@code inputHosts[0]}.
     *
     * @param jobContext   job context (not used by this implementation)
     * @param result       split query result, positioned at the first forest id
     * @param forestSplits list receiving one entry per forest
     * @param ruleUris     list receiving the string items of the second
     *                     section; it is dereferenced without a null check if
     *                     such items are present
     * @param inputHosts   configured input hosts
     * @throws IOException if an item of an unexpected type is encountered
     */
    protected void getForestSplits(JobContext jobContext, ResultSequence result, List<ForestSplit> forestSplits, List<String> ruleUris, String[] inputHosts) throws IOException {
        boolean restrictHosts = this.jobConf.getBoolean("mapreduce.marklogic.input.restricthosts", false);
        RestrictedHostsUtil rhUtil = restrictHosts ? new RestrictedHostsUtil(inputHosts) : null;

        // Section 1: (forest id, record count, host name) triples.
        ForestSplit current = null;
        int count = 0;
        while (result.hasNext()) {
            ResultItem item = result.next();
            int index = count % 3;
            if (index == 0) {
                if (ItemType.XS_STRING == item.getItemType()) {
                    if ("REDACT".equals(((XSString)item.getItem()).asString())) {
                        break;
                    }
                    throw new IOException("Unexpected string item from getSplits query result");
                }
                current = new ForestSplit();
                current.forestId = ((XSInteger)item.getItem()).asBigInteger();
                forestSplits.add(current);
            } else if (index == 1) {
                current.recordCount = ((XSInteger)item.getItem()).asLong();
            } else {
                String forestHost = ((XSString)item.getItem()).asString();
                if (restrictHosts) {
                    rhUtil.addForestHost(forestHost);
                    current.hostName = forestHost;
                } else {
                    current.hostName = this.localMode && forestHost.equals(this.localHost) ? inputHosts[0] : forestHost;
                }
            }
            ++count;
        }
        if (restrictHosts) {
            // All forest hosts are registered by now; map each split to the host to use.
            for (ForestSplit split : forestSplits) {
                split.hostName = rhUtil.getNextHost(split.hostName);
            }
        }

        // Section 2: string items up to the integer 0 terminator.
        while (result.hasNext()) {
            ResultItem item = result.next();
            if (isZeroMarker(item)) {
                break;
            }
            if (ItemType.XS_STRING != item.getItemType()) {
                throw new IOException("Unexpected item " + item.getItemType().toString());
            }
            ruleUris.add(((XSString)item.getItem()).asString());
        }

        if (restrictHosts) {
            return;
        }

        // Section 3: replica lists, keyed by the forest id rendered as a string.
        HashMap<String, List<ForestHost>> forestHostMap = new HashMap<String, List<ForestHost>>();
        while (result.hasNext()) {
            ResultItem item = result.next();
            if (isZeroMarker(item)) {
                break;
            }
            String forest = item.asString();
            if (!result.hasNext()) {
                continue;
            }
            // The host-name item that follows the forest id is read but not used here.
            result.next().asString();
            List<ForestHost> replicas = new ArrayList<ForestHost>();
            while (result.hasNext()) {
                ResultItem replicaItem = result.next();
                if (isZeroMarker(replicaItem)) {
                    break;
                }
                String replicaForest = replicaItem.asString();
                if (!result.hasNext()) {
                    continue;
                }
                String replicaHost = result.next().asString();
                if (this.localMode && replicaHost.equals(this.localHost)) {
                    replicaHost = inputHosts[0];
                }
                replicas.add(new ForestHost(replicaForest, replicaHost));
            }
            forestHostMap.put(forest, replicas);
        }
        for (ForestSplit split : forestSplits) {
            split.replicas = forestHostMap.get(split.forestId.toString());
        }
    }

    /** Returns true when the item is an xs:integer whose value is 0, the section terminator. */
    private static boolean isZeroMarker(ResultItem item) {
        return ItemType.XS_INTEGER == item.getItemType()
                && ((XSInteger)item.getItem()).asPrimitiveInt() == 0;
    }

    /**
     * Appends the final expression of the generated split query. This
     * implementation appends {@link #DEFAULT_CTS_QUERY}, i.e. {@code ()}.
     *
     * @param buf buffer holding the split query built so far
     */
    protected void appendCustom(StringBuilder buf) {
        buf.append(MarkLogicInputFormat.DEFAULT_CTS_QUERY);
    }

    /**
     * Computes the input splits for the job.
     *
     * <p>Steps:
     * <ol>
     * <li>Determine the split query: in advanced mode it is read from
     *     "mapreduce.marklogic.input.splitquery"; otherwise it is generated
     *     from the namespace, document selector, filter query and redaction
     *     settings.</li>
     * <li>Run the query against the configured input hosts, moving on to the
     *     next host only when a connection cannot be established.</li>
     * <li>Cut every forest into splits of at most
     *     "mapreduce.marklogic.input.maxsplitsize" records (default 50000 in
     *     distributed mode, 20000 otherwise; made even for
     *     {@link KeyValueInputFormat}).</li>
     * <li>Interleave the splits round-robin, first across the forests of each
     *     host and then across hosts.</li>
     * </ol>
     *
     * @param jobContext job context supplying the configuration
     * @return the splits, possibly empty
     * @throws IOException           if the query fails or no host is reachable
     * @throws IllegalStateException if no input host is configured, the max
     *                               split size is not positive, or the
     *                               effective split size is 0 while a forest
     *                               has records to split
     */
    @Override
    public List<InputSplit> getSplits(JobContext jobContext) throws IOException, InterruptedException {
        this.jobConf = jobContext.getConfiguration();
        boolean advancedMode = this.jobConf.get("mapreduce.marklogic.input.mode", "basic").equals("advanced");
        boolean restrictHosts = this.jobConf.getBoolean("mapreduce.marklogic.input.restricthosts", false);
        String[] redactionRuleCol = this.jobConf.getStrings("mapreduce.marklogic.input.redaction.rules");
        String[] inputHosts = this.jobConf.getStrings("mapreduce.marklogic.input.host");
        if (inputHosts == null || inputHosts.length == 0) {
            throw new IllegalStateException("mapreduce.marklogic.input.host is not specified.");
        }

        String queryLanguage = null;
        String splitQuery;
        if (advancedMode) {
            queryLanguage = this.jobConf.get("mapreduce.marklogic.input.querylanguage");
            splitQuery = this.jobConf.get("mapreduce.marklogic.input.splitquery");
        } else {
            splitQuery = this.buildBasicSplitQuery(redactionRuleCol, restrictHosts);
        }

        String mode = this.jobConf.get("mapreduce.marklogic.mode", "distributed");
        long defaultSplitSize = mode.equals("distributed") ? 50000L : 20000L;
        long maxSplitSize = this.jobConf.getLong("mapreduce.marklogic.input.maxsplitsize", defaultSplitSize);
        if (maxSplitSize <= 0L) {
            throw new IllegalStateException("Max split size is required to be positive. It is set to " + maxSplitSize);
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Split query: " + splitQuery);
        }
        this.localMode = "local".equals(this.jobConf.get("mapreduce.marklogic.mode"));

        List<ForestSplit> forestSplits = this.fetchForestSplits(jobContext, splitQuery, queryLanguage, advancedMode, redactionRuleCol, inputHosts);
        if (forestSplits.isEmpty()) {
            return new ArrayList<InputSplit>();
        }

        // Key/value input uses an even split size: an odd maximum is lowered by one.
        long splitSize = maxSplitSize;
        if (this instanceof KeyValueInputFormat && (splitSize & 1L) != 0L) {
            --splitSize;
        }

        // Group the per-forest split lists by host.
        boolean tsQuery = this.jobConf.get("mapreduce.marklogic.input.querytimestamp") != null;
        HashMap<String, List<List<MarkLogicInputSplit>>> hostForestSplits = new HashMap<String, List<List<MarkLogicInputSplit>>>();
        for (ForestSplit fsplit : forestSplits) {
            // With a query timestamp set, forests without records get no split.
            if (fsplit.recordCount <= 0L && tsQuery) {
                continue;
            }
            List<List<MarkLogicInputSplit>> splitLists = hostForestSplits.get(fsplit.hostName);
            if (splitLists == null) {
                splitLists = new ArrayList<List<MarkLogicInputSplit>>();
                hostForestSplits.put(fsplit.hostName, splitLists);
            }
            splitLists.add(this.splitForest(fsplit, splitSize));
        }

        // Interleave across the forests of each host, then across hosts.
        List<List<MarkLogicInputSplit>> hostSplits = new ArrayList<List<MarkLogicInputSplit>>(hostForestSplits.size());
        for (List<List<MarkLogicInputSplit>> splitLists : hostForestSplits.values()) {
            hostSplits.add(roundRobin(splitLists));
        }
        List<InputSplit> splitList = MarkLogicInputFormat.<InputSplit>roundRobin(hostSplits);

        LOG.info("Made " + splitList.size() + " split(s).");
        if (LOG.isDebugEnabled()) {
            for (InputSplit split : splitList) {
                LOG.debug(split);
            }
        }
        return splitList;
    }

    /** Builds the basic-mode split query from the job configuration. */
    private String buildBasicSplitQuery(String[] redactionRuleCol, boolean restrictHosts) {
        StringBuilder buf = new StringBuilder();
        buf.append("xquery version \"1.0-ml\";\n");
        buf.append("import module namespace hadoop = ");
        buf.append("\"http://marklogic.com/xdmp/hadoop\" at ");
        buf.append("\"/MarkLogic/hadoop.xqy\";\n");
        if (redactionRuleCol != null) {
            buf.append("import module namespace rdt = \"http://marklogic.com/xdmp/redaction\" at \"/MarkLogic/redaction.xqy\";\n");
        }
        buf.append("xdmp:host-name(xdmp:host()),\n");
        buf.append("hadoop:get-splits('");
        this.appendNsBindings(buf);
        buf.append("', '");
        // Must precede appendQuery, which reads the docSelector field set here.
        this.appendDocumentSelector(buf);
        buf.append("',");
        this.appendQuery(buf);
        buf.append("),\n");
        this.appendRedactionRuleValidateQuery(buf, redactionRuleCol);
        buf.append(",0,");
        if (!restrictHosts) {
            this.appendReplicaQuery(buf);
            buf.append(",0,");
        }
        this.appendCustom(buf);
        return buf.toString();
    }

    /**
     * Runs the split query against the input hosts in order and returns the
     * forest splits from the first host that can be reached. Only a
     * connection failure moves on to the next host; any other failure is
     * rethrown as an IOException.
     */
    private List<ForestSplit> fetchForestSplits(JobContext jobContext, String splitQuery, String queryLanguage, boolean advancedMode, String[] redactionRuleCol, String[] inputHosts) throws IOException {
        for (int hostIdx = 0; hostIdx < inputHosts.length; ++hostIdx) {
            // Scoped per attempt so a retry never closes handles of a previous attempt.
            Session session = null;
            ResultSequence result = null;
            try {
                ContentSource cs = InternalUtilities.getInputContentSource(this.jobConf, inputHosts[hostIdx]);
                session = cs.newSession();
                RequestOptions options = new RequestOptions();
                options.setCacheResult(false);
                if (this.localMode && advancedMode) {
                    // A user-supplied split query does not report the host name, so ask separately.
                    AdhocQuery hostQuery = session.newAdhocQuery("xquery version \"1.0-ml\";xdmp:host-name(xdmp:host())");
                    hostQuery.setOptions(options);
                    result = session.submitRequest(hostQuery);
                    if (result.hasNext()) {
                        this.localHost = result.next().asString();
                    }
                    result.close();
                    // Cleared so the finally block does not close it a second time.
                    result = null;
                }
                AdhocQuery query = session.newAdhocQuery(splitQuery);
                if (queryLanguage != null) {
                    InternalUtilities.checkQueryLanguage(queryLanguage);
                    options.setQueryLanguage(queryLanguage);
                }
                query.setOptions(options);
                result = session.submitRequest(query);
                if (!advancedMode && result.hasNext()) {
                    // The generated query returns the server host name as its first item.
                    this.localHost = result.next().asString();
                }
                List<ForestSplit> forestSplits = new ArrayList<ForestSplit>();
                List<String> ruleUris = redactionRuleCol != null ? new ArrayList<String>() : null;
                this.getForestSplits(jobContext, result, forestSplits, ruleUris, inputHosts);
                LOG.info("Fetched " + forestSplits.size() + " forest splits.");
                return forestSplits;
            }
            catch (ServerConnectionException e) {
                LOG.warn("Unable to connect to " + inputHosts[hostIdx] + " to query source information");
            }
            catch (XccConfigException e) {
                LOG.error(e);
                throw new IOException(e);
            }
            catch (RequestException e) {
                LOG.error(e);
                LOG.error("Query: " + splitQuery);
                throw new IOException(e);
            }
            finally {
                // No control transfer here: a jump out of finally would discard
                // the exception or return in flight.
                if (result != null) {
                    result.close();
                }
                if (session != null) {
                    session.close();
                }
            }
        }
        throw new IOException("Unable to query source information, no usable hostname found");
    }

    /**
     * Cuts one forest into consecutive splits of at most splitSize records.
     * A forest without records yields a single zero-length split flagged as
     * the last split. A remainder shorter than splitSize is flagged as the
     * last split.
     */
    private List<MarkLogicInputSplit> splitForest(ForestSplit fsplit, long splitSize) {
        List<MarkLogicInputSplit> splits = new ArrayList<MarkLogicInputSplit>();
        if (fsplit.recordCount <= 0L) {
            MarkLogicInputSplit split = new MarkLogicInputSplit(0L, 0L, fsplit.forestId, fsplit.hostName, fsplit.replicas);
            split.setLastSplit(true);
            splits.add(split);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Added split " + split);
            }
            return splits;
        }
        if (splitSize <= 0L) {
            // Reached when a max split size of 1 is lowered to 0 for key/value
            // input; the loop below would otherwise never terminate.
            throw new IllegalStateException("Effective split size is " + splitSize + "; mapreduce.marklogic.input.maxsplitsize must be at least 2 for KeyValueInputFormat.");
        }
        long remainingCount = fsplit.recordCount;
        while (remainingCount > 0L) {
            long start = fsplit.recordCount - remainingCount;
            MarkLogicInputSplit split;
            if (remainingCount < splitSize) {
                split = new MarkLogicInputSplit(start, remainingCount, fsplit.forestId, fsplit.hostName, fsplit.replicas);
                split.setLastSplit(true);
                remainingCount = 0L;
            } else {
                split = new MarkLogicInputSplit(start, splitSize, fsplit.forestId, fsplit.hostName, fsplit.replicas);
                remainingCount -= splitSize;
            }
            splits.add(split);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Added split " + split);
            }
        }
        return splits;
    }

    /**
     * Merges the lists by taking the first element of each list in turn,
     * then the second of each, and so on until every list is exhausted.
     */
    private static <T> List<T> roundRobin(List<? extends List<? extends T>> lists) {
        List<T> merged = new ArrayList<T>();
        boolean more = true;
        for (int round = 0; more; ++round) {
            more = false;
            for (List<? extends T> list : lists) {
                if (round < list.size()) {
                    merged.add(list.get(round));
                }
                more = more || round + 1 < list.size();
            }
        }
        return merged;
    }

    /**
     * Mutable holder for what the split query reports about one forest.
     * Instances are populated by getForestSplits and read by getSplits.
     */
    protected class ForestSplit {
        /** Forest id; also used, rendered as a string, to look up replicas. */
        BigInteger forestId;
        /** Number of records reported for the forest. */
        long recordCount;
        /** Host assigned to the forest's splits. */
        String hostName;
        /** Replica forests and their hosts; null when none were recorded for this forest. */
        List<ForestHost> replicas;

        /** Creates an empty holder; fields are assigned by the caller. */
        protected ForestSplit() {
        }
    }
}

