package org.apache.spark.streaming.flume;

import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.commons.cli.HelpFormatter;
import org.apache.flume.Context;
import org.apache.flume.Transaction;
import org.apache.flume.channel.AbstractChannel;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.channel.MultiplexingChannelSelector;
import org.apache.flume.conf.Configurables;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.SpoolDirectorySourceConfigurationConstants;
import org.apache.spark.streaming.flume.sink.SparkSink;
import org.apache.spark.streaming.flume.sink.SparkSinkConfig$;
import scala.Predef$;
import scala.collection.Seq;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.ArrayBuffer$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.RichInt$;

/* compiled from: PollingFlumeTestUtils.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005-c!\u0002\u000e\u001c\u0001m)\u0003\"\u0002\u0017\u0001\t\u0003q\u0003bB\u0019\u0001\u0005\u0004%IA\r\u0005\u0007m\u0001\u0001\u000b\u0011B\u001a\t\u000f]\u0002!\u0019!C\u0001e!1\u0001\b\u0001Q\u0001\nMBq!\u000f\u0001C\u0002\u0013%!\u0007\u0003\u0004;\u0001\u0001\u0006Ia\r\u0005\bw\u0001\u0011\r\u0011\"\u00033\u0011\u0019a\u0004\u0001)A\u0005g!)Q\b\u0001C\u0001e!9a\b\u0001b\u0001\n\u0013y\u0004BB(\u0001A\u0003%\u0001\tC\u0004Q\u0001\t\u0007I\u0011B)\t\re\u0003\u0001\u0015!\u0003S\u0011\u0015Q\u0006\u0001\"\u0001\\\u0011\u0015a\u0006\u0001\"\u0001^\u0011\u0015Q\u0007\u0001\"\u0001l\u0011\u0015y\u0007\u0001\"\u0001q\u0011\u0019\t\u0019\u0002\u0001C\u0001W\"9\u0011Q\u0003\u0001\u0005\n\u0005]\u0001BBA\u000e\u0001\u0011\u00051N\u0002\u0004\u0002\u001e\u0001!\u0011q\u0004\u0005\t\u0017Z\u0011\t\u0011)A\u0005\u0011\"1AF\u0006C\u0001\u0003\u007fAq!a\u0012\u0017\t\u0003\nIEA\u000bQ_2d\u0017N\\4GYVlW\rV3tiV#\u0018\u000e\\:\u000b\u0005qi\u0012!\u00024mk6,'B\u0001\u0010 \u0003%\u0019HO]3b[&twM\u0003\u0002!C\u0005)1\u000f]1sW*\u0011!eI\u0001\u0007CB\f7\r[3\u000b\u0003\u0011\n1a\u001c:h'\t\u0001a\u0005\u0005\u0002(U5\t\u0001FC\u0001*\u0003\u0015\u00198-\u00197b\u0013\tY\u0003F\u0001\u0004B]f\u0014VMZ\u0001\u0007y%t\u0017\u000e\u001e \u0004\u0001Q\tq\u0006\u0005\u00021\u00015\t1$\u0001\u0006cCR\u001c\u0007nQ8v]R,\u0012a\r\t\u0003OQJ!!\u000e\u0015\u0003\u0007%sG/A\u0006cCR\u001c\u0007nQ8v]R\u0004\u0013AD3wK:$8\u000fU3s\u0005\u0006$8\r[\u0001\u0010KZ,g\u000e^:QKJ\u0014\u0015\r^2iA\u0005)Bo\u001c;bY\u00163XM\u001c;t!\u0016\u00148\t[1o]\u0016d\u0017A\u0006;pi\u0006dWI^3oiN\u0004VM]\"iC:tW\r\u001c\u0011\u0002\u001f\rD\u0017M\u001c8fY\u000e\u000b\u0007/Y2jif\f\u0001c\u00195b]:,GnQ1qC\u000eLG/\u001f\u0011\u0002\u001d\u001d,G\u000fV8uC2,e/\u001a8ug\u0006A1\r[1o]\u0016d7/F\u0001A!\r\te\tS\u0007\u0002\u0005*\u00111\tR\u0001\b[V$\u0018M\u00197f\u0015\t)\u0005&\u0001\u0006d_2dWm\u0019;j_:L!a\u0012\"\u0003\u0017\u0005\u0013(/Y=Ck\u001a4WM\u001d\t\u0003\u00136k\u0011A\u0013\u0006\u0003\u00172\u000bqa\u00195b]:,GN\u0003\u0002\u001dC%\u0011aJ\u0013\u0002\u000e\u001b\u0016lwN]=DQ\u0006tg.\u001a7\u0002\u0013\rD\u0017M\u001c8fYN\u0004\u0013!B:j].\u001cX#\u0001*\u0011\u0007\u000535\u000b\u0005\u0002U/6\tQK\u0003\u0002W7\u0005!1/\u001b8l\u0013\tAVKA\u0005Ta\u0006\u00148nU5oW\u000611/\u001b8lg\u0002\nqb\u001d;beR\u001c\u0016N\\4mKNKgn\u001b\u000b\u0002g\u0005\u00112\u000f^1si6+H\u000e^5qY\u0016\u001c\u0016N\\6t)\u0005q\u0006cA0hg9\u0011\u0001-\u001a\b\u0003C\u0012l\u0011A\u0019\u0006\u0003G6\na\u0001\u0010:p_Rt\u0014\"A\u0015\n\u0005\u0019D\u0013a\u00029bG.\fw-Z\u0005\u0003Q&\u00141aU3r\u0015\t1\u0007&A\u0014tK:$G)\u0019;b\u0003:$WI\\:ve\u0016\fE\u000e\u001c#bi\u0006D\u0015m\u001d\"fK:\u0014VmY3jm\u0016$G#\u00017\u0011\u0005\u001dj\u0017B\u00018)\u0005\u0011)f.\u001b;\u0002\u0019\u0005\u001c8/\u001a:u\u001fV$\b/\u001e;\u0015\t1\f\u0018Q\u0002\u0005\u0006eJ\u0001\ra]\u0001\u000e_V$\b/\u001e;IK\u0006$WM]:\u0011\u0007QL80D\u0001v\u0015\t1x/\u0001\u0003vi&d'\"\u0001=\u0002\t)\fg/Y\u0005\u0003uV\u0014A\u0001T5tiB!A\u000f @\u007f\u0013\tiXOA\u0002NCB\u00042a`A\u0004\u001d\u0011\t\t!a\u0001\u0011\u0005\u0005D\u0013bAA\u0003Q\u00051\u0001K]3eK\u001aLA!!\u0003\u0002\f\t11\u000b\u001e:j]\u001eT1!!\u0002)\u0011\u001d\tyA\u0005a\u0001\u0003#\tAb\\;uaV$(i\u001c3jKN\u00042\u0001^=\u007f\u0003Y\t7o]3si\u000eC\u0017M\u001c8fYN\f%/Z#naRL\u0018\u0001F1tg\u0016\u0014Ho\u00115b]:,G.S:F[B$\u0018\u0010F\u0002m\u00033AQa\u0013\u000bA\u0002!\u000bQa\u00197pg\u0016\u0014A\u0002\u0016=o'V\u0014W.\u001b;uKJ\u001cRAFA\u0011\u0003[\u0001B!a\t\u0002*5\u0011\u0011Q\u0005\u0006\u0004\u0003O9\u0018\u0001\u00027b]\u001eLA!a\u000b\u0002&\t1qJ\u00196fGR\u0004b!a\f\u00026\u0005eRBAA\u0019\u0015\r\t\u0019$^\u0001\u000bG>t7-\u001e:sK:$\u0018\u0002BA\u001c\u0003c\u0011\u0001bQ1mY\u0006\u0014G.\u001a\t\u0005\u0003G\tY$\u0003\u0003\u0002>\u0005\u0015\"\u0001\u0002,pS\u0012$B!!\u0011\u0002FA\u0019\u00111\t\f\u000e\u0003\u0001AQa\u0013\rA\u0002!\u000bAaY1mYR\u0011\u0011\u0011\b")
/* loaded from: input_file:org/apache/spark/streaming/flume/PollingFlumeTestUtils.class */
public class PollingFlumeTestUtils {
    private final int org$apache$spark$streaming$flume$PollingFlumeTestUtils$$batchCount = 5;
    private final int eventsPerBatch = 100;
    private final int totalEventsPerChannel = org$apache$spark$streaming$flume$PollingFlumeTestUtils$$batchCount() * eventsPerBatch();
    private final int channelCapacity = SpoolDirectorySourceConfigurationConstants.DEFAULT_BUFFER_MAX_LINE_LENGTH;
    private final ArrayBuffer<MemoryChannel> channels = new ArrayBuffer<>();
    private final ArrayBuffer<SparkSink> sinks = new ArrayBuffer<>();

