/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.ql.metadata.events;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.events.EventConsumer;
import org.apache.hadoop.hive.ql.metadata.events.EventUtils;
import org.apache.hadoop.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Process-wide singleton that periodically polls for notification events and
 * hands every event to each configured {@link EventConsumer}.
 *
 * <p>Polling is only started when the configured poll interval is positive and
 * at least one consumer class is configured; otherwise the instance is created
 * but stays idle. Polling runs on a single daemon thread.
 */
public class NotificationEventPoll {
    private static final Logger LOG = LoggerFactory.getLogger(NotificationEventPoll.class);

    /** True while the singleton exists or is in the middle of being constructed. */
    private static final AtomicBoolean inited = new AtomicBoolean(false);

    /** Volatile so that {@link #shutdown()} on another thread sees the instance published by {@link #initialize}. */
    private static volatile NotificationEventPoll instance;

    Configuration conf;
    ScheduledExecutorService executorService;
    List<EventConsumer> eventConsumers = new ArrayList<EventConsumer>();
    ScheduledFuture<?> pollFuture;
    /** Id of the last event handed to the consumers; the next poll resumes from here. */
    long lastCheckedEventId;

    /**
     * Creates the singleton poller if it does not exist yet. Calls made while an
     * instance exists (or is being created by another thread) return immediately.
     *
     * @param conf configuration supplying the poll interval and consumer class names
     * @throws Exception if a consumer cannot be loaded or the initial event id cannot be fetched
     */
    public static void initialize(Configuration conf) throws Exception {
        if (inited.getAndSet(true)) {
            return;
        }
        boolean created = false;
        try {
            instance = new NotificationEventPoll(conf);
            created = true;
        } finally {
            // Reset on ANY failure, including Errors (e.g. linkage errors while loading a
            // consumer class); otherwise the flag would stay set with no instance behind it
            // and every later initialize() would silently do nothing.
            if (!created) {
                inited.set(false);
            }
        }
    }

    /**
     * Stops polling and discards the singleton so that {@link #initialize} can be
     * called again. Does nothing when no fully constructed instance exists.
     */
    public static void shutdown() {
        // Snapshot the field: the flag is raised before construction completes, so
        // "inited" alone does not guarantee that an instance has been published.
        NotificationEventPoll current = instance;
        if (!inited.get() || current == null) {
            return;
        }
        current.stop();
        instance = null;
        inited.set(false);
    }

    /**
     * Instantiates the configured consumers and schedules the poller.
     *
     * @param conf configuration to read settings from; also passed to each consumer on creation
     * @throws Exception if a consumer class cannot be loaded or instantiated, or the
     *         current notification event id cannot be obtained
     */
    private NotificationEventPoll(Configuration conf) throws Exception {
        this.conf = conf;

        long pollInterval = HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_NOTFICATION_EVENT_POLL_INTERVAL, TimeUnit.MILLISECONDS);
        if (pollInterval <= 0L) {
            LOG.debug("Non-positive poll interval configured, notification event polling disabled");
            return;
        }

        String[] consumerClassNames = conf.getStrings(HiveConf.ConfVars.HIVE_NOTFICATION_EVENT_CONSUMERS.varname);
        if (consumerClassNames == null || consumerClassNames.length == 0) {
            LOG.debug("No event consumers configured, notification event polling disabled");
            return;
        }
        for (String consumerClassName : consumerClassNames) {
            Class<?> consumerClass = JavaUtils.loadClass(consumerClassName);
            EventConsumer consumer = (EventConsumer) ReflectionUtils.newInstance(consumerClass, conf);
            this.eventConsumers.add(consumer);
        }

        // Start from the current event id so that only events seen after this point are polled.
        EventUtils.MSClientNotificationFetcher evFetcher = new EventUtils.MSClientNotificationFetcher(Hive.get());
        this.lastCheckedEventId = evFetcher.getCurrentNotificationEventId();
        LOG.info("Initializing lastCheckedEventId to {}", this.lastCheckedEventId);

        ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("NotificationEventPoll %d").build();
        this.executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
        this.pollFuture = this.executorService.scheduleAtFixedRate(new Poller(), pollInterval, pollInterval, TimeUnit.MILLISECONDS);
    }

    /** Cancels the scheduled poll (interrupting a run in progress) and shuts the executor down. */
    private void stop() {
        if (this.pollFuture != null) {
            this.pollFuture.cancel(true);
            this.pollFuture = null;
        }
        if (this.executorService != null) {
            this.executorService.shutdown();
            this.executorService = null;
        }
    }

    /**
     * One polling pass: iterates over the events following {@code lastCheckedEventId},
     * passes each one to every consumer and advances {@code lastCheckedEventId}.
     */
    class Poller
    implements Runnable {
        Poller() {
        }

        @Override
        public void run() {
            LOG.debug("Polling for notification events");
            int eventsProcessed = 0;
            try {
                EventUtils.MSClientNotificationFetcher evFetcher = new EventUtils.MSClientNotificationFetcher(Hive.get());
                // NOTE(review): the trailing arguments (0, "*", null) presumably mean
                // "no cap, every database, every table" -- confirm against EventUtils.
                EventUtils.NotificationEventIterator evIter = new EventUtils.NotificationEventIterator(evFetcher, NotificationEventPoll.this.lastCheckedEventId, 0, "*", null);
                while (evIter.hasNext()) {
                    NotificationEvent event = evIter.next();
                    LOG.debug("Event: {}", event);
                    for (EventConsumer eventConsumer : NotificationEventPoll.this.eventConsumers) {
                        try {
                            eventConsumer.accept(event);
                        }
                        catch (Exception err) {
                            // A failing consumer must not keep the event from the remaining consumers.
                            LOG.error("Error processing notification event " + event, err);
                        }
                    }
                    ++eventsProcessed;
                    NotificationEventPoll.this.lastCheckedEventId = event.getEventId();
                }
            }
            catch (Exception err) {
                // Swallowed on purpose: an exception escaping a fixed-rate task
                // suppresses all of its subsequent executions.
                LOG.error("Error polling for notification events", err);
            }
            LOG.debug("Processed {} notification events", eventsProcessed);
        }
    }
}

