/*
 * Decompiled with CFR 0.152.
 */
package org.talend.dataprep.transformation.pipeline;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.talend.dataprep.api.action.ActionDefinition;
import org.talend.dataprep.api.dataset.DataSet;
import org.talend.dataprep.api.dataset.RowMetadata;
import org.talend.dataprep.api.dataset.row.DataSetRow;
import org.talend.dataprep.api.preparation.PreparationMessage;
import org.talend.dataprep.api.preparation.Step;
import org.talend.dataprep.dataset.StatisticsAdapter;
import org.talend.dataprep.quality.AnalyzerService;
import org.talend.dataprep.transformation.actions.category.ScopeCategory;
import org.talend.dataprep.transformation.actions.common.ApplyDataSetRowAction;
import org.talend.dataprep.transformation.actions.common.CompileDataSetRowAction;
import org.talend.dataprep.transformation.actions.common.ImplicitParameters;
import org.talend.dataprep.transformation.actions.common.RunnableAction;
import org.talend.dataprep.transformation.pipeline.ActionRegistry;
import org.talend.dataprep.transformation.pipeline.Link;
import org.talend.dataprep.transformation.pipeline.Node;
import org.talend.dataprep.transformation.pipeline.PipelineConsoleDump;
import org.talend.dataprep.transformation.pipeline.RuntimeNode;
import org.talend.dataprep.transformation.pipeline.Signal;
import org.talend.dataprep.transformation.pipeline.StepNodeTransformer;
import org.talend.dataprep.transformation.pipeline.Visitor;
import org.talend.dataprep.transformation.pipeline.builder.ActionNodesBuilder;
import org.talend.dataprep.transformation.pipeline.builder.NodeBuilder;
import org.talend.dataprep.transformation.pipeline.node.BasicNode;
import org.talend.dataprep.transformation.pipeline.node.FilteredNode;
import org.talend.dataprep.transformation.pipeline.node.LimitNode;

/**
 * Head of a chain of {@link Node}s: pushes the rows of a {@link DataSet} one by one into the wrapped node.
 *
 * <p>A pipeline is both a {@link Node} and its own {@link RuntimeNode} ({@link #exec()} returns {@code this}).
 * Rows and signals it receives are forwarded to the wrapped node, except {@link Signal#STOP}, which is handled
 * locally (see {@link #signal(Signal)}). Use {@link Builder} to assemble a pipeline.
 */
public class Pipeline implements Node, RuntimeNode, Serializable {

    private static final Logger LOG = LoggerFactory.getLogger(Pipeline.class);

    private static final long serialVersionUID = 1L;

    /** Wrapped node that rows and forwarded signals are delegated to. */
    private Node node;

    /** Set by STOP and CANCEL signals; checked by {@link #execute(DataSet)} after each row. */
    private final AtomicBoolean isStopped = new AtomicBoolean();

    /**
     * Monitor held by {@link #execute(DataSet)} for a whole run and acquired by a STOP signal to wait for that run.
     * It is not part of the serialized form, so it is re-created in {@link #readObject(ObjectInputStream)};
     * otherwise a deserialized pipeline would synchronize on {@code null}.
     */
    private transient Object isFinished = new Object();

    /** Creates a pipeline without a node; {@link #setNode(Node)} must be called before the pipeline is used. */
    public Pipeline() {
    }

    /**
     * Creates a pipeline around the given node.
     *
     * @param node the node that rows and signals are delegated to.
     */
    public Pipeline(Node node) {
        this.node = node;
    }