    /* compiled from: PollingFlumeTestUtils.scala */
    /* loaded from: input_file:org/apache/spark/streaming/flume/PollingFlumeTestUtils$TxnSubmitter.class */
    public class TxnSubmitter implements Callable<Void> {
        private final MemoryChannel channel;
        public final /* synthetic */ PollingFlumeTestUtils $outer;

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Void call() {
            IntRef create = IntRef.create(0);
            RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), org$apache$spark$streaming$flume$PollingFlumeTestUtils$TxnSubmitter$$$outer().org$apache$spark$streaming$flume$PollingFlumeTestUtils$$batchCount()).foreach$mVc$sp(i -> {
                Transaction transaction = this.channel.getTransaction();
                transaction.begin();
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.org$apache$spark$streaming$flume$PollingFlumeTestUtils$TxnSubmitter$$$outer().eventsPerBatch()).foreach$mVc$sp(i -> {
                    this.channel.put(EventBuilder.withBody(new StringBuilder(1).append(this.channel.getName()).append(HelpFormatter.DEFAULT_OPT_PREFIX).append(create.elem).toString().getBytes(StandardCharsets.UTF_8), (Map<String, String>) Collections.singletonMap(new StringBuilder(5).append("test-").append(create.elem).toString(), MultiplexingChannelSelector.CONFIG_MULTIPLEX_HEADER_NAME)));
                    create.elem++;
                });
                transaction.commit();
                transaction.close();
                Thread.sleep(500L);
            });
            return null;
        }

        public /* synthetic */ PollingFlumeTestUtils org$apache$spark$streaming$flume$PollingFlumeTestUtils$TxnSubmitter$$$outer() {
            return this.$outer;
        }

        public TxnSubmitter(PollingFlumeTestUtils pollingFlumeTestUtils, MemoryChannel memoryChannel) {
            this.channel = memoryChannel;
            if (pollingFlumeTestUtils == null) {
                throw null;
            }
            this.$outer = pollingFlumeTestUtils;
        }
    }

    public int org$apache$spark$streaming$flume$PollingFlumeTestUtils$$batchCount() {
        return this.org$apache$spark$streaming$flume$PollingFlumeTestUtils$$batchCount;
    }

    public int eventsPerBatch() {
        return this.eventsPerBatch;
    }

    private int totalEventsPerChannel() {
        return this.totalEventsPerChannel;
    }

    private int channelCapacity() {
        return this.channelCapacity;
    }

    public int getTotalEvents() {
        return totalEventsPerChannel() * channels().size();
    }

    private ArrayBuffer<MemoryChannel> channels() {
        return this.channels;
    }

    private ArrayBuffer<SparkSink> sinks() {
        return this.sinks;
    }

    public int startSingleSink() {
        channels().clear();
        sinks().clear();
        Context context = new Context();
        context.put("capacity", BoxesRunTime.boxToInteger(channelCapacity()).toString());
        context.put("transactionCapacity", "1000");
        context.put("keep-alive", "0");
        MemoryChannel memoryChannel = new MemoryChannel();
        Configurables.configure(memoryChannel, context);
        SparkSink sparkSink = new SparkSink();
        context.put(SparkSinkConfig$.MODULE$.CONF_HOSTNAME(), "localhost");
        context.put(SparkSinkConfig$.MODULE$.CONF_PORT(), String.valueOf(0));
        Configurables.configure(sparkSink, context);
        sparkSink.setChannel(memoryChannel);
        sparkSink.start();
        channels().$plus$eq(memoryChannel);
        sinks().$plus$eq(sparkSink);
        return sparkSink.getPort();
    }

    public Seq<Object> startMultipleSinks() {
        channels().clear();
        sinks().clear();
        Context context = new Context();
        context.put("capacity", BoxesRunTime.boxToInteger(channelCapacity()).toString());
        context.put("transactionCapacity", "1000");
        context.put("keep-alive", "0");
        MemoryChannel memoryChannel = new MemoryChannel();
        Configurables.configure(memoryChannel, context);
        MemoryChannel memoryChannel2 = new MemoryChannel();
        Configurables.configure(memoryChannel2, context);
        SparkSink sparkSink = new SparkSink();
        context.put(SparkSinkConfig$.MODULE$.CONF_HOSTNAME(), "localhost");
        context.put(SparkSinkConfig$.MODULE$.CONF_PORT(), String.valueOf(0));
        Configurables.configure(sparkSink, context);
        sparkSink.setChannel(memoryChannel);
        sparkSink.start();
        SparkSink sparkSink2 = new SparkSink();
        context.put(SparkSinkConfig$.MODULE$.CONF_HOSTNAME(), "localhost");
        context.put(SparkSinkConfig$.MODULE$.CONF_PORT(), String.valueOf(0));
        Configurables.configure(sparkSink2, context);
        sparkSink2.setChannel(memoryChannel2);
        sparkSink2.start();
        sinks().$plus$eq(sparkSink);
        sinks().$plus$eq(sparkSink2);
        channels().$plus$eq(memoryChannel);
        channels().$plus$eq(memoryChannel2);
        return (Seq) sinks().map(sparkSink3 -> {
            return BoxesRunTime.boxToInteger(sparkSink3.getPort());
        }, ArrayBuffer$.MODULE$.canBuildFrom());
    }

    public void sendDataAndEnsureAllDataHasBeenReceived() {
        ExecutorCompletionService executorCompletionService = new ExecutorCompletionService(Executors.newCachedThreadPool());
        CountDownLatch countDownLatch = new CountDownLatch(org$apache$spark$streaming$flume$PollingFlumeTestUtils$$batchCount() * channels().size());
        sinks().foreach(sparkSink -> {
            sparkSink.countdownWhenBatchReceived(countDownLatch);
            return BoxedUnit.UNIT;
        });
        channels().foreach(memoryChannel -> {
            return executorCompletionService.submit(new TxnSubmitter(this, memoryChannel));
        });
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), channels().size()).foreach(obj -> {
            BoxesRunTime.unboxToInt(obj);
            return executorCompletionService.take();
        });
        countDownLatch.await(15L, TimeUnit.SECONDS);
    }

    public void assertOutput(List<Map<String, String>> list, List<String> list2) {
        Predef$.MODULE$.require(list.size() == list2.size());
        int size = list.size();
        if (size != totalEventsPerChannel() * channels().size()) {
            throw new AssertionError(new StringBuilder(26).append("Expected ").append(totalEventsPerChannel() * channels().size()).append(" events, but was ").append(size).toString());
        }
        IntRef create = IntRef.create(0);
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), channels().size()).foreach$mVc$sp(i -> {
            RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.totalEventsPerChannel()).foreach$mVc$sp(i -> {
                int i;
                String sb = new StringBuilder(1).append(((AbstractChannel) this.channels().apply(i)).getName()).append(HelpFormatter.DEFAULT_OPT_PREFIX).append(i).toString();
                Map singletonMap = Collections.singletonMap(new StringBuilder(5).append("test-").append(i).toString(), MultiplexingChannelSelector.CONFIG_MULTIPLEX_HEADER_NAME);
                boolean z = false;
                while (true) {
                    int i2 = i;
                    if (i2 >= size || z) {
                        return;
                    }
                    Object obj = list2.get(i2);
                    if (sb == null) {
                        i = obj != null ? i2 + 1 : 0;
                        Object obj2 = list.get(i2);
                        if (singletonMap != null) {
                            if (obj2 != null) {
                            }
                            z = true;
                            create.elem++;
                        } else {
                            if (!singletonMap.equals(obj2)) {
                            }
                            z = true;
                            create.elem++;
                        }
                    } else {
                        if (!sb.equals(obj)) {
                        }
                        Object obj22 = list.get(i2);
                        if (singletonMap != null) {
                        }
                    }
                }
            });
        });
        if (create.elem != totalEventsPerChannel() * channels().size()) {
            throw new AssertionError(new StringBuilder(30).append("111 Expected ").append(totalEventsPerChannel() * channels().size()).append(" events, but was ").append(create.elem).toString());
        }
    }

    public void assertChannelsAreEmpty() {
        channels().foreach(memoryChannel -> {
            this.assertChannelIsEmpty(memoryChannel);
            return BoxedUnit.UNIT;
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void assertChannelIsEmpty(MemoryChannel memoryChannel) {
        Field declaredField = memoryChannel.getClass().getDeclaredField("queueRemaining");
        declaredField.setAccessible(true);
        if (BoxesRunTime.unboxToInt(declaredField.get(memoryChannel).getClass().getDeclaredMethod("availablePermits", new Class[0]).invoke(declaredField.get(memoryChannel), new Object[0])) != channelCapacity()) {
            throw new AssertionError(new StringBuilder(21).append("Channel ").append(memoryChannel.getName()).append(" is not empty").toString());
        }
    }

    public void close() {
        sinks().foreach(sparkSink -> {
            sparkSink.stop();
            return BoxedUnit.UNIT;
        });
        sinks().clear();
        channels().foreach(memoryChannel -> {
            memoryChannel.stop();
            return BoxedUnit.UNIT;
        });
        channels().clear();
    }
}
