/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.master;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelUpstreamHandler;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.DatagramChannel;
import org.jboss.netty.channel.socket.oio.OioDatagramChannelFactory;
import org.jboss.netty.handler.codec.protobuf.ProtobufEncoder;

/**
 * Chore that periodically hands the list of recently dead servers to a pluggable
 * {@link Publisher}.
 *
 * <p>Each dead server reported by {@link #getDeadServers(long)} is tracked in an internal map
 * together with the number of times it has already been selected for a message. On every run at
 * most {@link #MAX_SERVER_PER_MESSAGE} servers are selected, least-sent first, and a server is
 * dropped from the map once it has been selected {@link #NB_SEND} times.
 */
@InterfaceAudience.Private
public class ClusterStatusPublisher
extends Chore {
    /** Configuration key {@code hbase.status.publisher.class}; not read by this class itself. */
    public static final String STATUS_PUBLISHER_CLASS = "hbase.status.publisher.class";
    /** Default publisher class; {@code null}, i.e. no default implementation is provided here. */
    public static final Class<? extends Publisher> DEFAULT_STATUS_PUBLISHER_CLASS = null;
    /** Configuration key for the publish period; the value is compared against millisecond timestamps. */
    public static final String STATUS_PUBLISH_PERIOD = "hbase.status.publish.period";
    /** Period used when {@link #STATUS_PUBLISH_PERIOD} is not configured. */
    public static final int DEFAULT_STATUS_PUBLISH_PERIOD = 10000;
    /** Timestamp (from {@link EnvironmentEdgeManager}) of the last published message; 0 if none yet. */
    private long lastMessageTime = 0L;
    /** Master whose dead servers are published; {@code null} when the protected constructor is used. */
    private final HMaster master;
    /** Minimum delay between two published messages. */
    private final int messagePeriod;
    /** Dead servers still to be announced, mapped to the number of times they were already selected. */
    private final ConcurrentMap<ServerName, Integer> lastSent = new ConcurrentHashMap<ServerName, Integer>();
    /** Transport used to publish; stays {@code null} when the protected constructor is used. */
    private Publisher publisher;
    /** True once the publisher connected successfully; reset by {@link #cleanup()}. */
    private boolean connected = false;
    /** Maximum number of servers returned by one call of {@link #generateDeadServersListToSend()}. */
    public static int MAX_SERVER_PER_MESSAGE = 10;
    /** Number of times a dead server is selected before it is dropped from the pending map. */
    public static int NB_SEND = 5;

    /**
     * Creates the chore, instantiates the publisher reflectively and connects it.
     *
     * @param master master providing the dead-server list, cluster id and server name
     * @param conf configuration supplying the publish period and passed to {@link Publisher#connect}
     * @param publisherClass publisher implementation; must have an accessible no-arg constructor
     * @throws IOException if the publisher cannot be instantiated or fails to connect
     */
    public ClusterStatusPublisher(HMaster master, Configuration conf, Class<? extends Publisher> publisherClass) throws IOException {
        super("HBase clusterStatusPublisher for " + master.getName(), conf.getInt(STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD), (Stoppable)master);
        this.master = master;
        this.messagePeriod = conf.getInt(STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD);
        try {
            this.publisher = publisherClass.newInstance();
        }
        catch (InstantiationException e) {
            throw new IOException("Can't create publisher " + publisherClass.getName(), e);
        }
        catch (IllegalAccessException e) {
            throw new IOException("Can't create publisher " + publisherClass.getName(), e);
        }
        boolean connectSucceeded = false;
        try {
            this.publisher.connect(conf);
            connectSucceeded = true;
        }
        finally {
            if (!connectSucceeded) {
                // Release whatever the publisher acquired (threads, sockets) before the
                // connect failure propagates; otherwise nobody holds a reference to close it.
                try {
                    this.publisher.close();
                }
                catch (RuntimeException ignored) {
                    // Deliberately ignored so the original connect failure stays the reported error.
                }
            }
        }
        this.connected = true;
    }

    /**
     * Constructor for subclasses that do not need a master or a publisher. The resulting
     * instance is never connected, so {@link #chore()} does nothing.
     */
    protected ClusterStatusPublisher() {
        this.master = null;
        this.messagePeriod = 0;
    }

    /**
     * Publishes one {@link ClusterStatus} carrying the currently selected dead servers, unless the
     * publisher is not connected, a message was already sent less than {@code messagePeriod} ago,
     * or there is nothing to send.
     */
    protected void chore() {
        if (!this.connected) {
            return;
        }
        // Throttle BEFORE selecting servers: generateDeadServersListToSend() bumps the
        // per-server send counters, which must only happen when a message really goes out.
        long curTime = EnvironmentEdgeManager.currentTimeMillis();
        if (this.lastMessageTime > curTime - (long)this.messagePeriod) {
            return;
        }
        List<ServerName> sns = this.generateDeadServersListToSend();
        if (sns.isEmpty()) {
            return;
        }
        this.lastMessageTime = curTime;
        ClusterStatus cs = new ClusterStatus(VersionInfo.getVersion(), this.master.getMasterFileSystem().getClusterId().toString(), null, sns, this.master.getServerName(), null, null, null, null);
        this.publisher.publish(cs);
    }

    /**
     * Marks the chore as disconnected and closes the publisher, if there is one.
     */
    protected void cleanup() {
        this.connected = false;
        // publisher is null when the instance was built with the protected constructor.
        if (this.publisher != null) {
            this.publisher.close();
        }
    }

    /**
     * Selects the dead servers to put in the next message and updates their send counters.
     *
     * <p>Servers that died within the last two message periods are added to the pending map with
     * a counter of 0 if not already present. The pending entries are then ordered by ascending
     * counter and the first {@link #MAX_SERVER_PER_MESSAGE} are returned. A returned server whose
     * counter has reached {@code NB_SEND - 1} is removed from the map; otherwise its counter is
     * incremented.
     *
     * @return the servers to publish, possibly empty, never {@code null}
     */
    protected List<ServerName> generateDeadServersListToSend() {
        // 2L forces long arithmetic so a large configured period cannot overflow int.
        long since = EnvironmentEdgeManager.currentTimeMillis() - 2L * this.messagePeriod;
        for (Pair<ServerName, Long> dead : this.getDeadServers(since)) {
            this.lastSent.putIfAbsent((ServerName)dead.getFirst(), 0);
        }
        List<Map.Entry<ServerName, Integer>> entries = new ArrayList<Map.Entry<ServerName, Integer>>(this.lastSent.entrySet());
        // Least-sent servers first, so new deaths are announced before repeats.
        Collections.sort(entries, new Comparator<Map.Entry<ServerName, Integer>>(){

            @Override
            public int compare(Map.Entry<ServerName, Integer> o1, Map.Entry<ServerName, Integer> o2) {
                return o1.getValue().compareTo(o2.getValue());
            }
        });
        int max = Math.min(entries.size(), MAX_SERVER_PER_MESSAGE);
        List<ServerName> res = new ArrayList<ServerName>(max);
        for (int i = 0; i < max; ++i) {
            Map.Entry<ServerName, Integer> toSend = entries.get(i);
            ServerName server = toSend.getKey();
            Integer sentCount = toSend.getValue();
            if (sentCount >= NB_SEND - 1) {
                // This is the last time the server is announced.
                this.lastSent.remove(server);
            } else {
                this.lastSent.replace(server, sentCount, sentCount + 1);
            }
            res.add(server);
        }
        return res;
    }

    /**
     * Returns the dead servers known to the master's server manager, as filtered by
     * {@code copyDeadServersSince(since)}.
     *
     * @param since timestamp passed through to {@code copyDeadServersSince}
     * @return the dead servers paired with a {@code Long} (presumably the time of death - confirm
     *         against {@code copyDeadServersSince}); an empty list when there is no master or no
     *         server manager
     */
    protected List<Pair<ServerName, Long>> getDeadServers(long since) {
        // master is null for instances created through the protected constructor.
        if (this.master == null || this.master.getServerManager() == null) {
            return Collections.emptyList();
        }
        return this.master.getServerManager().getDeadServers().copyDeadServersSince(since);
    }

    /**
     * {@link Publisher} that writes the protobuf form of the cluster status to a datagram channel
     * joined to a multicast group.
     */
    public static class MulticastPublisher
    implements Publisher {
        /** Channel the statuses are written to; {@code null} until {@link #connect} has bound it. */
        private DatagramChannel channel;
        /** Single daemon thread handed to the Netty channel factory. */
        private final ExecutorService service = Executors.newSingleThreadExecutor(Threads.newDaemonThreadFactory("hbase-master-clusterStatus-worker"));

        /**
         * Resolves the multicast address, binds a datagram channel on an ephemeral port, joins the
         * multicast group and connects the channel to the group address and port.
         *
         * @param conf supplies {@code hbase.status.multicast.address.ip} (default
         *        {@code 226.1.1.3}) and {@code hbase.status.multicast.port} (default 60100)
         * @throws IOException if the multicast address cannot be resolved
         */
        @Override
        public void connect(Configuration conf) throws IOException {
            String mcAddress = conf.get("hbase.status.multicast.address.ip", "226.1.1.3");
            int port = conf.getInt("hbase.status.multicast.port", 60100);
            // Resolve first: failing here must not leave an already-bound channel behind.
            InetAddress ina;
            try {
                ina = InetAddress.getByName(mcAddress);
            }
            catch (UnknownHostException e) {
                throw new IOException("Can't connect to " + mcAddress, e);
            }
            OioDatagramChannelFactory f = new OioDatagramChannelFactory((Executor)this.service);
            ConnectionlessBootstrap b = new ConnectionlessBootstrap((ChannelFactory)f);
            b.setPipeline(Channels.pipeline(new ChannelHandler[]{new ProtobufEncoder(), new ChannelUpstreamHandler(){

                /** Intentionally empty: every upstream event is ignored. */
                public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
                }
            }}));
            this.channel = (DatagramChannel)b.bind((SocketAddress)new InetSocketAddress(0));
            // NOTE(review): reuse-address is set after bind(); it may have no effect on the
            // already-bound socket - confirm whether it should be a bootstrap option instead.
            this.channel.getConfig().setReuseAddress(true);
            this.channel.joinGroup(ina);
            // Reuse the resolved address instead of resolving mcAddress a second time.
            this.channel.connect((SocketAddress)new InetSocketAddress(ina, port));
        }

        /**
         * Converts the status to its protobuf form and writes it to the channel.
         *
         * @param cs status to publish
         */
        @Override
        public void publish(ClusterStatus cs) {
            ClusterStatusProtos.ClusterStatus csp = cs.convert();
            this.channel.write((Object)csp);
        }

        /**
         * Closes the channel, if one was bound, and shuts down the worker executor.
         */
        @Override
        public void close() {
            if (this.channel != null) {
                this.channel.close();
            }
            this.service.shutdown();
        }
    }

    /**
     * Transport used by {@link ClusterStatusPublisher} to send out a {@link ClusterStatus}.
     * Implementations are instantiated reflectively through their no-arg constructor.
     */
    public static interface Publisher
    extends Closeable {
        /**
         * Prepares the publisher for use.
         *
         * @param var1 configuration to read settings from
         * @throws IOException if the publisher cannot be set up
         */
        public void connect(Configuration var1) throws IOException;

        /**
         * Sends one status message.
         *
         * @param var1 status to publish
         */
        public void publish(ClusterStatus var1);

        /**
         * Releases the publisher's resources; narrowed from {@link Closeable#close()} so that it
         * declares no checked exception.
         */
        @Override
        public void close();
    }
}