    /**
     * Restores the default serialized fields and re-creates the transient lock object.
     *
     * @param in the stream the pipeline is read from.
     * @throws IOException if reading from the stream fails.
     * @throws ClassNotFoundException if the class of a serialized field cannot be found.
     */
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        this.isFinished = new Object();
    }

    /**
     * Sends every record of the data set to the wrapped node, then signals {@link Signal#END_OF_STREAM}.
     *
     * <p>The stopped flag is checked after each row has been delivered: once it is set, no further record is
     * read. The end-of-stream signal is sent whether the records were exhausted or the run was stopped. The
     * whole run holds the {@code isFinished} monitor, which is what a STOP signal waits on.
     *
     * @param dataSet the data set providing the records and the row metadata (a clone of the metadata is
     * handed to the node).
     */
    public void execute(DataSet dataSet) {
        RowMetadata rowMetadata = dataSet.getMetadata().getRowMetadata().clone();
        try (Stream<DataSetRow> records = dataSet.getRecords()) {
            synchronized (this.isFinished) {
                long sent = 0L;
                // Explicit iteration instead of Stream.peek(): rows are delivered one at a time, in order.
                Iterator<DataSetRow> rows = records.iterator();
                while (rows.hasNext()) {
                    this.node.exec().receive(rows.next(), rowMetadata);
                    sent++;
                    if (this.isStopped.get()) {
                        break;
                    }
                }
                LOG.debug("{} rows sent in the pipeline", sent);
                this.node.exec().signal(Signal.END_OF_STREAM);
            }
        }
    }

    /**
     * Replaces the wrapped node.
     *
     * @param node the node that rows and signals are delegated to.
     */
    public void setNode(Node node) {
        this.node = node;
    }

    /**
     * @return the text produced by visiting this pipeline with a {@link PipelineConsoleDump}.
     */
    public String toString() {
        StringBuilder builder = new StringBuilder();
        this.accept(new PipelineConsoleDump(builder));
        return builder.toString();
    }

    /** Forwards the row and its metadata to the wrapped node. */
    @Override
    public void receive(DataSetRow row, RowMetadata metadata) {
        this.node.exec().receive(row, metadata);
    }

    /**
     * Not supported by a pipeline.
     *
     * @throws UnsupportedOperationException always.
     */
    @Override
    public void receive(DataSetRow[] rows, RowMetadata[] metadatas) {
        throw new UnsupportedOperationException("Pipeline only manage single rows as input");
    }

    /**
     * @return the link of the wrapped node.
     */
    @Override
    public Link getLink() {
        return this.node.getLink();
    }

    /**
     * Not supported by a pipeline.
     *
     * @throws UnsupportedOperationException always.
     */
    @Override
    public void setLink(Link link) {
        throw new UnsupportedOperationException();
    }

    /**
     * Handles a signal sent to the pipeline.
     *
     * <ul>
     * <li>{@link Signal#STOP}: sets the stopped flag and waits for a running {@link #execute(DataSet)} to
     * finish; the signal is not forwarded to the wrapped node.</li>
     * <li>{@link Signal#CANCEL}: sets the stopped flag and forwards the signal to the wrapped node.</li>
     * <li>any other signal: forwarded to the wrapped node as is.</li>
     * </ul>
     *
     * @param signal the signal to handle.
     */
    @Override
    public void signal(Signal signal) {
        if (signal == Signal.STOP) {
            this.isStopped.set(true);
            this.waitForPipelineToFinish();
            return;
        }
        if (signal == Signal.CANCEL) {
            this.isStopped.set(true);
        }
        this.node.exec().signal(signal);
    }

    /**
     * Blocks until the {@code isFinished} monitor can be acquired, i.e. until no other thread is inside
     * {@link #execute(DataSet)}. Java monitors are reentrant, so a call made from the executing thread itself
     * returns immediately.
     */
    private void waitForPipelineToFinish() {
        synchronized (this.isFinished) {
            LOG.debug("pipeline is finished");
        }
    }

    @Override
    public void accept(Visitor visitor) {
        visitor.visitPipeline(this);
    }

    /**
     * @return this pipeline, which is its own runtime node.
     */
    @Override
    public RuntimeNode exec() {
        return this;
    }

    /**
     * @return a new pipeline wrapping the same node instance; the stopped flag is not copied.
     */
    @Override
    public Node copyShallow() {
        return new Pipeline(this.node);
    }

    /**
     * @return the wrapped node, or {@code null} if none was set.
     */
    public Node getNode() {
        return this.node;
    }

    /**
     * Fluent builder assembling a {@link Pipeline} from a source, the action nodes, optional filter and limit
     * nodes, an output node and a monitor node.
     */
    public static class Builder {

        private final List<RunnableAction> actions = new ArrayList<>();

        private RowMetadata rowMetadata;

        private boolean completeMetadata;

        private ActionRegistry actionRegistry;

        private StatisticsAdapter adapter;

        private Supplier<Node> monitorSupplier = BasicNode::new;

        /** No default: {@link #withOutput(Supplier)} must be called before {@link #build()}. */
        private Supplier<Node> outputSupplier = null;

        private boolean allowMetadataChange = true;

        private Predicate<DataSetRow> inFilter;

        private Function<RowMetadata, Predicate<DataSetRow>> outFilter;

        private boolean needGlobalStatistics = true;

        private AnalyzerService analyzerService;

        private PreparationMessage preparation;

        private Function<Step, RowMetadata> previousStepRowMetadataSupplier = s -> null;

        private Long limit = null;

        /**
         * @return a new builder with default settings.
         */
        public static Builder builder() {
            return new Builder();
        }

        /**
         * @param previousStepRowMetadataSupplier function handed to {@link StepNodeTransformer} when a
         * preparation is set; the default returns {@code null} for every step.
         * @return this builder.
         */
        public Builder withStepMetadataSupplier(Function<Step, RowMetadata> previousStepRowMetadataSupplier) {
            this.previousStepRowMetadataSupplier = previousStepRowMetadataSupplier;
            return this;
        }

        /**
         * @param analyzerService the analyzer service passed to the action nodes builder.
         * @return this builder.
         */
        public Builder withAnalyzerService(AnalyzerService analyzerService) {
            this.analyzerService = analyzerService;
            return this;
        }

        /**
         * @param adapter the statistics adapter passed to the action nodes builder.
         * @return this builder.
         */
        public Builder withStatisticsAdapter(StatisticsAdapter adapter) {
            this.adapter = adapter;
            return this;
        }

        /**
         * @param actionRegistry the registry passed to the action nodes builder and, when a preparation is
         * set, used to look up the definition of each action by name.
         * @return this builder.
         */
        public Builder withActionRegistry(ActionRegistry actionRegistry) {
            this.actionRegistry = actionRegistry;
            return this;
        }

        /**
         * @param rowMetadata the initial metadata passed to the action nodes builder.
         * @param completeMetadata when {@code false}, statistics are requested before the actions.
         * @return this builder.
         */
        public Builder withInitialMetadata(RowMetadata rowMetadata, boolean completeMetadata) {
            this.rowMetadata = rowMetadata;
            this.completeMetadata = completeMetadata;
            return this;
        }

        /**
         * @param preparation the preparation to run; when set, the actions are rebuilt from the action
         * registry and the action nodes go through {@link StepNodeTransformer} with the preparation steps.
         * @return this builder.
         */
        public Builder withPreparation(PreparationMessage preparation) {
            this.preparation = preparation;
            return this;
        }

        /**
         * @param actions actions appended to the ones already added to this builder.
         * @return this builder.
         */
        public Builder withActions(List<RunnableAction> actions) {
            this.actions.addAll(actions);
            return this;
        }

        /**
         * @param monitorSupplier supplier of the last node of the chain; defaults to {@code BasicNode::new}.
         * @return this builder.
         */
        public Builder withMonitor(Supplier<Node> monitorSupplier) {
            this.monitorSupplier = monitorSupplier;
            return this;
        }

        /**
         * @param outputSupplier supplier of the output node, placed just before the monitor node; required.
         * @return this builder.
         */
        public Builder withOutput(Supplier<Node> outputSupplier) {
            this.outputSupplier = outputSupplier;
            return this;
        }

        /**
         * @param needGlobalStatistics whether statistics are requested after the actions (default {@code true}).
         * @return this builder.
         */
        public Builder withGlobalStatistics(boolean needGlobalStatistics) {
            this.needGlobalStatistics = needGlobalStatistics;
            return this;
        }

        /**
         * @param allowMetadataChange whether schema analysis is allowed in the action nodes (default
         * {@code true}).
         * @return this builder.
         */
        public Builder allowMetadataChange(boolean allowMetadataChange) {
            this.allowMetadataChange = allowMetadataChange;
            return this;
        }

        /**
         * @param filter predicate used to create a filtered source; when {@code null} a plain source is used.
         * @return this builder.
         */
        public Builder withFilter(Predicate<DataSetRow> filter) {
            this.inFilter = filter;
            return this;
        }

        /**
         * @param outFilter filter factory used to add a {@link FilteredNode} after the action nodes; no such
         * node is added when {@code null}.
         * @return this builder.
         */
        public Builder withFilterOut(Function<RowMetadata, Predicate<DataSetRow>> outFilter) {
            this.outFilter = outFilter;
            return this;
        }

        /**
         * @param limit value given to the {@link LimitNode}; no limit node is added when {@code null} or not
         * strictly positive.
         * @return this builder.
         */
        public Builder withLimit(Long limit) {
            this.limit = limit;
            return this;
        }

        /**
         * Builds the pipeline. Nodes are chained in this order: source (filtered when an input filter is set),
         * action nodes, optional output filter, optional limit node, output node, monitor node.
         *
         * @return the assembled pipeline.
         * @throws NullPointerException if no output supplier was set with {@link #withOutput(Supplier)} or the
         * monitor supplier was set to {@code null}.
         */
        public Pipeline build() {
            // Fail fast with an explicit message rather than with a bare NPE once the whole chain is built.
            Objects.requireNonNull(this.outputSupplier, "No output node supplier set: call withOutput(...) before build()");
            Objects.requireNonNull(this.monitorSupplier, "The monitor node supplier must not be null");

            NodeBuilder current = this.inFilter != null ? NodeBuilder.filteredSource(this.inFilter) : NodeBuilder.source();
            List<RunnableAction> runnableActions = this.resolveRunnableActions();
            Node actionsNode = ActionNodesBuilder.builder()
                    .initialMetadata(this.rowMetadata)
                    .actions(runnableActions)
                    .needStatisticsBefore(!this.completeMetadata)
                    .needStatisticsAfter(this.needGlobalStatistics)
                    .allowSchemaAnalysis(this.allowMetadataChange)
                    .actionRegistry(this.actionRegistry)
                    .analyzerService(this.analyzerService)
                    .statisticsAdapter(this.adapter)
                    .build();

            if (this.preparation != null) {
                LOG.debug("Applying step node transformations...");
                actionsNode.logStatus(LOG, "Before transformation\n{}");
                Node transformed = StepNodeTransformer.transform(actionsNode, this.preparation.getSteps(), this.previousStepRowMetadataSupplier);
                current.to(transformed);
                transformed.logStatus(LOG, "After transformation\n{}");
            } else {
                current.to(actionsNode);
            }

            if (this.outFilter != null) {
                current.to(new FilteredNode(this.outFilter));
            }

            // The pipeline is created before its nodes so that the limit node can send it a STOP signal.
            Pipeline pipeline = new Pipeline();
            if (this.limit != null && this.limit > 0L) {
                current.to(new LimitNode(this.limit, () -> pipeline.signal(Signal.STOP)));
            }
            current.to(this.outputSupplier.get());
            current.to(this.monitorSupplier.get());
            pipeline.setNode(current.build());
            return pipeline;
        }

        /**
         * Returns the actions to run: the submitted actions as is when no preparation is set, otherwise a new
         * list in which each action is rebuilt from the action registry.
         */
        private List<RunnableAction> resolveRunnableActions() {
            if (this.preparation == null) {
                LOG.debug("Running using submitted actions ({} action(s))", this.actions.size());
                return this.actions;
            }
            LOG.debug("Running using preparation #{} ({} step(s))", this.preparation.getId(), this.preparation.getSteps().size());
            return this.actions.stream().map(this::rebuildFromRegistry).collect(Collectors.toList());
        }

        /**
         * Rebuilds an action with compile and row functions created from its registry definition, keeping its
         * name and a copy of its parameters.
         */
        private RunnableAction rebuildFromRegistry(RunnableAction action) {
            Map<String, String> parameters = action.getParameters();
            ScopeCategory scope = ScopeCategory.from(parameters.get(ImplicitParameters.SCOPE.getKey()));
            ActionDefinition actionDefinition = this.actionRegistry.get(action.getName());
            CompileDataSetRowAction compile = new CompileDataSetRowAction(parameters, actionDefinition, scope);
            ApplyDataSetRowAction apply = new ApplyDataSetRowAction(actionDefinition, parameters, scope);
            return RunnableAction.Builder.builder()
                    .withCompile(compile)
                    .withRow(apply)
                    .withName(action.getName())
                    .withParameters(new HashMap<String, String>(parameters))
                    .build();
        }
    }
}

