/*
 * Decompiled with CFR 0.152.
 */
package org.talend.bigdata.dataflow.spark.common.hmap;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.avro.generic.IndexedRecord;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.talend.bigdata.dataflow.functions.FlatMapperIterator;
import org.talend.bigdata.dataflow.hmap.AvroJoiner;
import org.talend.bigdata.dataflow.hmap.HMapSpec;
import org.talend.bigdata.dataflow.keys.IndexedRecordGetter;
import scala.Tuple2;

public class JoinOutputFunction<JOIN_KEY, INPUT_MAIN, INPUT_LOOKUP, OUTPUT>
implements PairFlatMapFunction<Iterator<Tuple2<JOIN_KEY, Tuple2<Iterable<INPUT_MAIN>, Iterable<INPUT_LOOKUP>>>>, JOIN_KEY, OUTPUT> {
    private static final long serialVersionUID = 1L;
    private final HMapSpec mSpec;
    private JoinInputAdder<INPUT_MAIN> mInputMain;
    private JoinInputAdder<INPUT_LOOKUP> mInputLookup;
    private final IndexedRecordGetter<JOIN_KEY> mFetcherForPullKeyExpression;
    private AvroJoiner.AvroJoinEmitter<Object> mOutputEmitter;
    private transient AvroJoiner<Object> mJoiner;
    private transient JOIN_KEY mJoinKey;
    private final List<Tuple2<JOIN_KEY, OUTPUT>> mOutput = new ArrayList<Tuple2<JOIN_KEY, OUTPUT>>();

    public JoinOutputFunction(HMapSpec spec, JoinInputAdder<INPUT_MAIN> inputMain, JoinInputAdder<INPUT_LOOKUP> inputLookup, IndexedRecordGetter<JOIN_KEY> pullKeyFetcher, OutEmitter<JOIN_KEY, OUTPUT> outputEmitter) {
        this.mSpec = spec;
        inputMain.mParent = this;
        this.mInputMain = inputMain;
        inputLookup.mParent = this;
        this.mInputLookup = inputLookup;
        this.mFetcherForPullKeyExpression = pullKeyFetcher;
        outputEmitter.mParent = this;
        this.mOutputEmitter = outputEmitter;
    }

    public Iterable<Tuple2<JOIN_KEY, OUTPUT>> call(final Iterator<Tuple2<JOIN_KEY, Tuple2<Iterable<INPUT_MAIN>, Iterable<INPUT_LOOKUP>>>> iterator) throws Exception {
        return new Iterable<Tuple2<JOIN_KEY, OUTPUT>>(){

            @Override
            public Iterator<Tuple2<JOIN_KEY, OUTPUT>> iterator() {
                return new JoinOutputIterator(iterator);
            }
        };
    }

    public static class OutNEmitter<JOIN_KEY>
    extends OutEmitter<JOIN_KEY, Tuple2<Byte, IndexedRecord>> {
        private static final long serialVersionUID = 1L;

        @Override
        public void emit(Object context, String outTag, IndexedRecord r) {
            this.mParent.mOutput.add(new Tuple2(this.mParent.mJoinKey, (Object)new Tuple2((Object)((byte)this.mParent.mSpec.getOutputOrder(outTag).intValue()), (Object)r)));
        }
    }

    public static class Out1Emitter<JOIN_KEY>
    extends OutEmitter<JOIN_KEY, IndexedRecord> {
        private static final long serialVersionUID = 1L;

        @Override
        public void emit(Object context, String outTag, IndexedRecord r) {
            this.mParent.mOutput.add(new Tuple2(this.mParent.mJoinKey, (Object)r));
        }
    }

    public static abstract class OutEmitter<JOIN_KEY, OUTPUT>
    implements AvroJoiner.AvroJoinEmitter<Object>,
    Serializable {
        private static final long serialVersionUID = 1L;
        protected JoinOutputFunction<JOIN_KEY, ?, ?, OUTPUT> mParent;
    }

    public static class InLookupNAdder
    extends JoinInputAdder<Tuple2<Byte, IndexedRecord>> {
        private static final long serialVersionUID = 1L;

        @Override
        public void add(Tuple2<Byte, IndexedRecord> lookup) {
            this.mParent.mJoiner.addLookup(this.mParent.mSpec.getInput(((Byte)lookup._1()).byteValue()), (IndexedRecord)lookup._2());
        }
    }

    public static class InLookup1Adder
    extends JoinInputAdder<IndexedRecord> {
        private static final long serialVersionUID = 1L;
        private transient HMapSpec.InputDef mIn;

        @Override
        public void add(IndexedRecord lookup) {
            if (this.mIn == null) {
                this.mIn = this.mParent.mSpec.getFirstInputLookup();
            }
            this.mParent.mJoiner.addLookup(this.mIn, lookup);
        }
    }

    public static class InMainNAdder
    extends JoinInputAdder<Tuple2<Byte, IndexedRecord>> {
        private static final long serialVersionUID = 1L;

        @Override
        public void add(Tuple2<Byte, IndexedRecord> main) {
            this.mParent.mJoiner.transformMain(this.mParent, this.mParent.mSpec.getInput(((Byte)main._1()).byteValue()), (IndexedRecord)main._2());
        }
    }

    public static class InMain1Adder
    extends JoinInputAdder<IndexedRecord> {
        private static final long serialVersionUID = 1L;
        private transient HMapSpec.InputDef mIn;

        @Override
        public void add(IndexedRecord main) {
            if (this.mIn == null) {
                this.mIn = this.mParent.mSpec.getFirstInputMain();
            }
            this.mParent.mJoiner.transformMain(this.mParent, this.mIn, main);
        }
    }

    public static abstract class JoinInputAdder<INPUT>
    implements Serializable {
        private static final long serialVersionUID = 1L;
        protected JoinOutputFunction<?, ?, ?, ?> mParent;

        abstract void add(INPUT var1);
    }

    private final class JoinOutputIterator
    extends FlatMapperIterator<Tuple2<JOIN_KEY, Tuple2<Iterable<INPUT_MAIN>, Iterable<INPUT_LOOKUP>>>, Tuple2<JOIN_KEY, OUTPUT>> {
        private static final long serialVersionUID = 1L;
        private boolean mPreparedPullSources;

        public JoinOutputIterator(Iterator<Tuple2<JOIN_KEY, Tuple2<Iterable<INPUT_MAIN>, Iterable<INPUT_LOOKUP>>>> iterator) {
            super(iterator);
            this.mPreparedPullSources = false;
        }

        @Override
        public void prepare() throws Exception {
            JoinOutputFunction.this.mJoiner = new AvroJoiner(JoinOutputFunction.this.mSpec);
            for (HMapSpec.OutputDef out : JoinOutputFunction.this.mSpec.getOutputs()) {
                JoinOutputFunction.this.mJoiner.addEmitter(out, JoinOutputFunction.this.mOutputEmitter);
            }
        }

        @Override
        public Iterable<Tuple2<JOIN_KEY, OUTPUT>> flatMap(final Tuple2<JOIN_KEY, Tuple2<Iterable<INPUT_MAIN>, Iterable<INPUT_LOOKUP>>> next) throws Exception {
            return new Iterable<Tuple2<JOIN_KEY, OUTPUT>>(){

                @Override
                public Iterator<Tuple2<JOIN_KEY, OUTPUT>> iterator() {
                    Iterable main = (Iterable)((Tuple2)next._2())._1();
                    if (main.iterator().hasNext()) {
                        return new JoinOutputInnerIterator(next._1(), (Iterable)((Tuple2)next._2())._1(), (Iterable)((Tuple2)next._2())._2());
                    }
                    return Collections.emptyList().iterator();
                }
            };
        }

        @Override
        public void cleanup() throws Exception {
            if (this.mPreparedPullSources) {
                for (HMapSpec.InputDef pull : JoinOutputFunction.this.mSpec.getInputsPull()) {
                    pull.getPullSource().cleanup();
                }
                this.mPreparedPullSources = false;
            }
        }

        private final class JoinOutputInnerIterator
        extends FlatMapperIterator<INPUT_MAIN, Tuple2<JOIN_KEY, OUTPUT>> {
            private static final long serialVersionUID = 1L;
            private final Iterable<INPUT_LOOKUP> mLookups;

            public JoinOutputInnerIterator(JOIN_KEY joinKey, Iterable<INPUT_MAIN> mains, Iterable<INPUT_LOOKUP> lookups) {
                super(mains.iterator());
                JoinOutputFunction.this.mJoinKey = joinKey;
                this.mLookups = lookups;
            }

            @Override
            public void prepare() throws Exception {
                JoinOutputFunction.this.mJoiner.reset();
                if (!JoinOutputIterator.this.mPreparedPullSources) {
                    for (HMapSpec.InputDef pull : JoinOutputFunction.this.mSpec.getInputsPull()) {
                        pull.getPullSource().prepare();
                    }
                    JoinOutputIterator.this.mPreparedPullSources = true;
                }
                for (HMapSpec.InputDef pull : JoinOutputFunction.this.mSpec.getInputsPull()) {
                    for (IndexedRecord indexedRecord : pull.getPullSource().flatMap((IndexedRecord)JoinOutputFunction.this.mFetcherForPullKeyExpression.get(JoinOutputFunction.this.mJoinKey))) {
                        if (!pull.isKeptAfterFilters(indexedRecord)) continue;
                        JoinOutputFunction.this.mJoiner.addLookup(pull, indexedRecord);
                    }
                }
                for (HMapSpec.InputDef lookup : this.mLookups) {
                    JoinOutputFunction.this.mInputLookup.add(lookup);
                }
            }

            @Override
            public Iterable<Tuple2<JOIN_KEY, OUTPUT>> flatMap(INPUT_MAIN next) throws Exception {
                JoinOutputFunction.this.mOutput.clear();
                JoinOutputFunction.this.mInputMain.add(next);
                return JoinOutputFunction.this.mOutput;
            }

            @Override
            public void cleanup() throws Exception {
            }
        }
    }
}

