package org.apache.druid.examples.wikipedia;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Lists;
import com.ircclouds.irc.api.Callback;
import com.ircclouds.irc.api.IRCApiImpl;
import com.ircclouds.irc.api.IServerParameters;
import com.ircclouds.irc.api.domain.IRCServer;
import com.ircclouds.irc.api.domain.messages.ChannelPrivMsg;
import com.ircclouds.irc.api.listeners.VariousMessageListenerAdapter;
import com.ircclouds.irc.api.state.IIRCState;
import java.io.File;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.logger.Logger;
import org.joda.time.DateTime;

/* loaded from: input_file:org/apache/druid/examples/wikipedia/IrcFirehoseFactory.class */
/**
 * {@link FirehoseFactory} that connects to an IRC server, joins the configured channels and
 * exposes every channel message as a timestamped {@code Pair<DateTime, ChannelPrivMsg>} that is
 * handed to the supplied {@link InputRowParser}.
 *
 * <p>The factory itself only holds configuration (nick, host, channels). All connection state,
 * including the "closed" flag, belongs to the individual {@link Firehose} returned by
 * {@link #connect}, so firehoses created from the same factory do not interfere with each other.
 */
public class IrcFirehoseFactory implements FirehoseFactory<InputRowParser<Pair<DateTime, ChannelPrivMsg>>> {
    private static final Logger log = new Logger(IrcFirehoseFactory.class);

    /** How long a single poll of the message queue waits before the closed flag is re-checked. */
    private static final long QUEUE_POLL_TIMEOUT_MS = 100L;

    private final String nick;
    private final String host;
    private final List<String> channels;

    /**
     * @param str   nickname used on the IRC server (JSON property {@code nick})
     * @param str2  IRC server host to connect to (JSON property {@code host})
     * @param list  channels to join once connected (JSON property {@code channels})
     */
    @JsonCreator
    public IrcFirehoseFactory(@JsonProperty("nick") String str, @JsonProperty("host") String str2, @JsonProperty("channels") List<String> list) {
        this.nick = str;
        this.host = str2;
        this.channels = list;
    }

    /** @return the configured IRC nickname */
    @JsonProperty
    public String getNick() {
        return this.nick;
    }

    /** @return the configured IRC server host */
    @JsonProperty
    public String getHost() {
        return this.host;
    }

    /** @return the channels joined after a successful connection */
    @JsonProperty
    public List<String> getChannels() {
        return this.channels;
    }

    /**
     * Opens an IRC connection and returns a firehose that yields the rows parsed from incoming
     * channel messages.
     *
     * <p>Messages are timestamped with the current UTC time when they arrive and buffered in an
     * unbounded queue until {@code hasMore()} drains them.
     *
     * @param inputRowParser parser that turns a (timestamp, message) pair into input rows
     * @param file           not used by this implementation
     * @return a firehose backed by the new IRC connection; closing it disconnects from the server
     */
    public Firehose connect(final InputRowParser<Pair<DateTime, ChannelPrivMsg>> inputRowParser, File file) {
        final IRCApiImpl ircApi = new IRCApiImpl(false);
        final LinkedBlockingQueue<Pair<DateTime, ChannelPrivMsg>> queue = new LinkedBlockingQueue<>();

        ircApi.addListener(new VariousMessageListenerAdapter() {
            public void onChannelMessage(ChannelPrivMsg channelPrivMsg) {
                try {
                    queue.put(Pair.of(DateTimes.nowUtc(), channelPrivMsg));
                } catch (InterruptedException e) {
                    // Restore the interrupt status so code further up the stack can observe it.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("interrupted adding message to queue", e);
                }
            }
        });

        log.info("connecting to irc server [%s]", host);
        ircApi.connect(new IServerParameters() {
            public String getNickname() {
                return nick;
            }

            public List<String> getAlternativeNicknames() {
                // Random suffixes give fallbacks that are unlikely to collide if the nick is taken.
                return Lists.newArrayList(nick + UUID.randomUUID(), nick + UUID.randomUUID(), nick + UUID.randomUUID());
            }

            public String getIdent() {
                return "druid";
            }

            public String getRealname() {
                return nick;
            }

            public IRCServer getServer() {
                return new IRCServer(host, false);
            }
        }, new Callback<IIRCState>() {
            public void onSuccess(IIRCState iIRCState) {
                log.info("irc connection to server [%s] established", host);
                for (String channel : channels) {
                    log.info("Joining channel %s", channel);
                    ircApi.joinChannel(channel);
                }
            }

            public void onFailure(Exception exc) {
                log.error(exc, "Unable to connect to irc server [%s]", host);
                throw new RuntimeException("Unable to connect to server", exc);
            }
        });

        return new Firehose() {
            /** Set by close(); volatile so a thread blocked in hasMore() sees the change. */
            private volatile boolean closed = false;
            /** Row most recently selected by hasMore(); returned by nextRow(). */
            private InputRow nextRow = null;
            /** Rows parsed from the last dequeued message that have not been handed out yet. */
            private Iterator<InputRow> nextIterator = Collections.emptyIterator();

            /**
             * Blocks until a non-null row is available or the firehose is closed.
             *
             * @return {@code true} if {@code nextRow()} has a row to return, {@code false} once closed
             */
            public boolean hasMore() {
                try {
                    while (!closed) {
                        if (nextIterator.hasNext()) {
                            nextRow = nextIterator.next();
                            if (nextRow != null) {
                                return true;
                            }
                        } else {
                            // Poll with a timeout so that close() is noticed even on a quiet channel.
                            Pair<DateTime, ChannelPrivMsg> message = queue.poll(QUEUE_POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
                            if (message != null) {
                                try {
                                    nextIterator = inputRowParser.parseBatch(message).iterator();
                                } catch (IllegalArgumentException e) {
                                    // Messages the parser rejects are skipped, not treated as fatal.
                                    log.debug("ignoring invalid message in channel [%s]", message.rhs.getChannelName());
                                }
                            }
                        }
                    }
                } catch (InterruptedException e) {
                    // Restore the interrupt status; poll() cleared it when throwing.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("interrupted retrieving elements from queue", e);
                }
                return false;
            }

            /** @return the row selected by the last successful {@code hasMore()} call */
            @Nullable
            public InputRow nextRow() {
                return nextRow;
            }

            /** @return a no-op committer; this firehose keeps nothing to commit */
            public Runnable commit() {
                return () -> {
                    // nothing to commit
                };
            }

            /** Disconnects from the IRC server and makes {@code hasMore()} return {@code false}. */
            public void close() {
                try {
                    log.info("disconnecting from irc server [%s]", host);
                    ircApi.disconnect("");
                } finally {
                    // Mark closed even if disconnect fails so hasMore() stops looping.
                    closed = true;
                }
            }
        };
    }
}
