/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.pig.FuncSpec;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompilerException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRUtil;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.PlanWalker;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.Utils;

public class LimitAdjuster
extends MROpPlanVisitor {
    ArrayList<MapReduceOper> opsToAdjust = new ArrayList();
    PigContext pigContext;
    NodeIdGenerator nig;
    private String scope;

    public LimitAdjuster(MROperPlan plan, PigContext pigContext) {
        super(plan, (PlanWalker<MapReduceOper, MROperPlan>)new DepthFirstWalker<MapReduceOper, MROperPlan>(plan));
        this.pigContext = pigContext;
        this.nig = NodeIdGenerator.getGenerator();
        List roots = plan.getRoots();
        this.scope = ((MapReduceOper)roots.get(0)).getOperatorKey().getScope();
    }

    @Override
    public void visitMROp(MapReduceOper mr) throws VisitorException {
        if (mr.limit != -1L || mr.limitPlan != null) {
            this.opsToAdjust.add(mr);
        }
    }

    public void adjust() throws IOException, PlanException {
        for (MapReduceOper mr : this.opsToAdjust) {
            if (mr.reducePlan.isEmpty()) continue;
            List mpLeaves = mr.reducePlan.getLeaves();
            if (mpLeaves.size() != 1) {
                int errCode = 2024;
                String msg = "Expected reduce to have single leaf. Found " + mpLeaves.size() + " leaves.";
                throw new MRCompilerException(msg, errCode, 4);
            }
            PhysicalOperator mpLeaf = (PhysicalOperator)mpLeaves.get(0);
            if (!this.pigContext.inIllustrator && !(mpLeaf instanceof POStore)) {
                int errCode = 2025;
                String msg = "Expected leaf of reduce plan to always be POStore. Found " + mpLeaf.getClass().getSimpleName();
                throw new MRCompilerException(msg, errCode, 4);
            }
            FileSpec oldSpec = ((POStore)mpLeaf).getSFile();
            boolean oldIsTmpStore = ((POStore)mpLeaf).isTmpStore();
            FileSpec fSpec = new FileSpec(FileLocalizer.getTemporaryPath(this.pigContext).toString(), new FuncSpec(Utils.getTmpFileCompressorName(this.pigContext)));
            POStore storeOp = (POStore)mpLeaf;
            storeOp.setSFile(fSpec);
            storeOp.setIsTmpStore(true);
            mr.setReduceDone(true);
            MapReduceOper limitAdjustMROp = new MapReduceOper(new OperatorKey(this.scope, this.nig.getNextNodeId(this.scope)));
            POLoad ld = new POLoad(new OperatorKey(this.scope, this.nig.getNextNodeId(this.scope)));
            ld.setPc(this.pigContext);
            ld.setLFile(fSpec);
            limitAdjustMROp.mapPlan.add(ld);
            if (mr.isGlobalSort()) {
                this.connectMapToReduceLimitedSort(limitAdjustMROp, mr);
            } else {
                MRUtil.simpleConnectMapToReduce(limitAdjustMROp, this.scope, this.nig);
            }
            this.splitReducerForLimit(limitAdjustMROp, mr);
            if (mr.isGlobalSort()) {
                limitAdjustMROp.setLimitAfterSort(true);
                limitAdjustMROp.setSortOrder(mr.getSortOrder());
            }
            POStore st = new POStore(new OperatorKey(this.scope, this.nig.getNextNodeId(this.scope)));
            st.setSFile(oldSpec);
            st.setIsTmpStore(oldIsTmpStore);
            st.setSchema(((POStore)mpLeaf).getSchema());
            st.setSignature(((POStore)mpLeaf).getSignature());
            limitAdjustMROp.reducePlan.addAsLeaf(st);
            limitAdjustMROp.requestedParallelism = 1;
            limitAdjustMROp.setLimitOnly(true);
            List<MapReduceOper> successorList = ((MROperPlan)this.mPlan).getSuccessors(mr);
            MapReduceOper[] successors = null;
            if (successorList != null && successorList.size() > 0) {
                successors = new MapReduceOper[successorList.size()];
                int i = 0;
                for (MapReduceOper op : successorList) {
                    successors[i++] = op;
                }
            }
            for (String udf : mr.UDFs) {
                if (limitAdjustMROp.UDFs.contains(udf)) continue;
                limitAdjustMROp.UDFs.add(udf);
            }
            ((MROperPlan)this.mPlan).add(limitAdjustMROp);
            ((MROperPlan)this.mPlan).connect(mr, limitAdjustMROp);
            if (successors == null) continue;
            for (int i = 0; i < successors.length; ++i) {
                MapReduceOper nextMr = successors[i];
                if (nextMr != null) {
                    ((MROperPlan)this.mPlan).disconnect(mr, nextMr);
                }
                if (nextMr == null) continue;
                ((MROperPlan)this.mPlan).connect(limitAdjustMROp, nextMr);
            }
        }
    }

    private void splitReducerForLimit(MapReduceOper secondMROp, MapReduceOper firstMROp) throws PlanException, VisitorException {
        List<PhysicalOperator> succs;
        PhysicalOperator op = (PhysicalOperator)firstMROp.reducePlan.getRoots().get(0);
        assert (op instanceof POPackage);
        while ((succs = firstMROp.reducePlan.getSuccessors(op)) != null) {
            op = succs.get(0);
            if (!(op instanceof POLimit)) continue;
            op = firstMROp.reducePlan.getSuccessors(op).get(0);
            break;
        }
        POLimit pLimit2 = new POLimit(new OperatorKey(this.scope, this.nig.getNextNodeId(this.scope)));
        pLimit2.setLimit(firstMROp.limit);
        pLimit2.setLimitPlan(firstMROp.limitPlan);
        secondMROp.reducePlan.addAsLeaf(pLimit2);
        while (!(op instanceof POStore)) {
            PhysicalOperator opToMove = op;
            List<PhysicalOperator> succs2 = firstMROp.reducePlan.getSuccessors(op);
            op = succs2.get(0);
            firstMROp.reducePlan.removeAndReconnect(opToMove);
            secondMROp.reducePlan.addAsLeaf(opToMove);
        }
    }

    private void connectMapToReduceLimitedSort(MapReduceOper mro, MapReduceOper sortMROp) throws PlanException, VisitorException {
        POLocalRearrange slr = (POLocalRearrange)sortMROp.mapPlan.getLeaves().get(0);
        POLocalRearrange lr = null;
        try {
            lr = slr.clone();
        }
        catch (CloneNotSupportedException e) {
            int errCode = 2147;
            String msg = "Error cloning POLocalRearrange for limit after sort";
            throw new MRCompilerException(msg, errCode, 4, (Throwable)e);
        }
        mro.mapPlan.addAsLeaf(lr);
        POPackage spkg = (POPackage)sortMROp.reducePlan.getRoots().get(0);
        POPackage pkg = null;
        try {
            pkg = spkg.clone();
        }
        catch (Exception e) {
            int errCode = 2148;
            String msg = "Error cloning POPackageLite for limit after sort";
            throw new MRCompilerException(msg, errCode, 4, (Throwable)e);
        }
        mro.reducePlan.add(pkg);
        mro.reducePlan.addAsLeaf(MRUtil.getPlainForEachOP(this.scope, this.nig));
    }
}

