/*
 * Decompiled with CFR 0.152.
 */
package org.talend.libs.tbd.ee.libstorm;

import backtype.storm.Config;
import backtype.storm.ILocalDRPC;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.DistributedRPC;
import backtype.storm.generated.KillOptions;
import backtype.storm.generated.Nimbus;
import backtype.storm.generated.NotAliveException;
import backtype.storm.utils.DRPCClient;
import backtype.storm.utils.NimbusClient;
import java.io.PrintStream;
import java.util.Map;
import org.apache.thrift7.TException;
import org.json.simple.JSONValue;
import org.talend.libs.tbd.ee.libstorm.MalformedJarException;
import org.talend.libs.tbd.ee.libstorm.StormJobRunHelper;
import org.talend.libs.tbd.ee.libstorm.StormTopologyContext;

public class ClusterStormJobRunHelper
extends StormJobRunHelper {
    private String nimbusHost = null;
    private Integer nimbusThriftPort = 6227;
    private String drpcServer = null;
    private Integer drpcPort = 3772;
    private Integer drpcTimeout = 30000;
    private String jarToSubmit = null;
    private ClusterTopologyContext ctx = null;
    private static final Integer MAX_SUBMISSION_TRIES = 3;

    @Override
    public void inspect(PrintStream out) {
        System.out.println("== inspect start ==");
        System.out.println("{");
        System.out.println("  \"NIMBUSHOST\": \"" + this.nimbusHost + "\",");
        System.out.println("  \"NIMBUSPORT\": \"" + this.nimbusThriftPort + "\",");
        System.out.println("  \"DRPCHOST\": \"" + this.drpcServer + "\",");
        System.out.println("  \"DRPCPORT\": \"" + this.drpcPort + "\",");
        System.out.println("  \"LOCALMODE\": \"false\",");
        System.out.println("  \"STORM_TOPOLOGY_NAME\": \"" + this.getTopologyName() + "\"");
        System.out.println("}");
        System.out.println("== inspect end ==");
    }

    @Override
    public ClusterTopologyContext getTopologyContext() {
        if (this.ctx == null) {
            this.ctx = new ClusterTopologyContext();
            Config stormConfig = this.ctx.getStormConfig();
            stormConfig.put((Object)"storm.thrift.transport", (Object)"backtype.storm.security.auth.SimpleTransportPlugin");
        }
        return this.ctx;
    }

    @Override
    public int submitJob() throws Exception {
        int status = 0;
        Config stormConfig = this.ctx.getStormConfig();
        String uploadedJarLocation = StormSubmitter.submitJar((Map)stormConfig, (String)this.jarToSubmit);
        int i = -1;
        while (++i < MAX_SUBMISSION_TRIES) {
            try {
                status = 1;
                String jsonConf = JSONValue.toJSONString((Object)stormConfig);
                this.getTopologyContext().getNimbusClient().submitTopology(this.getTopologyName(), uploadedJarLocation, jsonConf, this.ctx.getTridentTopology().build());
                status = 0;
                break;
            }
            catch (AlreadyAliveException aee) {
                if (!this.getDoKillAlreadyExisting()) {
                    throw aee;
                }
                this.killExistingTopology();
                Thread.sleep(1000L);
            }
        }
        return status;
    }

    @Override
    public void killExistingTopology() {
        try {
            KillOptions killOptions = new KillOptions();
            killOptions.set_wait_secs(0);
            this.getTopologyContext().getNimbusClient().killTopologyWithOpts(this.getTopologyName(), killOptions);
        }
        catch (NotAliveException e) {
        }
        catch (TException e) {
            e.printStackTrace();
        }
    }

    public void setJarToSubmit(String jarToSubmit) throws MalformedJarException {
        if (!jarToSubmit.endsWith(".jar")) {
            throw new MalformedJarException();
        }
        this.jarToSubmit = jarToSubmit;
    }

    public void setNimbusHost(String nimbusHost) {
        this.nimbusHost = nimbusHost;
        this.getStormConfig().put((Object)"nimbus.host", (Object)nimbusHost);
    }

    public void setNimbusThriftPort(Integer nimbusThriftPort) {
        this.nimbusThriftPort = nimbusThriftPort;
        this.getStormConfig().put((Object)"nimbus.thrift.port", (Object)nimbusThriftPort);
    }

    public void setDrpcServer(String drpcServer) {
        this.drpcServer = drpcServer;
    }

    public void setDrpcPort(Integer drpcPort) {
        this.drpcPort = drpcPort;
    }

    public void setDrpcTimeout(Integer drpcTimeout) {
        this.drpcTimeout = drpcTimeout;
    }

    final class ClusterTopologyContext
    extends StormTopologyContext {
        private NimbusClient nimbusClient = null;
        private DRPCClient drpcClient = null;

        ClusterTopologyContext() {
        }

        @Override
        public final DistributedRPC.Iface getDrpcServer() {
            if (this.drpcClient == null) {
                this.drpcClient = new DRPCClient(ClusterStormJobRunHelper.this.drpcServer, ClusterStormJobRunHelper.this.drpcPort.intValue(), ClusterStormJobRunHelper.this.drpcTimeout);
            }
            return this.drpcClient;
        }

        @Override
        public final ILocalDRPC getLocalDrpcServer() {
            return null;
        }

        public final Nimbus.Client getNimbusClient() {
            if (this.nimbusClient == null) {
                this.nimbusClient = NimbusClient.getConfiguredClient((Map)this.getStormConfig());
            }
            return this.nimbusClient.getClient();
        }
    }
}

