/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.sql.streaming;

import org.apache.spark.internal.Logging;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.QueryTest$;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.catalyst.plans.physical.AllTuples$;
import org.apache.spark.sql.catalyst.plans.physical.Distribution;
import org.apache.spark.sql.execution.SparkPlan;
import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation;
import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
import org.apache.spark.sql.execution.streaming.IncrementalExecution;
import org.apache.spark.sql.execution.streaming.MemorySink;
import org.apache.spark.sql.execution.streaming.MemorySinkBase;
import org.apache.spark.sql.execution.streaming.MicroBatchExecution;
import org.apache.spark.sql.execution.streaming.Offset;
import org.apache.spark.sql.execution.streaming.StatefulOperator;
import org.apache.spark.sql.execution.streaming.StatefulOperatorStateInfo;
import org.apache.spark.sql.execution.streaming.StreamExecution;
import org.apache.spark.sql.execution.streaming.StreamingExecutionRelation;
import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper;
import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution;
import org.apache.spark.sql.execution.streaming.sources.MemorySinkV2;
import org.apache.spark.sql.execution.streaming.state.StateStore$;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamTest;
import org.apache.spark.sql.streaming.StreamTest$;
import org.apache.spark.sql.streaming.StreamTest$$anonfun$fetchStreamAnswer$1$6$;
import org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1$;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.streaming.StreamingQueryListener;
import org.apache.spark.sql.streaming.StreamingQueryManager;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.streaming.util.StreamManualClock;
import org.apache.spark.util.Clock;
import org.apache.spark.util.SystemClock;
import org.apache.spark.util.Utils$;
import org.scalactic.Bool;
import org.scalactic.Bool$;
import org.scalactic.Equality$;
import org.scalactic.Prettifier$;
import org.scalactic.TripleEquals;
import org.scalactic.source.Position;
import org.scalatest.Assertions;
import org.scalatest.compatible.Assertion;
import org.scalatest.concurrent.Eventually$;
import org.scalatest.concurrent.PatienceConfiguration;
import org.scalatest.concurrent.Signaler;
import org.scalatest.concurrent.ThreadSignaler$;
import org.scalatest.enablers.Timed$;
import org.scalatest.exceptions.TestFailedDueToTimeoutException;
import org.scalatest.time.Span$;
import org.scalatest.time.SpanSugar$;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.PartialFunction;
import scala.Predef$;
import scala.Serializable;
import scala.Some;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.GenTraversableOnce;
import scala.collection.Iterable$;
import scala.collection.IterableLike;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Range;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.HashMap;
import scala.collection.mutable.Map;
import scala.collection.mutable.Map$;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ClassTag$;
import scala.reflect.api.TypeTags;
import scala.reflect.runtime.package$;
import scala.runtime.BooleanRef;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.LongRef;
import scala.runtime.Nothing$;
import scala.runtime.ObjectRef;
import scala.runtime.RichInt$;
import scala.runtime.VolatileObjectRef;
import scala.util.Random$;
import scala.util.control.NonFatal$;

public abstract class StreamTest$class {
    public static void afterAll(StreamTest $this) {
        $this.org$apache$spark$sql$streaming$StreamTest$$super$afterAll();
        StateStore$.MODULE$.stop();
    }

    public static void testStream(StreamTest $this, Dataset _stream, OutputMode outputMode, boolean useV2Sink, Seq actions) {
        StreamTest streamTest = $this;
        synchronized (streamTest) {
            Seq seq;
            Dataset stream = _stream.toDF();
            SparkSession sparkSession = stream.sparkSession();
            IntRef pos = IntRef.create((int)0);
            ObjectRef currentStream = ObjectRef.create(null);
            ObjectRef lastStream = ObjectRef.create(null);
            HashMap awaiting = new HashMap();
            MemorySinkV2 sink = useV2Sink ? new MemorySinkV2() : new MemorySink(stream.schema(), outputMode);
            Map resetConfValues = (Map)Map$.MODULE$.apply((Seq)Nil$.MODULE$);
            String x$18 = "streaming.metadata";
            String x$19 = Utils$.MODULE$.createTempDir$default$1();
            String defaultCheckpointLocation = Utils$.MODULE$.createTempDir(x$19, x$18).getCanonicalPath();
            LongRef manualClockExpectedTime = LongRef.create((long)-1L);
            VolatileObjectRef streamThreadDeathCause = VolatileObjectRef.create(null);
            StreamingQueryListener listener = new StreamingQueryListener($this, streamThreadDeathCause){
                public final VolatileObjectRef streamThreadDeathCause$1;

                public void onQueryStarted(StreamingQueryListener.QueryStartedEvent event) {
                    Thread.currentThread().setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler(this){
                        private final /* synthetic */ StreamTest$.anon.1 $outer;

                        public void uncaughtException(Thread t, Throwable e) {
                            this.$outer.streamThreadDeathCause$1.elem = e;
                        }
                        {
                            if ($outer == null) {
                                throw null;
                            }
                            this.$outer = $outer;
                        }
                    });
                }

                public void onQueryProgress(StreamingQueryListener.QueryProgressEvent event) {
                }

                public void onQueryTerminated(StreamingQueryListener.QueryTerminatedEvent event) {
                }
                {
                    this.streamThreadDeathCause$1 = streamThreadDeathCause$1;
                }
            };
            sparkSession.streams().addListener(listener);
            boolean startedManually = ((IterableLike)actions.takeWhile((Function1)new Serializable($this){
                public static final long serialVersionUID = 0L;

                public final boolean apply(StreamTest.StreamAction x$3) {
                    return !(x$3 instanceof StreamTest.StreamMustBeRunning);
                }
            })).exists((Function1)new Serializable($this){
                public static final long serialVersionUID = 0L;

                public final boolean apply(StreamTest.StreamAction x$4) {
                    return x$4 instanceof StreamTest.StartStream;
                }
            });
            if (startedManually) {
                seq = actions;
            } else {
                StreamTest.StartStream startStream = new StreamTest.StartStream($this, $this.StartStream().apply$default$1(), $this.StartStream().apply$default$2(), $this.StartStream().apply$default$3(), $this.StartStream().apply$default$4());
                seq = (Seq)actions.$plus$colon((Object)startStream, Seq$.MODULE$.canBuildFrom());
            }
            Seq startedTest = seq;
            LongRef lastFetchedMemorySinkLastBatchId = LongRef.create((long)-1L);
            StreamTest$class.liftedTree1$1($this, stream, sparkSession, pos, currentStream, lastStream, awaiting, (MemorySinkBase)sink, resetConfValues, defaultCheckpointLocation, manualClockExpectedTime, streamThreadDeathCause, listener, startedManually, startedTest, lastFetchedMemorySinkLastBatchId, outputMode, actions);
            return;
        }
    }

    public static OutputMode testStream$default$2(StreamTest $this) {
        return OutputMode.Append();
    }

    public static boolean testStream$default$3(StreamTest $this) {
        return $this.defaultUseV2Sink();
    }

    public static void runStressTest(StreamTest $this, Dataset ds, Function1 addData2, int iterations) {
        $this.runStressTest((Dataset<Object>)ds, (Seq<StreamTest.StreamAction>)((Seq)Seq$.MODULE$.empty()), (Function2<Seq<Object>, Object, StreamTest.StreamAction>)new Serializable($this, addData2){
            public static final long serialVersionUID = 0L;
            private final Function1 addData$1;

            public final StreamTest.StreamAction apply(Seq<Object> data, boolean running) {
                return (StreamTest.StreamAction)this.addData$1.apply(data);
            }
            {
                this.addData$1 = addData$1;
            }
        }, iterations);
    }

    public static void runStressTest(StreamTest $this, Dataset ds, Seq prepareActions, Function2 addData2, int iterations) {
        ExpressionEncoder intEncoder = ExpressionEncoder$.MODULE$.apply(((TypeTags)package$.MODULE$.universe()).TypeTag().Int());
        IntRef dataPos = IntRef.create((int)0);
        BooleanRef running = BooleanRef.create((boolean)true);
        ArrayBuffer actions = new ArrayBuffer();
        actions.$plus$plus$eq((TraversableOnce)prepareActions);
        RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), iterations).foreach((Function1)new Serializable($this, intEncoder, dataPos, running, actions, addData2){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ StreamTest $outer;
            private final ExpressionEncoder intEncoder$1;
            private final IntRef dataPos$1;
            private final BooleanRef running$1;
            private final ArrayBuffer actions$1;
            private final Function2 addData$2;

            public final Object apply(int i) {
                ArrayBuffer arrayBuffer;
                double rand = Random$.MODULE$.nextDouble();
                if (this.running$1.elem) {
                    ArrayBuffer arrayBuffer2;
                    double d = rand;
                    if (d < 0.1) {
                        arrayBuffer2 = StreamTest$class.addCheck$1(this.$outer, this.intEncoder$1, this.dataPos$1, this.actions$1);
                    } else if (d < 0.7) {
                        arrayBuffer2 = StreamTest$class.addRandomData$1(this.$outer, this.dataPos$1, this.running$1, this.actions$1, this.addData$2);
                    } else {
                        StreamTest$class.addCheck$1(this.$outer, this.intEncoder$1, this.dataPos$1, this.actions$1);
                        this.actions$1.$plus$eq((Object)this.$outer.StopStream());
                        this.running$1.elem = false;
                        arrayBuffer2 = BoxedUnit.UNIT;
                    }
                    arrayBuffer = arrayBuffer2;
                } else {
                    ArrayBuffer arrayBuffer3;
                    double d = rand;
                    if (d < 0.7) {
                        arrayBuffer3 = StreamTest$class.addRandomData$1(this.$outer, this.dataPos$1, this.running$1, this.actions$1, this.addData$2);
                    } else {
                        this.actions$1.$plus$eq((Object)new StreamTest.StartStream(this.$outer, this.$outer.StartStream().apply$default$1(), this.$outer.StartStream().apply$default$2(), this.$outer.StartStream().apply$default$3(), this.$outer.StartStream().apply$default$4()));
                        this.running$1.elem = true;
                        arrayBuffer3 = BoxedUnit.UNIT;
                    }
                    arrayBuffer = arrayBuffer3;
                }
                return arrayBuffer;
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
                this.intEncoder$1 = intEncoder$1;
                this.dataPos$1 = dataPos$1;
                this.running$1 = running$1;
                this.actions$1 = actions$1;
                this.addData$2 = addData$2;
            }
        });
        Object object = running.elem ? BoxedUnit.UNIT : actions.$plus$eq((Object)new StreamTest.StartStream($this, $this.StartStream().apply$default$1(), $this.StartStream().apply$default$2(), $this.StartStream().apply$default$3(), $this.StartStream().apply$default$4()));
        StreamTest$class.addCheck$1($this, intEncoder, dataPos, actions);
        $this.testStream(ds, $this.testStream$default$2(), $this.testStream$default$3(), (Seq<StreamTest.StreamAction>)actions);
    }

    public static int runStressTest$default$3(StreamTest $this) {
        return 100;
    }

    private static final String testActions$1(StreamTest $this, IntRef pos$1, boolean startedManually$1, Seq actions$2) {
        return ((TraversableOnce)((TraversableLike)actions$2.zipWithIndex(Seq$.MODULE$.canBuildFrom())).map((Function1)new Serializable($this, pos$1, startedManually$1){
            public static final long serialVersionUID = 0L;
            private final IntRef pos$1;
            private final boolean startedManually$1;

            public final String apply(Tuple2<StreamTest.StreamAction, Object> x0$3) {
                Tuple2<StreamTest.StreamAction, Object> tuple2 = x0$3;
                if (tuple2 != null) {
                    StreamTest.StreamAction a = (StreamTest.StreamAction)tuple2._1();
                    int i = tuple2._2$mcI$sp();
                    String string = this.pos$1.elem == i && this.startedManually$1 || this.pos$1.elem == i + 1 && !this.startedManually$1 ? new StringBuilder().append((Object)"=> ").append((Object)a.toString()).toString() : new StringBuilder().append((Object)"   ").append((Object)a.toString()).toString();
                    return string;
                }
                throw new MatchError(tuple2);
            }
            {
                this.pos$1 = pos$1;
                this.startedManually$1 = startedManually$1;
            }
        }, Seq$.MODULE$.canBuildFrom())).mkString("\n");
    }

    private static final String currentOffsets$1(StreamTest $this, ObjectRef currentStream$1) {
        return (StreamExecution)currentStream$1.elem == null ? "not started" : ((StreamExecution)currentStream$1.elem).committedOffsets().toString();
    }

    private static final String threadState$1(StreamTest $this, ObjectRef currentStream$1) {
        return (StreamExecution)currentStream$1.elem != null && ((StreamExecution)currentStream$1.elem).queryExecutionThread().isAlive() ? "alive" : "dead";
    }

    private static final String threadStackTrace$1(StreamTest $this, ObjectRef currentStream$1) {
        return (StreamExecution)currentStream$1.elem != null && ((StreamExecution)currentStream$1.elem).queryExecutionThread().isAlive() ? new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Thread stack trace: ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.refArrayOps((Object[])((StreamExecution)currentStream$1.elem).queryExecutionThread().getStackTrace()).mkString("\n")})) : "";
    }

    private static final String testState$1(StreamTest $this, IntRef pos$1, ObjectRef currentStream$1, MemorySinkBase sink$1, VolatileObjectRef streamThreadDeathCause$1, boolean startedManually$1, OutputMode outputMode$1, Seq actions$2) {
        MemorySinkBase memorySinkBase;
        block4: {
            String string;
            block3: {
                block2: {
                    memorySinkBase = sink$1;
                    if (!(memorySinkBase instanceof MemorySink)) break block2;
                    MemorySink memorySink = (MemorySink)memorySinkBase;
                    string = memorySink.toDebugString();
                    break block3;
                }
                if (!(memorySinkBase instanceof MemorySinkV2)) break block4;
                MemorySinkV2 memorySinkV2 = (MemorySinkV2)memorySinkBase;
                string = memorySinkV2.toDebugString();
            }
            String sinkDebugString = string;
            return new StringOps(Predef$.MODULE$.augmentString(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"\n         |== Progress ==\n         |", "\n         |\n         |== Stream ==\n         |Output Mode: ", "\n         |Stream state: ", "\n         |Thread state: ", "\n         |", "\n         |", "\n         |\n         |== Sink ==\n         |", "\n         |\n         |\n         |== Plan ==\n         |", "\n         "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{StreamTest$class.testActions$1($this, pos$1, startedManually$1, actions$2), outputMode$1, StreamTest$class.currentOffsets$1($this, currentStream$1), StreamTest$class.threadState$1($this, currentStream$1), StreamTest$class.threadStackTrace$1($this, currentStream$1), (Throwable)streamThreadDeathCause$1.elem == null ? "" : org.apache.spark.sql.catalyst.util.package$.MODULE$.stackTraceToString((Throwable)streamThreadDeathCause$1.elem), sinkDebugString, (StreamExecution)currentStream$1.elem == null ? "" : ((StreamExecution)currentStream$1.elem).lastExecution()})))).stripMargin();
        }
        throw new MatchError((Object)memorySinkBase);
    }

    public static final void verify$1(StreamTest $this, Function0 condition, String message, IntRef pos$1, ObjectRef currentStream$1, MemorySinkBase sink$1, VolatileObjectRef streamThreadDeathCause$1, boolean startedManually$1, OutputMode outputMode$1, Seq actions$2) {
        if (condition.apply$mcZ$sp()) {
            return;
        }
        throw StreamTest$class.failTest$1($this, message, StreamTest$class.failTest$default$2$1($this), pos$1, currentStream$1, sink$1, streamThreadDeathCause$1, startedManually$1, outputMode$1, actions$2);
    }

    public static final Object eventually$1(StreamTest $this, String message, Function0 func, IntRef pos$1, ObjectRef currentStream$1, MemorySinkBase sink$1, VolatileObjectRef streamThreadDeathCause$1, boolean startedManually$1, OutputMode outputMode$1, Seq actions$2) {
        try {
            return Eventually$.MODULE$.eventually(new PatienceConfiguration.Timeout($this.streamingTimeout()), func, Eventually$.MODULE$.patienceConfig(), new Position("StreamTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 435));
        }
        catch (Throwable throwable) {
            Throwable throwable2 = throwable;
            Option option = NonFatal$.MODULE$.unapply(throwable2);
            if (option.isEmpty()) {
                throw throwable;
            }
            Throwable e = (Throwable)option.get();
            throw StreamTest$class.failTest$1($this, message, e, pos$1, currentStream$1, sink$1, streamThreadDeathCause$1, startedManually$1, outputMode$1, actions$2);
        }
    }

    public static final String exceptionToString$1(StreamTest $this, Throwable e, String prefix) {
        String base = new StringBuilder().append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", "", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{prefix, e.getMessage()}))).append((Object)Predef$.MODULE$.refArrayOps((Object[])Predef$.MODULE$.refArrayOps((Object[])e.getStackTrace()).take(10)).mkString(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"\\n", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{prefix})), new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"\\n", "\\t"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{prefix})), "\n")).toString();
        return e.getCause() == null ? base : new StringBuilder().append((Object)base).append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"\\n", "\\tCaused by: "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{prefix}))).append((Object)StreamTest$class.exceptionToString$1($this, e.getCause(), new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", "\\t"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{prefix})))).toString();
    }

    public static final String exceptionToString$default$2$1(StreamTest $this) {
        return "";
    }

    public static final Nothing$ failTest$1(StreamTest $this, String message, Throwable cause, IntRef pos$1, ObjectRef currentStream$1, MemorySinkBase sink$1, VolatileObjectRef streamThreadDeathCause$1, boolean startedManually$1, OutputMode outputMode$1, Seq actions$2) {
        Option c = Option$.MODULE$.apply((Object)cause).map((Function1)new Serializable($this){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ StreamTest $outer;

            public final String apply(Throwable x$6) {
                return StreamTest$class.exceptionToString$1(this.$outer, x$6, StreamTest$class.exceptionToString$default$2$1(this.$outer));
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
            }
        });
        None$ m = message != null && new StringOps(Predef$.MODULE$.augmentString(message)).size() > 0 ? new Some((Object)message) : None$.MODULE$;
        return ((Assertions)$this).fail(new StringOps(Predef$.MODULE$.augmentString(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"\n           |", "\n           |", "\n         "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{((TraversableOnce)Option$.MODULE$.option2Iterable((Option)m).$plus$plus((GenTraversableOnce)Option$.MODULE$.option2Iterable(c), Iterable$.MODULE$.canBuildFrom())).mkString(": "), StreamTest$class.testState$1($this, pos$1, currentStream$1, sink$1, streamThreadDeathCause$1, startedManually$1, outputMode$1, actions$2)})))).stripMargin(), new Position("StreamTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 458));
    }

    public static final Throwable failTest$default$2$1(StreamTest $this) {
        return null;
    }

    private static final Seq fetchStreamAnswer$1(StreamTest $this, StreamExecution currentStream, boolean lastOnly, boolean sinceLastFetchOnly, IntRef pos$1, ObjectRef currentStream$1, HashMap awaiting$1, MemorySinkBase sink$1, VolatileObjectRef streamThreadDeathCause$1, boolean startedManually$1, LongRef lastFetchedMemorySinkLastBatchId$1, OutputMode outputMode$1, Seq actions$2) {
        Seq seq;
        block5: {
            StreamTest$class.verify$1($this, (Function0)new Serializable($this, lastOnly, sinceLastFetchOnly){
                public static final long serialVersionUID = 0L;
                private final boolean lastOnly$1;
                private final boolean sinceLastFetchOnly$1;

                public final boolean apply() {
                    return this.apply$mcZ$sp();
                }

                public boolean apply$mcZ$sp() {
                    return !this.lastOnly$1 || !this.sinceLastFetchOnly$1;
                }
                {
                    this.lastOnly$1 = lastOnly$1;
                    this.sinceLastFetchOnly$1 = sinceLastFetchOnly$1;
                }
            }, "both lastOnly and sinceLastFetchOnly cannot be true", pos$1, currentStream$1, sink$1, streamThreadDeathCause$1, startedManually$1, outputMode$1, actions$2);
            StreamTest$class.verify$1($this, (Function0)new Serializable($this, currentStream){
                public static final long serialVersionUID = 0L;
                private final StreamExecution currentStream$2;

                public final boolean apply() {
                    return this.apply$mcZ$sp();
                }

                public boolean apply$mcZ$sp() {
                    return this.currentStream$2 != null;
                }
                {
                    this.currentStream$2 = currentStream$2;
                }
            }, "stream not running", pos$1, currentStream$1, sink$1, streamThreadDeathCause$1, startedManually$1, outputMode$1, actions$2);
            awaiting$1.foreach((Function1)new Serializable($this, currentStream){
                public static final long serialVersionUID = 0L;
                private final /* synthetic */ StreamTest $outer;
                public final StreamExecution currentStream$2;

                public final void apply(Tuple2<Object, Offset> x0$4) {
                    Tuple2<Object, Offset> tuple2 = x0$4;
                    if (tuple2 != null) {
                        int sourceIndex = tuple2._1$mcI$sp();
                        Offset offset = (Offset)tuple2._2();
                        BoxedUnit boxedUnit = (BoxedUnit)this.$outer.failAfter(this.$outer.streamingTimeout(), (Function0)new Serializable(this, sourceIndex, offset){
                            public static final long serialVersionUID = 0L;
                            private final /* synthetic */ StreamTest$.anonfun.fetchStreamAnswer.1.5 $outer;
                            private final int sourceIndex$1;
                            private final Offset offset$1;

                            public final void apply() {
                                this.apply$mcV$sp();
                            }

                            public void apply$mcV$sp() {
                                this.$outer.currentStream$2.awaitOffset(this.sourceIndex$1, this.offset$1, Span$.MODULE$.convertSpanToDuration(this.$outer.org$apache$spark$sql$streaming$StreamTest$$anonfun$$$outer().streamingTimeout()).toMillis());
                                if (!(this.$outer.currentStream$2.triggerClock() instanceof StreamManualClock)) {
                                    this.$outer.currentStream$2.processAllAvailable();
                                }
                            }
                            {
                                if ($outer == null) {
                                    throw null;
                                }
                                this.$outer = $outer;
                                this.sourceIndex$1 = sourceIndex$1;
                                this.offset$1 = offset$1;
                            }
                        }, this.$outer.defaultSignaler(), Prettifier$.MODULE$.default(), new Position("StreamTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 477), Timed$.MODULE$.timed());
                        return;
                    }
                    throw new MatchError(tuple2);
                }

                public /* synthetic */ StreamTest org$apache$spark$sql$streaming$StreamTest$$anonfun$$$outer() {
                    return this.$outer;
                }
                {
                    if ($outer == null) {
                        throw null;
                    }
                    this.$outer = $outer;
                    this.currentStream$2 = currentStream$2;
                }
            });
            IncrementalExecution lastExecution = currentStream.lastExecution();
            if (currentStream instanceof MicroBatchExecution && lastExecution != null) {
                lastExecution.executedPlan().collect((PartialFunction)new Serializable($this){
                    public static final long serialVersionUID = 0L;

                    public final <A1 extends SparkPlan, B1> B1 applyOrElse(A1 x1, Function1<A1, B1> function1) {
                        Object object;
                        A1 A1 = x1;
                        if (A1 instanceof StatefulOperator) {
                            StatefulOperator statefulOperator = (StatefulOperator)A1;
                            object = statefulOperator;
                        } else {
                            object = function1.apply(x1);
                        }
                        return (B1)object;
                    }

                    public final boolean isDefinedAt(SparkPlan x1) {
                        SparkPlan sparkPlan2 = x1;
                        boolean bl = sparkPlan2 instanceof StatefulOperator;
                        return bl;
                    }
                }).foreach((Function1)new Serializable($this, lastExecution){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ StreamTest $outer;
                    private final IncrementalExecution lastExecution$1;

                    public final void apply(StatefulOperator s) {
                        Option $org_scalatest_assert_macro_left = s.stateInfo().map((Function1)new Serializable(this){
                            public static final long serialVersionUID = 0L;

                            public final int apply(StatefulOperatorStateInfo x$7) {
                                return x$7.numPartitions();
                            }
                        });
                        int $org_scalatest_assert_macro_right = this.lastExecution$1.numStateStores();
                        Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "contains", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left.contains((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right)), Prettifier$.MODULE$.default());
                        ((Assertions)this.$outer).assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("StreamTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 491));
                        ((SparkPlan)s).requiredChildDistribution().foreach((Function1)new Serializable(this, s){
                            public static final long serialVersionUID = 0L;
                            private final /* synthetic */ StreamTest$.anonfun.fetchStreamAnswer.1.6 $outer;
                            public final StatefulOperator s$1;

                            public final Object apply(Distribution d) {
                                return ((Assertions)this.$outer.org$apache$spark$sql$streaming$StreamTest$$anonfun$$$outer()).withClue((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", " specifies incorrect # partitions in requiredChildDistribution ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.s$1, d})), (Function0)new Serializable(this, d){
                                    public static final long serialVersionUID = 0L;
                                    private final /* synthetic */ StreamTest$$anonfun$fetchStreamAnswer$1$6$.anonfun.apply.14 $outer;
                                    private final Distribution d$1;

                                    public final Object apply() {
                                        BoxedUnit boxedUnit;
                                        Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.simpleMacroBool(this.d$1.requiredNumPartitions().isDefined(), "d.requiredNumPartitions.isDefined", Prettifier$.MODULE$.default());
                                        ((Assertions)this.$outer.org$apache$spark$sql$streaming$StreamTest$$anonfun$$anonfun$$$outer().org$apache$spark$sql$streaming$StreamTest$$anonfun$$$outer()).assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("StreamTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 494));
                                        int $org_scalatest_assert_macro_left = BoxesRunTime.unboxToInt((Object)this.d$1.requiredNumPartitions().get());
                                        int $org_scalatest_assert_macro_right = 1;
                                        Bool $org_scalatest_assert_macro_expr2 = Bool$.MODULE$.binaryMacroBool((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_left), ">=", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left >= $org_scalatest_assert_macro_right, Prettifier$.MODULE$.default());
                                        ((Assertions)this.$outer.org$apache$spark$sql$streaming$StreamTest$$anonfun$$anonfun$$$outer().org$apache$spark$sql$streaming$StreamTest$$anonfun$$$outer()).assertionsHelper().macroAssert($org_scalatest_assert_macro_expr2, (Object)"", Prettifier$.MODULE$.default(), new Position("StreamTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 495));
                                        Distribution distribution = this.d$1;
                                        AllTuples$ allTuples$ = AllTuples$.MODULE$;
                                        if (!(distribution != null ? !distribution.equals(allTuples$) : allTuples$ != null)) {
                                            boxedUnit = BoxedUnit.UNIT;
                                        } else {
                                            int $org_scalatest_assert_macro_left2 = BoxesRunTime.unboxToInt((Object)this.d$1.requiredNumPartitions().get());
                                            int $org_scalatest_assert_macro_right2 = ((StatefulOperatorStateInfo)this.$outer.s$1.stateInfo().get()).numPartitions();
                                            Bool $org_scalatest_assert_macro_expr3 = Bool$.MODULE$.binaryMacroBool((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_left2), "==", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right2), $org_scalatest_assert_macro_left2 == $org_scalatest_assert_macro_right2, Prettifier$.MODULE$.default());
                                            boxedUnit = ((Assertions)this.$outer.org$apache$spark$sql$streaming$StreamTest$$anonfun$$anonfun$$$outer().org$apache$spark$sql$streaming$StreamTest$$anonfun$$$outer()).assertionsHelper().macroAssert($org_scalatest_assert_macro_expr3, (Object)"", Prettifier$.MODULE$.default(), new Position("StreamTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 497));
                                        }
                                        return boxedUnit;
                                    }
                                    {
                                        if ($outer == null) {
                                            throw null;
                                        }
                                        this.$outer = $outer;
                                        this.d$1 = d$1;
                                    }
                                });
                            }

                            public /* synthetic */ StreamTest$.anonfun.fetchStreamAnswer.1.6 org$apache$spark$sql$streaming$StreamTest$$anonfun$$anonfun$$$outer() {
                                return this.$outer;
                            }
                            {
                                if ($outer == null) {
                                    throw null;
                                }
                                this.$outer = $outer;
                                this.s$1 = s$1;
                            }
                        });
                    }

                    public /* synthetic */ StreamTest org$apache$spark$sql$streaming$StreamTest$$anonfun$$$outer() {
                        return this.$outer;
                    }
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                        this.lastExecution$1 = lastExecution$1;
                    }
                });
            }
            try {
                if (sinceLastFetchOnly) {
                    if (BoxesRunTime.unboxToLong((Object)sink$1.latestBatchId().getOrElse((Function0)new Serializable($this){
                        public static final long serialVersionUID = 0L;

                        public final long apply() {
                            return this.apply$mcJ$sp();
                        }

                        public long apply$mcJ$sp() {
                            return -1L;
                        }
                    })) < lastFetchedMemorySinkLastBatchId$1.elem) {
                        throw StreamTest$class.failTest$1($this, "MemorySink was probably cleared since last fetch. Use CheckAnswer instead.", StreamTest$class.failTest$default$2$1($this), pos$1, currentStream$1, sink$1, streamThreadDeathCause$1, startedManually$1, outputMode$1, actions$2);
                    }
                    seq = sink$1.dataSinceBatch(lastFetchedMemorySinkLastBatchId$1.elem);
                    break block5;
                }
                seq = lastOnly ? sink$1.latestBatchData() : sink$1.allData();
            }
            catch (Exception exception) {
                throw StreamTest$class.failTest$1($this, "Exception while getting data from sink", exception, pos$1, currentStream$1, sink$1, streamThreadDeathCause$1, startedManually$1, outputMode$1, actions$2);
            }
        }
        Seq rows = seq;
        lastFetchedMemorySinkLastBatchId$1.elem = BoxesRunTime.unboxToLong((Object)sink$1.latestBatchId().getOrElse((Function0)new Serializable($this){
            public static final long serialVersionUID = 0L;

            public final long apply() {
                return this.apply$mcJ$sp();
            }

            public long apply$mcJ$sp() {
                return -1L;
            }
        }));
        return rows;
    }

    private static final boolean fetchStreamAnswer$default$2$1(StreamTest $this) {
        return false;
    }

    private static final boolean fetchStreamAnswer$default$3$1(StreamTest $this) {
        return false;
    }

    public static final Option findSourceIndex$1(StreamTest $this, LogicalPlan plan, BaseStreamingSource source$1) {
        return ((IterableLike)plan.collect((PartialFunction)new Serializable($this){
            public static final long serialVersionUID = 0L;

            public final <A1 extends LogicalPlan, B1> B1 applyOrElse(A1 x2, Function1<A1, B1> function1) {
                Object object;
                A1 A1 = x2;
                if (A1 instanceof StreamingExecutionRelation) {
                    StreamingExecutionRelation streamingExecutionRelation = (StreamingExecutionRelation)A1;
                    object = streamingExecutionRelation.source();
                } else if (A1 instanceof StreamingDataSourceV2Relation) {
                    StreamingDataSourceV2Relation streamingDataSourceV2Relation = (StreamingDataSourceV2Relation)A1;
                    object = streamingDataSourceV2Relation.reader();
                } else {
                    object = function1.apply(x2);
                }
                return (B1)object;
            }

            public final boolean isDefinedAt(LogicalPlan x2) {
                LogicalPlan logicalPlan = x2;
                boolean bl = logicalPlan instanceof StreamingExecutionRelation ? true : logicalPlan instanceof StreamingDataSourceV2Relation;
                return bl;
            }
        }).zipWithIndex(Seq$.MODULE$.canBuildFrom())).find((Function1)new Serializable($this, source$1){
            public static final long serialVersionUID = 0L;
            private final BaseStreamingSource source$1;

            public final boolean apply(Tuple2<Object, Object> x$10) {
                Object object = x$10._1();
                BaseStreamingSource baseStreamingSource = this.source$1;
                return !(object != null ? !object.equals(baseStreamingSource) : baseStreamingSource != null);
            }
            {
                this.source$1 = source$1;
            }
        }).map((Function1)new Serializable($this){
            public static final long serialVersionUID = 0L;

            public final int apply(Tuple2<Object, Object> x$11) {
                return x$11._2$mcI$sp();
            }
        });
    }

    /*
     * Loose catch block
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public static final void executeAction$1(StreamTest $this, StreamTest.StreamAction action, Dataset stream$1, SparkSession sparkSession$1, IntRef pos$1, ObjectRef currentStream$1, ObjectRef lastStream$1, HashMap awaiting$1, MemorySinkBase sink$1, Map resetConfValues$1, String defaultCheckpointLocation$1, LongRef manualClockExpectedTime$1, VolatileObjectRef streamThreadDeathCause$1, boolean startedManually$1, LongRef lastFetchedMemorySinkLastBatchId$1, OutputMode outputMode$1, Seq actions$2) {
        BoxedUnit boxedUnit;
        block41: {
            BoxedUnit boxedUnit2;
            block40: {
                BoxedUnit boxedUnit3;
                block43: {
                    Seq sparkAnswer;
                    Function1<Seq<Row>, BoxedUnit> globalCheckFunction;
                    block42: {
                        StreamTest.AssertOnQuery assertOnQuery;
                        ((Logging)$this).logInfo((Function0)new Serializable($this, action){
                            public static final long serialVersionUID = 0L;
                            private final StreamTest.StreamAction action$1;

                            public final String apply() {
                                return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Processing test stream action: ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.action$1}));
                            }
                            {
                                this.action$1 = action$1;
                            }
                        });
                        StreamTest.StreamAction streamAction = action;
                        if (streamAction instanceof StreamTest.StartStream) {
                            StreamTest.StartStream startStream = (StreamTest.StartStream)streamAction;
                            Trigger trigger = startStream.trigger();
                            Clock triggerClock = startStream.triggerClock();
                            scala.collection.immutable.Map<String, String> additionalConfs = startStream.additionalConfs();
                            String checkpointLocation = startStream.checkpointLocation();
                            StreamTest$class.verify$1($this, (Function0)new Serializable($this, currentStream$1){
                                public static final long serialVersionUID = 0L;
                                private final ObjectRef currentStream$1;

                                public final boolean apply() {
                                    return this.apply$mcZ$sp();
                                }

                                public boolean apply$mcZ$sp() {
                                    return (StreamExecution)this.currentStream$1.elem == null || !((StreamExecution)this.currentStream$1.elem).isActive();
                                }
                                {
                                    this.currentStream$1 = currentStream$1;
                                }
                            }, "stream already running", pos$1, currentStream$1, sink$1, streamThreadDeathCause$1, startedManually$1, outputMode$1, actions$2);
                            StreamTest$class.verify$1($this, (Function0)new Serializable($this, triggerClock){
                                public static final long serialVersionUID = 0L;
                                private final Clock triggerClock$1;

                                public final boolean apply() {
                                    return this.apply$mcZ$sp();
                                }

                                public boolean apply$mcZ$sp() {
                                    return this.triggerClock$1 instanceof SystemClock || this.triggerClock$1 instanceof StreamManualClock;
                                }
                                {
                                    this.triggerClock$1 = triggerClock$1;
                                }
                            }, "Use either SystemClock or StreamManualClock to start the stream", pos$1, currentStream$1, sink$1, streamThreadDeathCause$1, startedManually$1, outputMode$1, actions$2);
                            if (triggerClock instanceof StreamManualClock) {
                                manualClockExpectedTime$1.elem = ((StreamManualClock)triggerClock).getTimeMillis();
                            }
                            String metadataRoot = (String)Option$.MODULE$.apply((Object)checkpointLocation).getOrElse((Function0)new Serializable($this, defaultCheckpointLocation$1){
                                public static final long serialVersionUID = 0L;
                                private final String defaultCheckpointLocation$1;

                                public final String apply() {
                                    return this.defaultCheckpointLocation$1;
                                }
                                {
                                    this.defaultCheckpointLocation$1 = defaultCheckpointLocation$1;
                                }
                            });
                            additionalConfs.foreach((Function1)new Serializable($this, sparkSession$1, resetConfValues$1){
                                public static final long serialVersionUID = 0L;
                                private final SparkSession sparkSession$1;
                                private final Map resetConfValues$1;

                                public final void apply(Tuple2<String, String> pair) {
                                    None$ value = this.sparkSession$1.conf().contains((String)pair._1()) ? new Some((Object)this.sparkSession$1.conf().get((String)pair._1())) : None$.MODULE$;
                                    this.resetConfValues$1.update(pair._1(), (Object)value);
                                    this.sparkSession$1.conf().set((String)pair._1(), (String)pair._2());
                                }
                                {
                                    this.sparkSession$1 = sparkSession$1;
                                    this.resetConfValues$1 = resetConfValues$1;
                                }
                            });
                            lastStream$1.elem = (StreamExecution)currentStream$1.elem;
                            StreamingQueryManager qual$4 = sparkSession$1.streams();
                            None$ x$20 = None$.MODULE$;
                            Some x$21 = new Some((Object)metadataRoot);
                            Dataset x$22 = stream$1;
                            scala.collection.immutable.Map x$23 = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Nil$.MODULE$);
                            MemorySinkBase x$24 = sink$1;
                            OutputMode x$25 = outputMode$1;
                            Trigger x$26 = trigger;
                            Clock x$27 = triggerClock;
                            boolean x$28 = qual$4.startQuery$default$7();
                            boolean x$29 = qual$4.startQuery$default$8();
                            currentStream$1.elem = ((StreamingQueryWrapper)qual$4.startQuery((Option)x$20, (Option)x$21, x$22, x$23, (BaseStreamingSink)x$24, x$25, x$28, x$29, x$26, x$27)).streamingQuery();
                            ((StreamExecution)currentStream$1.elem).awaitInitialization(Span$.MODULE$.convertSpanToDuration($this.streamingTimeout()).toMillis());
                            StreamExecution streamExecution = (StreamExecution)currentStream$1.elem;
                            if (streamExecution instanceof ContinuousExecution) {
                                ContinuousExecution continuousExecution = (ContinuousExecution)streamExecution;
                                StreamTest$class.eventually$1($this, "IncrementalExecution was not created", (Function0)new Serializable($this, continuousExecution){
                                    public static final long serialVersionUID = 0L;
                                    private final /* synthetic */ StreamTest $outer;
                                    private final ContinuousExecution x2$1;

                                    public final Assertion apply() {
                                        IncrementalExecution $org_scalatest_assert_macro_left = this.x2$1.lastExecution();
                                        Object $org_scalatest_assert_macro_right = null;
                                        IncrementalExecution incrementalExecution = $org_scalatest_assert_macro_left;
                                        Object var4_3 = null;
                                        Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "!=", null, incrementalExecution != null ? !incrementalExecution.equals(var4_3) : var4_3 != null, Prettifier$.MODULE$.default());
                                        return ((Assertions)this.$outer).assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("StreamTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 564));
                                    }
                                    {
                                        if ($outer == null) {
                                            throw null;
                                        }
                                        this.$outer = $outer;
                                        this.x2$1 = x2$1;
                                    }
                                }, pos$1, currentStream$1, sink$1, streamThreadDeathCause$1, startedManually$1, outputMode$1, actions$2);
                                BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
                            } else {
                                BoxedUnit boxedUnit5 = BoxedUnit.UNIT;
                            }
                            boxedUnit3 = BoxedUnit.UNIT;
                        }
                        if (streamAction instanceof StreamTest.AdvanceManualClock) {
                            StreamTest.AdvanceManualClock advanceManualClock = (StreamTest.AdvanceManualClock)streamAction;
                            long timeToAdd = advanceManualClock.timeToAdd();
                            StreamTest$class.verify$1($this, (Function0)new Serializable($this, currentStream$1){
                                public static final long serialVersionUID = 0L;
                                private final ObjectRef currentStream$1;

                                public final boolean apply() {
                                    return this.apply$mcZ$sp();
                                }

                                public boolean apply$mcZ$sp() {
                                    return (StreamExecution)this.currentStream$1.elem != null;
                                }
                                {
                                    this.currentStream$1 = currentStream$1;
                                }
                            }, "can not advance manual clock when a stream is not running", pos$1, currentStream$1, sink$1, streamThreadDeathCause$1, startedManually$1, outputMode$1, actions$2);
                            StreamTest$class.verify$1($this, (Function0)new Serializable($this, currentStream$1){
                                public static final long serialVersionUID = 0L;
                                private final ObjectRef currentStream$1;

                                public final boolean apply() {
                                    return this.apply$mcZ$sp();
                                }

                                public boolean apply$mcZ$sp() {
                                    return ((StreamExecution)this.currentStream$1.elem).triggerClock() instanceof StreamManualClock;
                                }
                                {
                                    this.currentStream$1 = currentStream$1;
                                }
                            }, new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"can not advance clock of type ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{((StreamExecution)currentStream$1.elem).triggerClock().getClass()})), pos$1, currentStream$1, sink$1, streamThreadDeathCause$1, startedManually$1, outputMode$1, actions$2);
                            StreamManualClock clock = (StreamManualClock)((StreamExecution)currentStream$1.elem).triggerClock();
                            long $org_scalatest_assert_macro_left = manualClockExpectedTime$1.elem;
                            int $org_scalatest_assert_macro_right = 0;
                            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)BoxesRunTime.boxToLong((long)$org_scalatest_assert_macro_left), ">=", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left >= (long)$org_scalatest_assert_macro_right, Prettifier$.MODULE$.default());
                            ((Assertions)$this).assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("StreamTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 579));
                            StreamTest$class.eventually$1($this, "StreamManualClock has not yet entered the waiting state", (Function0)new Serializable($this, manualClockExpectedTime$1, clock){
                                public static final long serialVersionUID = 0L;
                                private final /* synthetic */ StreamTest $outer;
                                private final LongRef manualClockExpectedTime$1;
                                private final StreamManualClock clock$1;

                                public final Assertion apply() {
                                    Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.simpleMacroBool(this.clock$1.isStreamWaitingAt(this.manualClockExpectedTime$1.elem), "clock.isStreamWaitingAt(manualClockExpectedTime)", Prettifier$.MODULE$.default());
                                    return ((Assertions)this.$outer).assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("StreamTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 583));
                                }
                                {
                                    if ($outer == null) {
                                        throw null;
                                    }
                                    this.$outer = $outer;
                                    this.manualClockExpectedTime$1 = manualClockExpectedTime$1;
                                    this.clock$1 = clock$1;
                                }
                            }, pos$1, currentStream$1, sink$1, streamThreadDeathCause$1, startedManually$1, outputMode$1, actions$2);
                            clock.advance(timeToAdd);
                            manualClockExpectedTime$1.elem += timeToAdd;
                            StreamTest$class.verify$1($this, (Function0)new Serializable($this, manualClockExpectedTime$1, clock){
                                public static final long serialVersionUID = 0L;
                                private final /* synthetic */ StreamTest $outer;
                                private final LongRef manualClockExpectedTime$1;
                                private final StreamManualClock clock$1;

                                public final boolean apply() {
                                    return this.apply$mcZ$sp();
                                }

                                public boolean apply$mcZ$sp() {
                                    return ((TripleEquals)this.$outer).convertToEqualizer((Object)BoxesRunTime.boxToLong((long)this.clock$1.getTimeMillis())).$eq$eq$eq((Object)BoxesRunTime.boxToLong((long)this.manualClockExpectedTime$1.elem), Equality$.MODULE$.default());
                                }
                                {
                                    if ($outer == null) {
                                        throw null;
                                    }
                                    this.$outer = $outer;
                                    this.manualClockExpectedTime$1 = manualClockExpectedTime$1;
                                    this.clock$1 = clock$1;
                                }
                            }, new StringBuilder().append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Unexpected clock time after updating: "})).s((Seq)Nil$.MODULE$)).append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"expecting ", ", current ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)manualClockExpectedTime$1.elem), BoxesRunTime.boxToLong((long)clock.getTimeMillis())}))).toString(), pos$1, currentStream$1, sink$1, streamThreadDeathCause$1, startedManually$1, outputMode$1, actions$2);
                            BoxedUnit boxedUnit6 = BoxedUnit.UNIT;
                            return;
                        }
                        if ($this.StopStream().equals(streamAction)) {
                            StreamTest$class.verify$1($this, (Function0)new Serializable($this, currentStream$1){
                                public static final long serialVersionUID = 0L;
                                private final ObjectRef currentStream$1;

                                public final boolean apply() {
                                    return this.apply$mcZ$sp();
                                }

                                public boolean apply$mcZ$sp() {
                                    return (StreamExecution)this.currentStream$1.elem != null;
                                }
                                {
                                    this.currentStream$1 = currentStream$1;
                                }
                            }, "can not stop a stream that is not running", pos$1, currentStream$1, sink$1, streamThreadDeathCause$1, startedManually$1, outputMode$1, actions$2);
                            boxedUnit2 = (BoxedUnit)$this.failAfter($this.streamingTimeout(), (Function0)new Serializable($this, pos$1, currentStream$1, sink$1, streamThreadDeathCause$1, startedManually$1, outputMode$1, actions$2){
                                public static final long serialVersionUID = 0L;
                                private final /* synthetic */ StreamTest $outer;
                                private final IntRef pos$1;
                                public final ObjectRef currentStream$1;
                                private final MemorySinkBase sink$1;
                                private final VolatileObjectRef streamThreadDeathCause$1;
                                private final boolean startedManually$1;
                                private final OutputMode outputMode$1;
                                private final Seq actions$2;

                                public final void apply() {
                                    this.apply$mcV$sp();
                                }

                                public void apply$mcV$sp() {
                                    ((StreamExecution)this.currentStream$1.elem).stop();
                                    StreamTest$class.verify$1(this.$outer, (Function0)new Serializable(this){
                                        public static final long serialVersionUID = 0L;
                                        private final /* synthetic */ StreamTest$.anonfun.executeAction.1.7 $outer;

                                        public final boolean apply() {
                                            return this.apply$mcZ$sp();
                                        }

                                        public boolean apply$mcZ$sp() {
                                            return !((StreamExecution)this.$outer.currentStream$1.elem).queryExecutionThread().isAlive();
                                        }
                                        {
                                            if ($outer == null) {
                                                throw null;
                                            }
                                            this.$outer = $outer;
                                        }
                                    }, new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"microbatch thread not stopped"})).s((Seq)Nil$.MODULE$), this.pos$1, this.currentStream$1, this.sink$1, this.streamThreadDeathCause$1, this.startedManually$1, this.outputMode$1, this.actions$2);
                                    StreamTest$class.verify$1(this.$outer, (Function0)new Serializable(this){
                                        public static final long serialVersionUID = 0L;
                                        private final /* synthetic */ StreamTest$.anonfun.executeAction.1.7 $outer;

                                        public final boolean apply() {
                                            return this.apply$mcZ$sp();
                                        }

                                        public boolean apply$mcZ$sp() {
                                            return !((StreamExecution)this.$outer.currentStream$1.elem).isActive();
                                        }
                                        {
                                            if ($outer == null) {
                                                throw null;
                                            }
                                            this.$outer = $outer;
                                        }
                                    }, "query.isActive() is false even after stopping", this.pos$1, this.currentStream$1, this.sink$1, this.streamThreadDeathCause$1, this.startedManually$1, this.outputMode$1, this.actions$2);
                                    StreamTest$class.verify$1(this.$outer, (Function0)new Serializable(this){
                                        public static final long serialVersionUID = 0L;
                                        private final /* synthetic */ StreamTest$.anonfun.executeAction.1.7 $outer;

                                        public final boolean apply() {
                                            return this.apply$mcZ$sp();
                                        }

                                        public boolean apply$mcZ$sp() {
                                            return ((StreamExecution)this.$outer.currentStream$1.elem).exception().isEmpty();
                                        }
                                        {
                                            if ($outer == null) {
                                                throw null;
                                            }
                                            this.$outer = $outer;
                                        }
                                    }, new StringBuilder().append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"query.exception() is not empty after clean stop: "})).s((Seq)Nil$.MODULE$)).append(((StreamExecution)this.currentStream$1.elem).exception().map((Function1)new Serializable(this){
                                        public static final long serialVersionUID = 0L;

                                        public final String apply(StreamingQueryException x$8) {
                                            return x$8.toString();
                                        }
                                    }).getOrElse((Function0)new Serializable(this){
                                        public static final long serialVersionUID = 0L;

                                        public final String apply() {
                                            return "";
                                        }
                                    })).toString(), this.pos$1, this.currentStream$1, this.sink$1, this.streamThreadDeathCause$1, this.startedManually$1, this.outputMode$1, this.actions$2);
                                }
                                {
                                    if ($outer == null) {
                                        throw null;
                                    }
                                    this.$outer = $outer;
                                    this.pos$1 = pos$1;
                                    this.currentStream$1 = currentStream$1;
                                    this.sink$1 = sink$1;
                                    this.streamThreadDeathCause$1 = streamThreadDeathCause$1;
                                    this.startedManually$1 = startedManually$1;
                                    this.outputMode$1 = outputMode$1;
                                    this.actions$2 = actions$2;
                                }
                            }, $this.defaultSignaler(), Prettifier$.MODULE$.default(), new Position("StreamTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 594), Timed$.MODULE$.timed());
                            break block40;
                        }
                        if (streamAction instanceof StreamTest.ExpectFailure) {
                            StreamTest.ExpectFailure expectFailure = (StreamTest.ExpectFailure)streamAction;
                            StreamTest$class.verify$1($this, (Function0)new Serializable($this, currentStream$1){
                                public static final long serialVersionUID = 0L;
                                private final ObjectRef currentStream$1;

                                public final boolean apply() {
                                    return this.apply$mcZ$sp();
                                }

                                public boolean apply$mcZ$sp() {
                                    return (StreamExecution)this.currentStream$1.elem != null;
                                }
                                {
                                    this.currentStream$1 = currentStream$1;
                                }
                            }, "can not expect failure when stream is not running", pos$1, currentStream$1, sink$1, streamThreadDeathCause$1, startedManually$1, outputMode$1, actions$2);
                            boxedUnit = (BoxedUnit)$this.failAfter($this.streamingTimeout(), (Function0)new Serializable($this, pos$1, currentStream$1, sink$1, streamThreadDeathCause$1, startedManually$1, expectFailure, outputMode$1, actions$2){
                                public static final long serialVersionUID = 0L;
                                private final /* synthetic */ StreamTest $outer;
                                private final IntRef pos$1;
                                public final ObjectRef currentStream$1;
                                private final MemorySinkBase sink$1;
                                public final VolatileObjectRef streamThreadDeathCause$1;
                                private final boolean startedManually$1;
                                public final StreamTest.ExpectFailure x9$1;
                                private final OutputMode outputMode$1;
                                private final Seq actions$2;

                                public final void apply() {
                                    this.apply$mcV$sp();
                                }

                                public void apply$mcV$sp() {
                                    StreamingQueryException thrownException = (StreamingQueryException)((Object)((Assertions)this.$outer).intercept((Function0)new Serializable(this){
                                        public static final long serialVersionUID = 0L;
                                        private final /* synthetic */ StreamTest$.anonfun.executeAction.1.9 $outer;

                                        public final void apply() {
                                            this.apply$mcV$sp();
                                        }

                                        public void apply$mcV$sp() {
                                            ((StreamExecution)this.$outer.currentStream$1.elem).awaitTermination();
                                        }
                                        {
                                            if ($outer == null) {
                                                throw null;
                                            }
                                            this.$outer = $outer;
                                        }
                                    }, ClassTag$.MODULE$.apply(StreamingQueryException.class), new Position("StreamTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 618)));
                                    StreamTest$class.eventually$1(this.$outer, "microbatch thread not stopped after termination with failure", (Function0)new Serializable(this){
                                        public static final long serialVersionUID = 0L;
                                        private final /* synthetic */ StreamTest$.anonfun.executeAction.1.9 $outer;

                                        public final Assertion apply() {
                                            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.notBool(Bool$.MODULE$.simpleMacroBool(((StreamExecution)this.$outer.currentStream$1.elem).queryExecutionThread().isAlive(), "currentStream.queryExecutionThread.isAlive()", Prettifier$.MODULE$.default()), Prettifier$.MODULE$.default());
                                            return ((Assertions)this.$outer.org$apache$spark$sql$streaming$StreamTest$$anonfun$$$outer()).assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("StreamTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 622));
                                        }
                                        {
                                            if ($outer == null) {
                                                throw null;
                                            }
                                            this.$outer = $outer;
                                        }
                                    }, this.pos$1, this.currentStream$1, this.sink$1, this.streamThreadDeathCause$1, this.startedManually$1, this.outputMode$1, this.actions$2);
                                    StreamTest$class.verify$1(this.$outer, (Function0)new Serializable(this, thrownException){
                                        public static final long serialVersionUID = 0L;
                                        private final /* synthetic */ StreamTest$.anonfun.executeAction.1.9 $outer;
                                        private final StreamingQueryException thrownException$1;

                                        public final boolean apply() {
                                            return this.apply$mcZ$sp();
                                        }

                                        public boolean apply$mcZ$sp() {
                                            return ((TripleEquals)this.$outer.org$apache$spark$sql$streaming$StreamTest$$anonfun$$$outer()).convertToEqualizer((Object)((StreamExecution)this.$outer.currentStream$1.elem).exception()).$eq$eq$eq((Object)new Some((Object)((Object)this.thrownException$1)), Equality$.MODULE$.default());
                                        }
                                        {
                                            if ($outer == null) {
                                                throw null;
                                            }
                                            this.$outer = $outer;
                                            this.thrownException$1 = thrownException$1;
                                        }
                                    }, new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"incorrect exception returned by query.exception()"})).s((Seq)Nil$.MODULE$), this.pos$1, this.currentStream$1, this.sink$1, this.streamThreadDeathCause$1, this.startedManually$1, this.outputMode$1, this.actions$2);
                                    StreamingQueryException exception = (StreamingQueryException)((Object)((StreamExecution)this.currentStream$1.elem).exception().get());
                                    StreamTest$class.verify$1(this.$outer, (Function0)new Serializable(this, exception){
                                        public static final long serialVersionUID = 0L;
                                        private final /* synthetic */ StreamTest$.anonfun.executeAction.1.9 $outer;
                                        private final StreamingQueryException exception$1;

                                        public final boolean apply() {
                                            return this.apply$mcZ$sp();
                                        }

                                        public boolean apply$mcZ$sp() {
                                            return ((TripleEquals)this.$outer.org$apache$spark$sql$streaming$StreamTest$$anonfun$$$outer()).convertToEqualizer(this.exception$1.cause().getClass()).$eq$eq$eq(this.$outer.x9$1.causeClass(), Equality$.MODULE$.default());
                                        }
                                        {
                                            if ($outer == null) {
                                                throw null;
                                            }
                                            this.$outer = $outer;
                                            this.exception$1 = exception$1;
                                        }
                                    }, new StringBuilder().append((Object)"incorrect cause in exception returned by query.exception()\n").append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"\\tExpected: ", "\\n\\tReturned: ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.x9$1.causeClass(), exception.cause().getClass()}))).toString(), this.pos$1, this.currentStream$1, this.sink$1, this.streamThreadDeathCause$1, this.startedManually$1, this.outputMode$1, this.actions$2);
                                    if (this.x9$1.isFatalError()) {
                                        StreamTest$class.verify$1(this.$outer, (Function0)new Serializable(this){
                                            public static final long serialVersionUID = 0L;
                                            private final /* synthetic */ StreamTest$.anonfun.executeAction.1.9 $outer;

                                            public final boolean apply() {
                                                return this.apply$mcZ$sp();
                                            }

                                            public boolean apply$mcZ$sp() {
                                                return (Throwable)this.$outer.streamThreadDeathCause$1.elem != null && ((TripleEquals)this.$outer.org$apache$spark$sql$streaming$StreamTest$$anonfun$$$outer()).convertToEqualizer(((Throwable)this.$outer.streamThreadDeathCause$1.elem).getClass()).$eq$eq$eq(this.$outer.x9$1.causeClass(), Equality$.MODULE$.default());
                                            }
                                            {
                                                if ($outer == null) {
                                                    throw null;
                                                }
                                                this.$outer = $outer;
                                            }
                                        }, new StringBuilder().append((Object)"UncaughtExceptionHandler didn't receive the correct error\n").append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"\\tExpected: ", "\\n\\tReturned: ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.x9$1.causeClass(), (Throwable)this.streamThreadDeathCause$1.elem}))).toString(), this.pos$1, this.currentStream$1, this.sink$1, this.streamThreadDeathCause$1, this.startedManually$1, this.outputMode$1, this.actions$2);
                                        this.streamThreadDeathCause$1.elem = null;
                                    }
                                    this.x9$1.assertFailure().apply((Object)exception.getCause());
                                }

                                public /* synthetic */ StreamTest org$apache$spark$sql$streaming$StreamTest$$anonfun$$$outer() {
                                    return this.$outer;
                                }
                                {
                                    if ($outer == null) {
                                        throw null;
                                    }
                                    this.$outer = $outer;
                                    this.pos$1 = pos$1;
                                    this.currentStream$1 = currentStream$1;
                                    this.sink$1 = sink$1;
                                    this.streamThreadDeathCause$1 = streamThreadDeathCause$1;
                                    this.startedManually$1 = startedManually$1;
                                    this.x9$1 = x9$1;
                                    this.outputMode$1 = outputMode$1;
                                    this.actions$2 = actions$2;
                                }
                            }, $this.defaultSignaler(), Prettifier$.MODULE$.default(), new Position("StreamTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 617), Timed$.MODULE$.timed());
                            break block41;
                        }
                        if (streamAction instanceof StreamTest.AssertOnQuery) {
                            assertOnQuery = (StreamTest.AssertOnQuery)streamAction;
                            StreamTest$class.verify$1($this, (Function0)new Serializable($this, currentStream$1, lastStream$1){
                                public static final long serialVersionUID = 0L;
                                private final ObjectRef currentStream$1;
                                private final ObjectRef lastStream$1;

                                public final boolean apply() {
                                    return this.apply$mcZ$sp();
                                }

                                public boolean apply$mcZ$sp() {
                                    return (StreamExecution)this.currentStream$1.elem != null || (StreamExecution)this.lastStream$1.elem != null;
                                }
                                {
                                    this.currentStream$1 = currentStream$1;
                                    this.lastStream$1 = lastStream$1;
                                }
                            }, "cannot assert when no stream has been started", pos$1, currentStream$1, sink$1, streamThreadDeathCause$1, startedManually$1, outputMode$1, actions$2);
                            StreamExecution streamToAssert = (StreamExecution)Option$.MODULE$.apply((Object)((StreamExecution)currentStream$1.elem)).getOrElse((Function0)new Serializable($this, lastStream$1){
                                public static final long serialVersionUID = 0L;
                                private final ObjectRef lastStream$1;

                                public final StreamExecution apply() {
                                    return (StreamExecution)this.lastStream$1.elem;
                                }
                                {
                                    this.lastStream$1 = lastStream$1;
                                }
                            });
                            StreamTest$class.verify$1($this, (Function0)new Serializable($this, streamToAssert, assertOnQuery){
                                public static final long serialVersionUID = 0L;
                                private final StreamExecution streamToAssert$1;
                                private final StreamTest.AssertOnQuery x10$1;

                                public final boolean apply() {
                                    return this.apply$mcZ$sp();
                                }

                                public boolean apply$mcZ$sp() {
                                    return BoxesRunTime.unboxToBoolean((Object)this.x10$1.condition().apply((Object)this.streamToAssert$1));
                                }
                                {
                                    this.streamToAssert$1 = streamToAssert$1;
                                    this.x10$1 = x10$1;
                                }
                            }, new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Assert on query failed: ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{assertOnQuery.message()})), pos$1, currentStream$1, sink$1, streamThreadDeathCause$1, startedManually$1, outputMode$1, actions$2);
                            BoxedUnit boxedUnit7 = BoxedUnit.UNIT;
                            return;
                        }
                        if (streamAction instanceof StreamTest.Assert) {
                            StreamTest.Assert assert_ = (StreamTest.Assert)streamAction;
                            StreamExecution streamToAssert = (StreamExecution)Option$.MODULE$.apply((Object)((StreamExecution)currentStream$1.elem)).getOrElse((Function0)new Serializable($this, lastStream$1){
                                public static final long serialVersionUID = 0L;
                                private final ObjectRef lastStream$1;

                                public final StreamExecution apply() {
                                    return (StreamExecution)this.lastStream$1.elem;
                                }
                                {
                                    this.lastStream$1 = lastStream$1;
                                }
                            });
                            StreamTest$class.verify$1($this, (Function0)new Serializable($this, assert_){
                                public static final long serialVersionUID = 0L;
                                private final StreamTest.Assert x11$1;

                                public final boolean apply() {
                                    return this.apply$mcZ$sp();
                                }

                                public boolean apply$mcZ$sp() {
                                    this.x11$1.run();
                                    return true;
                                }
                                {
                                    this.x11$1 = x11$1;
                                }
                            }, new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Assert failed: ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{assert_.message()})), pos$1, currentStream$1, sink$1, streamThreadDeathCause$1, startedManually$1, outputMode$1, actions$2);
                            BoxedUnit boxedUnit8 = BoxedUnit.UNIT;
                            return;
                        }
                        if (streamAction instanceof StreamTest.AddData) {
                            Tuple2 tuple2;
                            Option queryToUse;
                            Tuple2<BaseStreamingSource, Offset> tuple22;
                            StreamTest.AddData addData2 = (StreamTest.AddData)streamAction;
                            if ((StreamExecution)currentStream$1.elem != null && ((StreamExecution)currentStream$1.elem).triggerClock() instanceof StreamManualClock) {
                                StreamManualClock clock = (StreamManualClock)((StreamExecution)currentStream$1.elem).triggerClock();
                                StreamTest$class.eventually$1($this, "Error while synchronizing with manual clock before adding data", (Function0)new Serializable($this, currentStream$1, clock){
                                    public static final long serialVersionUID = 0L;
                                    private final /* synthetic */ StreamTest $outer;
                                    private final ObjectRef currentStream$1;
                                    private final StreamManualClock clock$2;

                                    public final Object apply() {
                                        BoxedUnit boxedUnit;
                                        if (((StreamExecution)this.currentStream$1.elem).isActive()) {
                                            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.simpleMacroBool(this.clock$2.isStreamWaitingAt(this.clock$2.getTimeMillis()), "clock.isStreamWaitingAt(clock.getTimeMillis())", Prettifier$.MODULE$.default());
                                            boxedUnit = ((Assertions)this.$outer).assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("StreamTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 682));
                                        } else {
                                            boxedUnit = BoxedUnit.UNIT;
                                        }
                                        return boxedUnit;
                                    }
                                    {
                                        if ($outer == null) {
                                            throw null;
                                        }
                                        this.$outer = $outer;
                                        this.currentStream$1 = currentStream$1;
                                        this.clock$2 = clock$2;
                                    }
                                }, pos$1, currentStream$1, sink$1, streamThreadDeathCause$1, startedManually$1, outputMode$1, actions$2);
                                if (!((StreamExecution)currentStream$1.elem).isActive()) {
                                    throw StreamTest$class.failTest$1($this, "Query terminated while synchronizing with manual clock", StreamTest$class.failTest$default$2$1($this), pos$1, currentStream$1, sink$1, streamThreadDeathCause$1, startedManually$1, outputMode$1, actions$2);
                                }
                            }
                            if ((tuple22 = addData2.addData((Option<StreamExecution>)(queryToUse = Option$.MODULE$.apply((Object)((StreamExecution)currentStream$1.elem)).orElse((Function0)new Serializable($this, lastStream$1){
                                public static final long serialVersionUID = 0L;
                                private final ObjectRef lastStream$1;

                                public final Option<StreamExecution> apply() {
                                    return Option$.MODULE$.apply((Object)((StreamExecution)this.lastStream$1.elem));
                                }
                                {
                                    this.lastStream$1 = lastStream$1;
                                }
                            })))) == null) throw new MatchError(tuple22);
                            BaseStreamingSource source = (BaseStreamingSource)tuple22._1();
                            Offset offset = (Offset)tuple22._2();
                            Tuple2 tuple23 = tuple2 = new Tuple2((Object)source, (Object)offset);
                            BaseStreamingSource source2 = (BaseStreamingSource)tuple23._1();
                            Offset offset2 = (Offset)tuple23._2();
                            int sourceIndex = BoxesRunTime.unboxToInt((Object)queryToUse.flatMap((Function1)new Serializable($this, source2){
                                public static final long serialVersionUID = 0L;
                                private final /* synthetic */ StreamTest $outer;
                                private final BaseStreamingSource source$1;

                                public final Option<Object> apply(StreamExecution query) {
                                    return StreamTest$class.findSourceIndex$1(this.$outer, query.logicalPlan(), this.source$1);
                                }
                                {
                                    if ($outer == null) {
                                        throw null;
                                    }
                                    this.$outer = $outer;
                                    this.source$1 = source$1;
                                }
                            }).orElse((Function0)new Serializable($this, stream$1, source2){
                                public static final long serialVersionUID = 0L;
                                private final /* synthetic */ StreamTest $outer;
                                private final Dataset stream$1;
                                private final BaseStreamingSource source$1;

                                public final Option<Object> apply() {
                                    return StreamTest$class.findSourceIndex$1(this.$outer, this.stream$1.logicalPlan(), this.source$1);
                                }
                                {
                                    if ($outer == null) {
                                        throw null;
                                    }
                                    this.$outer = $outer;
                                    this.stream$1 = stream$1;
                                    this.source$1 = source$1;
                                }
                            }).orElse((Function0)new Serializable($this, queryToUse, source2){
                                public static final long serialVersionUID = 0L;
                                private final /* synthetic */ StreamTest $outer;
                                private final Option queryToUse$1;
                                public final BaseStreamingSource source$1;

                                public final Option<Object> apply() {
                                    return this.queryToUse$1.flatMap((Function1)new Serializable(this){
                                        public static final long serialVersionUID = 0L;
                                        private final /* synthetic */ StreamTest$.anonfun.13 $outer;

                                        public final Option<Object> apply(StreamExecution q) {
                                            return StreamTest$class.findSourceIndex$1(this.$outer.org$apache$spark$sql$streaming$StreamTest$$anonfun$$$outer(), q.lastExecution().logical(), this.$outer.source$1);
                                        }
                                        {
                                            if ($outer == null) {
                                                throw null;
                                            }
                                            this.$outer = $outer;
                                        }
                                    });
                                }

                                public /* synthetic */ StreamTest org$apache$spark$sql$streaming$StreamTest$$anonfun$$$outer() {
                                    return this.$outer;
                                }
                                {
                                    if ($outer == null) {
                                        throw null;
                                    }
                                    this.$outer = $outer;
                                    this.queryToUse$1 = queryToUse$1;
                                    this.source$1 = source$1;
                                }
                            }).getOrElse((Function0)new Serializable($this){
                                public static final long serialVersionUID = 0L;

                                public final Nothing$ apply() {
                                    throw new IllegalArgumentException("Could not find index of the source to which data was added");
                                }
                            }));
                            awaiting$1.put((Object)BoxesRunTime.boxToInteger((int)sourceIndex), (Object)offset2);
                            BoxedUnit boxedUnit9 = BoxedUnit.UNIT;
                            return;
                        }
                        if (streamAction instanceof StreamTest.ExternalAction) {
                            StreamTest.ExternalAction externalAction = (StreamTest.ExternalAction)streamAction;
                            externalAction.runAction();
                            BoxedUnit boxedUnit10 = BoxedUnit.UNIT;
                            return;
                        }
                        if (streamAction instanceof StreamTest.CheckAnswerRows) {
                            StreamTest.CheckAnswerRows checkAnswerRows = (StreamTest.CheckAnswerRows)streamAction;
                            Seq<Row> expectedAnswer = checkAnswerRows.expectedAnswer();
                            boolean lastOnly = checkAnswerRows.lastOnly();
                            boolean isSorted2 = checkAnswerRows.isSorted();
                            Seq sparkAnswer2 = StreamTest$class.fetchStreamAnswer$1($this, (StreamExecution)currentStream$1.elem, lastOnly, StreamTest$class.fetchStreamAnswer$default$3$1($this), pos$1, currentStream$1, awaiting$1, sink$1, streamThreadDeathCause$1, startedManually$1, lastFetchedMemorySinkLastBatchId$1, outputMode$1, actions$2);
                            QueryTest$.MODULE$.sameRows(expectedAnswer, (Seq<Row>)sparkAnswer2, isSorted2).foreach((Function1)new Serializable($this, pos$1, currentStream$1, sink$1, streamThreadDeathCause$1, startedManually$1, outputMode$1, actions$2){
                                public static final long serialVersionUID = 0L;
                                private final /* synthetic */ StreamTest $outer;
                                private final IntRef pos$1;
                                private final ObjectRef currentStream$1;
                                private final MemorySinkBase sink$1;
                                private final VolatileObjectRef streamThreadDeathCause$1;
                                private final boolean startedManually$1;
                                private final OutputMode outputMode$1;
                                private final Seq actions$2;

                                public final Nothing$ apply(String error) {
                                    return StreamTest$class.failTest$1(this.$outer, error, StreamTest$class.failTest$default$2$1(this.$outer), this.pos$1, this.currentStream$1, this.sink$1, this.streamThreadDeathCause$1, this.startedManually$1, this.outputMode$1, this.actions$2);
                                }
                                {
                                    if ($outer == null) {
                                        throw null;
                                    }
                                    this.$outer = $outer;
                                    this.pos$1 = pos$1;
                                    this.currentStream$1 = currentStream$1;
                                    this.sink$1 = sink$1;
                                    this.streamThreadDeathCause$1 = streamThreadDeathCause$1;
                                    this.startedManually$1 = startedManually$1;
                                    this.outputMode$1 = outputMode$1;
                                    this.actions$2 = actions$2;
                                }
                            });
                            BoxedUnit boxedUnit11 = BoxedUnit.UNIT;
                            return;
                        }
                        if (streamAction instanceof StreamTest.CheckAnswerRowsContains) {
                            StreamTest.CheckAnswerRowsContains checkAnswerRowsContains = (StreamTest.CheckAnswerRowsContains)streamAction;
                            Seq<Row> expectedAnswer = checkAnswerRowsContains.expectedAnswer();
                            boolean lastOnly = checkAnswerRowsContains.lastOnly();
                            StreamExecution streamExecution = (StreamExecution)currentStream$1.elem;
                            Seq seq = streamExecution == null ? StreamTest$class.fetchStreamAnswer$1($this, (StreamExecution)lastStream$1.elem, lastOnly, StreamTest$class.fetchStreamAnswer$default$3$1($this), pos$1, currentStream$1, awaiting$1, sink$1, streamThreadDeathCause$1, startedManually$1, lastFetchedMemorySinkLastBatchId$1, outputMode$1, actions$2) : StreamTest$class.fetchStreamAnswer$1($this, streamExecution, lastOnly, StreamTest$class.fetchStreamAnswer$default$3$1($this), pos$1, currentStream$1, awaiting$1, sink$1, streamThreadDeathCause$1, startedManually$1, lastFetchedMemorySinkLastBatchId$1, outputMode$1, actions$2);
                            Seq sparkAnswer3 = seq;
                            QueryTest$.MODULE$.includesRows(expectedAnswer, (Seq<Row>)sparkAnswer3).foreach((Function1)new Serializable($this, pos$1, currentStream$1, sink$1, streamThreadDeathCause$1, startedManually$1, outputMode$1, actions$2){
                                public static final long serialVersionUID = 0L;
                                private final /* synthetic */ StreamTest $outer;
                                private final IntRef pos$1;
                                private final ObjectRef currentStream$1;
                                private final MemorySinkBase sink$1;
                                private final VolatileObjectRef streamThreadDeathCause$1;
                                private final boolean startedManually$1;
                                private final OutputMode outputMode$1;
                                private final Seq actions$2;

                                public final Nothing$ apply(String error) {
                                    return StreamTest$class.failTest$1(this.$outer, error, StreamTest$class.failTest$default$2$1(this.$outer), this.pos$1, this.currentStream$1, this.sink$1, this.streamThreadDeathCause$1, this.startedManually$1, this.outputMode$1, this.actions$2);
                                }
                                {
                                    if ($outer == null) {
                                        throw null;
                                    }
                                    this.$outer = $outer;
                                    this.pos$1 = pos$1;
                                    this.currentStream$1 = currentStream$1;
                                    this.sink$1 = sink$1;
                                    this.streamThreadDeathCause$1 = streamThreadDeathCause$1;
                                    this.startedManually$1 = startedManually$1;
                                    this.outputMode$1 = outputMode$1;
                                    this.actions$2 = actions$2;
                                }
                            });
                            BoxedUnit boxedUnit12 = BoxedUnit.UNIT;
                            return;
                        }
                        if (!(streamAction instanceof StreamTest.CheckAnswerRowsByFunc)) {
                            if (!(streamAction instanceof StreamTest.CheckNewAnswerRows)) throw new MatchError((Object)streamAction);
                            StreamTest.CheckNewAnswerRows checkNewAnswerRows = (StreamTest.CheckNewAnswerRows)streamAction;
                            Seq<Row> expectedAnswer = checkNewAnswerRows.expectedAnswer();
                            StreamExecution x$30 = (StreamExecution)currentStream$1.elem;
                            boolean x$31 = true;
                            boolean x$32 = StreamTest$class.fetchStreamAnswer$default$2$1($this);
                            Seq sparkAnswer4 = StreamTest$class.fetchStreamAnswer$1($this, x$30, x$32, x$31, pos$1, currentStream$1, awaiting$1, sink$1, streamThreadDeathCause$1, startedManually$1, lastFetchedMemorySinkLastBatchId$1, outputMode$1, actions$2);
                            QueryTest$.MODULE$.sameRows(expectedAnswer, (Seq<Row>)sparkAnswer4, QueryTest$.MODULE$.sameRows$default$3()).foreach((Function1)new Serializable($this, pos$1, currentStream$1, sink$1, streamThreadDeathCause$1, startedManually$1, outputMode$1, actions$2){
                                public static final long serialVersionUID = 0L;
                                private final /* synthetic */ StreamTest $outer;
                                private final IntRef pos$1;
                                private final ObjectRef currentStream$1;
                                private final MemorySinkBase sink$1;
                                private final VolatileObjectRef streamThreadDeathCause$1;
                                private final boolean startedManually$1;
                                private final OutputMode outputMode$1;
                                private final Seq actions$2;

                                public final Nothing$ apply(String error) {
                                    return StreamTest$class.failTest$1(this.$outer, error, StreamTest$class.failTest$default$2$1(this.$outer), this.pos$1, this.currentStream$1, this.sink$1, this.streamThreadDeathCause$1, this.startedManually$1, this.outputMode$1, this.actions$2);
                                }
                                {
                                    if ($outer == null) {
                                        throw null;
                                    }
                                    this.$outer = $outer;
                                    this.pos$1 = pos$1;
                                    this.currentStream$1 = currentStream$1;
                                    this.sink$1 = sink$1;
                                    this.streamThreadDeathCause$1 = streamThreadDeathCause$1;
                                    this.startedManually$1 = startedManually$1;
                                    this.outputMode$1 = outputMode$1;
                                    this.actions$2 = actions$2;
                                }
                            });
                            BoxedUnit boxedUnit13 = BoxedUnit.UNIT;
                            return;
                        }
                        StreamTest.CheckAnswerRowsByFunc checkAnswerRowsByFunc = (StreamTest.CheckAnswerRowsByFunc)streamAction;
                        globalCheckFunction = checkAnswerRowsByFunc.globalCheckFunction();
                        boolean lastOnly = checkAnswerRowsByFunc.lastOnly();
                        StreamExecution streamExecution = (StreamExecution)currentStream$1.elem;
                        Seq seq = streamExecution == null ? StreamTest$class.fetchStreamAnswer$1($this, (StreamExecution)lastStream$1.elem, lastOnly, StreamTest$class.fetchStreamAnswer$default$3$1($this), pos$1, currentStream$1, awaiting$1, sink$1, streamThreadDeathCause$1, startedManually$1, lastFetchedMemorySinkLastBatchId$1, outputMode$1, actions$2) : StreamTest$class.fetchStreamAnswer$1($this, streamExecution, lastOnly, StreamTest$class.fetchStreamAnswer$default$3$1($this), pos$1, currentStream$1, awaiting$1, sink$1, streamThreadDeathCause$1, startedManually$1, lastFetchedMemorySinkLastBatchId$1, outputMode$1, actions$2);
                        sparkAnswer = seq;
                        try {}
                        catch (Throwable throwable) {
                            throw StreamTest$class.failTest$1($this, throwable.toString(), StreamTest$class.failTest$default$2$1($this), pos$1, currentStream$1, sink$1, streamThreadDeathCause$1, startedManually$1, outputMode$1, actions$2);
                        }
                        break block42;
                        catch (Throwable throwable) {
                            Throwable throwable2 = throwable;
                            Option option = NonFatal$.MODULE$.unapply(throwable2);
                            if (option.isEmpty()) {
                                throw throwable;
                            }
                            Throwable e = (Throwable)option.get();
                            throw StreamTest$class.failTest$1($this, "Error adding data", e, pos$1, currentStream$1, sink$1, streamThreadDeathCause$1, startedManually$1, outputMode$1, actions$2);
                        }
                        catch (Throwable throwable) {
                            Throwable throwable3 = throwable;
                            Option option = NonFatal$.MODULE$.unapply(throwable3);
                            if (option.isEmpty()) {
                                throw throwable;
                            }
                            Throwable e = (Throwable)option.get();
                            throw StreamTest$class.failTest$1($this, new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Assert on query failed: ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{assertOnQuery.message()})), e, pos$1, currentStream$1, sink$1, streamThreadDeathCause$1, startedManually$1, outputMode$1, actions$2);
                        }
                        catch (Throwable throwable) {
                            try {
                                throw StreamTest$class.failTest$1($this, "Error while checking stream failure", throwable, pos$1, currentStream$1, sink$1, streamThreadDeathCause$1, startedManually$1, outputMode$1, actions$2);
                                catch (TestFailedDueToTimeoutException testFailedDueToTimeoutException) {
                                    throw StreamTest$class.failTest$1($this, "Timed out while waiting for failure", testFailedDueToTimeoutException, pos$1, currentStream$1, sink$1, streamThreadDeathCause$1, startedManually$1, outputMode$1, actions$2);
                                }
                                catch (InterruptedException interruptedException) {
                                    boxedUnit = BoxedUnit.UNIT;
                                    break block41;
                                }
                            }
                            finally {
                                lastStream$1.elem = (StreamExecution)currentStream$1.elem;
                                currentStream$1.elem = null;
                            }
                        }
                        catch (Throwable throwable) {
                            try {
                                throw StreamTest$class.failTest$1($this, "Error while stopping stream", throwable, pos$1, currentStream$1, sink$1, streamThreadDeathCause$1, startedManually$1, outputMode$1, actions$2);
                                catch (TestFailedDueToTimeoutException testFailedDueToTimeoutException) {
                                    throw StreamTest$class.failTest$1($this, "Timed out while stopping and waiting for microbatchthread to terminate.", testFailedDueToTimeoutException, pos$1, currentStream$1, sink$1, streamThreadDeathCause$1, startedManually$1, outputMode$1, actions$2);
                                }
                                catch (InterruptedException interruptedException) {
                                    boxedUnit2 = BoxedUnit.UNIT;
                                    break block40;
                                }
                            }
                            finally {
                                lastStream$1.elem = (StreamExecution)currentStream$1.elem;
                                currentStream$1.elem = null;
                            }
                        }
                        catch (StreamingQueryException streamingQueryException) {
                            boxedUnit3 = BoxedUnit.UNIT;
                        }
                        break block43;
                    }
                    BoxedUnit boxedUnit14 = (BoxedUnit)globalCheckFunction.apply((Object)sparkAnswer);
                    return;
                }
                BoxedUnit boxedUnit15 = boxedUnit3;
                return;
            }
            BoxedUnit boxedUnit16 = boxedUnit2;
            return;
        }
        BoxedUnit boxedUnit17 = boxedUnit;
    }

    /*
     * Loose catch block
     */
    private static final void liftedTree1$1(StreamTest $this, Dataset stream$1, SparkSession sparkSession$1, IntRef pos$1, ObjectRef currentStream$1, ObjectRef lastStream$1, HashMap awaiting$1, MemorySinkBase sink$1, Map resetConfValues$1, String defaultCheckpointLocation$1, LongRef manualClockExpectedTime$1, VolatileObjectRef streamThreadDeathCause$1, StreamingQueryListener listener$1, boolean startedManually$1, Seq startedTest$1, LongRef lastFetchedMemorySinkLastBatchId$1, OutputMode outputMode$1, Seq actions$2) {
        block9: {
            startedTest$1.foreach((Function1)new Serializable($this, stream$1, sparkSession$1, pos$1, currentStream$1, lastStream$1, awaiting$1, sink$1, resetConfValues$1, defaultCheckpointLocation$1, manualClockExpectedTime$1, streamThreadDeathCause$1, startedManually$1, lastFetchedMemorySinkLastBatchId$1, outputMode$1, actions$2){
                public static final long serialVersionUID = 0L;
                private final /* synthetic */ StreamTest $outer;
                public final Dataset stream$1;
                public final SparkSession sparkSession$1;
                public final IntRef pos$1;
                public final ObjectRef currentStream$1;
                public final ObjectRef lastStream$1;
                public final HashMap awaiting$1;
                public final MemorySinkBase sink$1;
                public final Map resetConfValues$1;
                public final String defaultCheckpointLocation$1;
                public final LongRef manualClockExpectedTime$1;
                public final VolatileObjectRef streamThreadDeathCause$1;
                public final boolean startedManually$1;
                public final LongRef lastFetchedMemorySinkLastBatchId$1;
                public final OutputMode outputMode$1;
                public final Seq actions$2;

                public final void apply(StreamTest.StreamAction x0$5) {
                    StreamTest.StreamAction streamAction;
                    block4: {
                        block3: {
                            block2: {
                                streamAction = x0$5;
                                if (!(streamAction instanceof StreamTest.StreamProgressLockedActions)) break block2;
                                StreamTest.StreamProgressLockedActions streamProgressLockedActions = (StreamTest.StreamProgressLockedActions)streamAction;
                                Seq<StreamTest.StreamAction> actns = streamProgressLockedActions.actions();
                                StreamExecution $org_scalatest_assert_macro_left = (StreamExecution)this.currentStream$1.elem;
                                Object $org_scalatest_assert_macro_right = null;
                                StreamExecution streamExecution = $org_scalatest_assert_macro_left;
                                Object var9_7 = null;
                                Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "!=", null, streamExecution != null ? !streamExecution.equals(var9_7) : var9_7 != null, Prettifier$.MODULE$.default());
                                ((Assertions)this.$outer).assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Cannot perform stream-progress-locked actions ", " when query is not active"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{actns})), Prettifier$.MODULE$.default(), new Position("StreamTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 768));
                                StreamExecution $org_scalatest_assert_macro_left2 = (StreamExecution)this.currentStream$1.elem;
                                Bool $org_scalatest_assert_macro_expr2 = Bool$.MODULE$.isInstanceOfMacroBool((Object)$org_scalatest_assert_macro_left2, "isInstanceOf", "org.apache.spark.sql.execution.streaming.MicroBatchExecution", $org_scalatest_assert_macro_left2 instanceof MicroBatchExecution, Prettifier$.MODULE$.default());
                                ((Assertions)this.$outer).assertionsHelper().macroAssert($org_scalatest_assert_macro_expr2, (Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Cannot perform stream-progress-locked actions on non-microbatch queries"})).s((Seq)Nil$.MODULE$), Prettifier$.MODULE$.default(), new Position("StreamTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 770));
                                ((MicroBatchExecution)((StreamExecution)this.currentStream$1.elem)).withProgressLocked((Function0)new Serializable(this, actns){
                                    public static final long serialVersionUID = 0L;
                                    private final /* synthetic */ StreamTest$.anonfun.liftedTree1.1.1 $outer;
                                    private final Seq actns$1;

                                    public final void apply() {
                                        this.apply$mcV$sp();
                                    }

                                    public void apply$mcV$sp() {
                                        this.actns$1.foreach((Function1)new Serializable(this){
                                            public static final long serialVersionUID = 0L;
                                            private final /* synthetic */ StreamTest$$anonfun$liftedTree1$1$1$.anonfun.apply.4 $outer;

                                            public final void apply(StreamTest.StreamAction action) {
                                                StreamTest$class.executeAction$1(this.$outer.org$apache$spark$sql$streaming$StreamTest$$anonfun$$anonfun$$$outer().org$apache$spark$sql$streaming$StreamTest$$anonfun$$$outer(), action, this.$outer.org$apache$spark$sql$streaming$StreamTest$$anonfun$$anonfun$$$outer().stream$1, this.$outer.org$apache$spark$sql$streaming$StreamTest$$anonfun$$anonfun$$$outer().sparkSession$1, this.$outer.org$apache$spark$sql$streaming$StreamTest$$anonfun$$anonfun$$$outer().pos$1, this.$outer.org$apache$spark$sql$streaming$StreamTest$$anonfun$$anonfun$$$outer().currentStream$1, this.$outer.org$apache$spark$sql$streaming$StreamTest$$anonfun$$anonfun$$$outer().lastStream$1, this.$outer.org$apache$spark$sql$streaming$StreamTest$$anonfun$$anonfun$$$outer().awaiting$1, this.$outer.org$apache$spark$sql$streaming$StreamTest$$anonfun$$anonfun$$$outer().sink$1, this.$outer.org$apache$spark$sql$streaming$StreamTest$$anonfun$$anonfun$$$outer().resetConfValues$1, this.$outer.org$apache$spark$sql$streaming$StreamTest$$anonfun$$anonfun$$$outer().defaultCheckpointLocation$1, this.$outer.org$apache$spark$sql$streaming$StreamTest$$anonfun$$anonfun$$$outer().manualClockExpectedTime$1, this.$outer.org$apache$spark$sql$streaming$StreamTest$$anonfun$$anonfun$$$outer().streamThreadDeathCause$1, this.$outer.org$apache$spark$sql$streaming$StreamTest$$anonfun$$anonfun$$$outer().startedManually$1, this.$outer.org$apache$spark$sql$streaming$StreamTest$$anonfun$$anonfun$$$outer().lastFetchedMemorySinkLastBatchId$1, this.$outer.org$apache$spark$sql$streaming$StreamTest$$anonfun$$anonfun$$$outer().outputMode$1, this.$outer.org$apache$spark$sql$streaming$StreamTest$$anonfun$$anonfun$$$outer().actions$2);
                                            }
                                            {
                                                if ($outer == null) {
                                                    throw null;
                                                }
                                                this.$outer = $outer;
                                            }
                                        });
                                    }

                                    public /* synthetic */ StreamTest$.anonfun.liftedTree1.1.1 org$apache$spark$sql$streaming$StreamTest$$anonfun$$anonfun$$$outer() {
                                        return this.$outer;
                                    }
                                    {
                                        if ($outer == null) {
                                            throw null;
                                        }
                                        this.$outer = $outer;
                                        this.actns$1 = actns$1;
                                    }
                                });
                                ++this.pos$1.elem;
                                BoxedUnit boxedUnit = BoxedUnit.UNIT;
                                break block3;
                            }
                            if (streamAction == null) break block4;
                            StreamTest.StreamAction streamAction2 = streamAction;
                            StreamTest$class.executeAction$1(this.$outer, streamAction2, this.stream$1, this.sparkSession$1, this.pos$1, this.currentStream$1, this.lastStream$1, this.awaiting$1, this.sink$1, this.resetConfValues$1, this.defaultCheckpointLocation$1, this.manualClockExpectedTime$1, this.streamThreadDeathCause$1, this.startedManually$1, this.lastFetchedMemorySinkLastBatchId$1, this.outputMode$1, this.actions$2);
                            ++this.pos$1.elem;
                            BoxedUnit boxedUnit = BoxedUnit.UNIT;
                        }
                        return;
                    }
                    throw new MatchError((Object)streamAction);
                }

                public /* synthetic */ StreamTest org$apache$spark$sql$streaming$StreamTest$$anonfun$$$outer() {
                    return this.$outer;
                }
                {
                    if ($outer == null) {
                        throw null;
                    }
                    this.$outer = $outer;
                    this.stream$1 = stream$1;
                    this.sparkSession$1 = sparkSession$1;
                    this.pos$1 = pos$1;
                    this.currentStream$1 = currentStream$1;
                    this.lastStream$1 = lastStream$1;
                    this.awaiting$1 = awaiting$1;
                    this.sink$1 = sink$1;
                    this.resetConfValues$1 = resetConfValues$1;
                    this.defaultCheckpointLocation$1 = defaultCheckpointLocation$1;
                    this.manualClockExpectedTime$1 = manualClockExpectedTime$1;
                    this.streamThreadDeathCause$1 = streamThreadDeathCause$1;
                    this.startedManually$1 = startedManually$1;
                    this.lastFetchedMemorySinkLastBatchId$1 = lastFetchedMemorySinkLastBatchId$1;
                    this.outputMode$1 = outputMode$1;
                    this.actions$2 = actions$2;
                }
            });
            if ((Throwable)streamThreadDeathCause$1.elem != null) break block9;
            {
                catch (Throwable throwable) {
                    Throwable throwable2 = throwable;
                    if (throwable2 instanceof InterruptedException && (Throwable)streamThreadDeathCause$1.elem != null) {
                        throw StreamTest$class.failTest$1($this, "Stream Thread Died", (Throwable)streamThreadDeathCause$1.elem, pos$1, currentStream$1, sink$1, streamThreadDeathCause$1, startedManually$1, outputMode$1, actions$2);
                    }
                    if (throwable2 instanceof TestFailedDueToTimeoutException) {
                        TestFailedDueToTimeoutException testFailedDueToTimeoutException = (TestFailedDueToTimeoutException)throwable2;
                        throw StreamTest$class.failTest$1($this, "Timed out waiting for stream", testFailedDueToTimeoutException, pos$1, currentStream$1, sink$1, streamThreadDeathCause$1, startedManually$1, outputMode$1, actions$2);
                    }
                    throw throwable;
                }
            }
            if ((StreamExecution)currentStream$1.elem != null && ((StreamExecution)currentStream$1.elem).queryExecutionThread().isAlive()) {
                ((StreamExecution)currentStream$1.elem).stop();
            }
            resetConfValues$1.foreach((Function1)new Serializable($this, sparkSession$1){
                public static final long serialVersionUID = 0L;
                private final SparkSession sparkSession$1;

                public final void apply(Tuple2<String, Option<String>> x0$6) {
                    Tuple2<String, Option<String>> tuple2;
                    block4: {
                        block3: {
                            block2: {
                                tuple2 = x0$6;
                                if (tuple2 == null) break block2;
                                String key = (String)tuple2._1();
                                Option option = (Option)tuple2._2();
                                if (!(option instanceof Some)) break block2;
                                Some some = (Some)option;
                                String value = (String)some.x();
                                this.sparkSession$1.conf().set(key, value);
                                BoxedUnit boxedUnit = BoxedUnit.UNIT;
                                break block3;
                            }
                            if (tuple2 == null) break block4;
                            String key = (String)tuple2._1();
                            Option option = (Option)tuple2._2();
                            if (!None$.MODULE$.equals(option)) break block4;
                            this.sparkSession$1.conf().unset(key);
                            BoxedUnit boxedUnit = BoxedUnit.UNIT;
                        }
                        return;
                    }
                    throw new MatchError(tuple2);
                }
                {
                    this.sparkSession$1 = sparkSession$1;
                }
            });
            sparkSession$1.streams().removeListener(listener$1);
            return;
        }
        try {
            throw StreamTest$class.failTest$1($this, "Stream Thread Died", (Throwable)streamThreadDeathCause$1.elem, pos$1, currentStream$1, sink$1, streamThreadDeathCause$1, startedManually$1, outputMode$1, actions$2);
        }
        catch (Throwable throwable) {
            if ((StreamExecution)currentStream$1.elem != null && ((StreamExecution)currentStream$1.elem).queryExecutionThread().isAlive()) {
                ((StreamExecution)currentStream$1.elem).stop();
            }
            resetConfValues$1.foreach((Function1)new /* invalid duplicate definition of identical inner class */);
            sparkSession$1.streams().removeListener(listener$1);
            throw throwable;
        }
    }

    public static final ArrayBuffer addCheck$1(StreamTest $this, ExpressionEncoder intEncoder$1, IntRef dataPos$1, ArrayBuffer actions$1) {
        return actions$1.$plus$eq((Object)$this.CheckAnswer().apply(RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), dataPos$1.elem), intEncoder$1));
    }

    public static final ArrayBuffer addRandomData$1(StreamTest $this, IntRef dataPos$1, BooleanRef running$1, ArrayBuffer actions$1, Function2 addData$2) {
        int numItems = Random$.MODULE$.nextInt(10);
        Range data = RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(dataPos$1.elem), dataPos$1.elem + numItems);
        dataPos$1.elem += numItems;
        return actions$1.$plus$eq(addData$2.apply((Object)data, (Object)BoxesRunTime.boxToBoolean((boolean)running$1.elem)));
    }

    public static void $init$(StreamTest $this) {
        $this.org$apache$spark$sql$streaming$StreamTest$_setter_$defaultSignaler_$eq((Signaler)ThreadSignaler$.MODULE$);
        $this.org$apache$spark$sql$streaming$StreamTest$_setter_$defaultTrigger_$eq(Trigger.ProcessingTime((long)0L));
        $this.org$apache$spark$sql$streaming$StreamTest$_setter_$defaultUseV2Sink_$eq(false);
        $this.org$apache$spark$sql$streaming$StreamTest$_setter_$streamingTimeout_$eq(SpanSugar$.MODULE$.convertIntToGrainOfTime(60).seconds());
    }
}

