/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.sql.execution.streaming;

import java.io.Serializable;
import java.lang.invoke.LambdaMetafactory;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.SerializedLambda;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.RowEncoder$;
import org.apache.spark.sql.catalyst.expressions.Alias;
import org.apache.spark.sql.catalyst.expressions.Alias$;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.catalyst.expressions.AttributeReference;
import org.apache.spark.sql.catalyst.expressions.CurrentBatchTimestamp;
import org.apache.spark.sql.catalyst.expressions.CurrentDate;
import org.apache.spark.sql.catalyst.expressions.CurrentTimestamp;
import org.apache.spark.sql.catalyst.expressions.ExprId;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.expressions.FileSourceMetadataAttribute$;
import org.apache.spark.sql.catalyst.expressions.LocalTimestamp;
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation;
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation$;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.catalyst.plans.logical.Project;
import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2;
import org.apache.spark.sql.catalyst.streaming.WriteToStream;
import org.apache.spark.sql.catalyst.trees.TreePattern$;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.SupportsRead;
import org.apache.spark.sql.connector.catalog.SupportsWrite;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCapability;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
import org.apache.spark.sql.connector.read.streaming.ReadLimit;
import org.apache.spark.sql.connector.read.streaming.SparkDataStream;
import org.apache.spark.sql.connector.read.streaming.SupportsAdmissionControl;
import org.apache.spark.sql.connector.read.streaming.SupportsTriggerAvailableNow;
import org.apache.spark.sql.errors.QueryExecutionErrors$;
import org.apache.spark.sql.execution.SQLExecution$;
import org.apache.spark.sql.execution.SparkPlan;
import org.apache.spark.sql.execution.datasources.DataSource;
import org.apache.spark.sql.execution.datasources.LogicalRelation;
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits$;
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation;
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$;
import org.apache.spark.sql.execution.datasources.v2.StreamWriterCommitProgress;
import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation;
import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation$;
import org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec;
import org.apache.spark.sql.execution.streaming.AcceptsLatestSeenOffsetHandler$;
import org.apache.spark.sql.execution.streaming.AvailableNowDataStreamWrapper;
import org.apache.spark.sql.execution.streaming.AvailableNowMicroBatchStreamWrapper;
import org.apache.spark.sql.execution.streaming.AvailableNowSourceWrapper;
import org.apache.spark.sql.execution.streaming.AvailableNowTrigger$;
import org.apache.spark.sql.execution.streaming.CommitMetadata;
import org.apache.spark.sql.execution.streaming.IncrementalExecution;
import org.apache.spark.sql.execution.streaming.MicroBatchExecution$;
import org.apache.spark.sql.execution.streaming.MultiBatchExecutor;
import org.apache.spark.sql.execution.streaming.Offset;
import org.apache.spark.sql.execution.streaming.OffsetHolder;
import org.apache.spark.sql.execution.streaming.OffsetSeq;
import org.apache.spark.sql.execution.streaming.OffsetSeqMetadata;
import org.apache.spark.sql.execution.streaming.OffsetSeqMetadata$;
import org.apache.spark.sql.execution.streaming.OneTimeTrigger$;
import org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor;
import org.apache.spark.sql.execution.streaming.ProcessingTimeTrigger;
import org.apache.spark.sql.execution.streaming.ProgressReporter;
import org.apache.spark.sql.execution.streaming.SerializedOffset;
import org.apache.spark.sql.execution.streaming.SingleBatchExecutor;
import org.apache.spark.sql.execution.streaming.Sink;
import org.apache.spark.sql.execution.streaming.Source;
import org.apache.spark.sql.execution.streaming.StreamExecution;
import org.apache.spark.sql.execution.streaming.StreamExecution$;
import org.apache.spark.sql.execution.streaming.StreamProgress;
import org.apache.spark.sql.execution.streaming.StreamingExecutionRelation;
import org.apache.spark.sql.execution.streaming.StreamingRelation;
import org.apache.spark.sql.execution.streaming.TERMINATED$;
import org.apache.spark.sql.execution.streaming.TriggerExecutor;
import org.apache.spark.sql.execution.streaming.WatermarkTracker;
import org.apache.spark.sql.execution.streaming.WatermarkTracker$;
import org.apache.spark.sql.execution.streaming.sources.WriteToMicroBatchDataSource;
import org.apache.spark.sql.execution.streaming.sources.WriteToMicroBatchDataSource$;
import org.apache.spark.sql.internal.SQLConf$;
import org.apache.spark.sql.streaming.StreamingQueryStatus;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.apache.spark.util.Clock;
import org.apache.spark.util.Utils$;
import scala.;
import scala.$less$colon$less$;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.PartialFunction;
import scala.Predef;
import scala.Predef$;
import scala.Product;
import scala.Some;
import scala.Tuple2;
import scala.collection.IterableOnce;
import scala.collection.IterableOnceOps;
import scala.collection.IterableOps;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Seq;
import scala.collection.mutable.Map;
import scala.collection.mutable.Map$;
import scala.math.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BooleanRef;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.LambdaDeserialize;
import scala.runtime.LongRef;
import scala.runtime.NonLocalReturnControl;
import scala.runtime.java8.JFunction0;

@ScalaSignature(bytes="\u0006\u0005\u0005uh\u0001B\u0012%\u0001EB\u0011B\u000e\u0001\u0003\u0002\u0003\u0006IaN\u001e\t\u0013q\u0002!\u0011!Q\u0001\nu\u0012\u0005\"C\"\u0001\u0005\u0003\u0005\u000b\u0011\u0002#K\u0011!Y\u0005A!A!\u0002\u0013a\u0005\u0002\u0003/\u0001\u0005\u0003\u0005\u000b\u0011B/\t\u000b\u0011\u0004A\u0011A3\t\u000f1\u0004\u0001\u0019!C\t[\"I\u0011\u0011\u0001\u0001A\u0002\u0013E\u00111\u0001\u0005\b\u0003#\u0001\u0001\u0015)\u0003o\u0011%\tY\u0002\u0001b\u0001\n\u0013\ti\u0002\u0003\u0005\u0002B\u0001\u0001\u000b\u0011BA\u0010\u0011-\t\u0019\u0005\u0001a\u0001\u0002\u0004%I!!\u0012\t\u0017\u00055\u0003\u00011AA\u0002\u0013%\u0011q\n\u0005\f\u0003'\u0002\u0001\u0019!A!B\u0013\t9\u0005\u0003\u0006\u0002V\u0001A)\u0019!C!\u0003/B\u0011\"!\u001b\u0001\u0001\u0004%I!a\u001b\t\u0013\u0005M\u0004\u00011A\u0005\n\u0005U\u0004\u0002CA=\u0001\u0001\u0006K!!\u001c\t\u000f\u0005m\u0004\u0001\"\u0011\u0002~!9\u0011q\u0010\u0001\u0005R\u0005u\u0004bBAA\u0001\u0011E\u00111\u0011\u0005\b\u0003\u0013\u0003A\u0011BAF\u0011\u001d\t\t\n\u0001C\u0005\u0003WBq!a%\u0001\t\u0013\t)\nC\u0004\u0002\"\u0002!I!a)\t\u000f\u0005%\u0006\u0001\"\u0003\u0002,\"A\u0011\u0011\u0017\u0001\u0005\u0002!\n\u0019\fC\u0007\u0002X\u0002\u0001\n1!A\u0001\n\u0013\tIn\u000f\u0005\u000e\u00037\u0004\u0001\u0013aA\u0001\u0002\u0013%\u0011Q\u001c&\b\u000f\u0005}G\u0005#\u0001\u0002b\u001a11\u0005\nE\u0001\u0003GDa\u0001Z\u0010\u0005\u0002\u0005-\b\"CAw?\t\u0007I\u0011AAx\u0011!\tYp\bQ\u0001\n\u0005E(aE'jGJ|')\u0019;dQ\u0016CXmY;uS>t'BA\u0013'\u0003%\u0019HO]3b[&twM\u0003\u0002(Q\u0005IQ\r_3dkRLwN\u001c\u0006\u0003S)\n1a]9m\u0015\tYC&A\u0003ta\u0006\u00148N\u0003\u0002.]\u00051\u0011\r]1dQ\u0016T\u0011aL\u0001\u0004_J<7\u0001A\n\u0003\u0001I\u0002\"a\r\u001b\u000e\u0003\u0011J!!\u000e\u0013\u0003\u001fM#(/Z1n\u000bb,7-\u001e;j_:\fAb\u001d9be.\u001cVm]:j_:\u0004\"\u0001O\u001d\u000e\u0003!J!A\u000f\u0015\u0003\u0019M\u0003\u0018M]6TKN\u001c\u0018n\u001c8\n\u0005Y\"\u0014a\u0002;sS\u001e<WM\u001d\t\u0003}\u0001k\u0011a\u0010\u0006\u0003K!J!!Q \u0003\u000fQ\u0013\u0018nZ4fe&\u0011A\bN\u0001\riJLwmZ3s\u00072|7m\u001b\t\u0003\u000b\"k\u0011A\u0012\u0006\u0003\u000f*\nA!\u001e;jY&\u0011\u0011J\u0012\u0002\u0006\u00072|7m[\u0005\u0003\u0007R\nA\"\u001a=ue\u0006|\u0005\u000f^5p]N\u0004B!\u0014,Z3:\u0011a\n\u0016\t\u0003\u001fJk\u0011\u0001\u0015\u0006\u0003#B\na\u0001\u0010:p_Rt$\"A*\u0002\u000bM\u001c\u0017\r\\1\n\u0005U\u0013\u0016A\u0002)sK\u0012,g-\u0003\u0002X1\n\u0019Q*\u00199\u000b\u0005U\u0013\u0006CA'[\u0013\tY\u0006L\u0001\u0004TiJLgnZ\u0001\u0005a2\fg\u000e\u0005\u0002_E6\tqL\u0003\u0002&A*\u0011\u0011\rK\u0001\tG\u0006$\u0018\r\\=ti&\u00111m\u0018\u0002\u000e/JLG/\u001a+p'R\u0014X-Y7\u0002\rqJg.\u001b;?)\u00191w\r[5kWB\u00111\u0007\u0001\u0005\u0006m\u0019\u0001\ra\u000e\u0005\u0006y\u0019\u0001\r!\u0010\u0005\u0006\u0007\u001a\u0001\r\u0001\u0012\u0005\u0006\u0017\u001a\u0001\r\u0001\u0014\u0005\u00069\u001a\u0001\r!X\u0001\bg>,(oY3t+\u0005q\u0007cA8uo:\u0011\u0001O\u001d\b\u0003\u001fFL\u0011aU\u0005\u0003gJ\u000bq\u0001]1dW\u0006<W-\u0003\u0002vm\n\u00191+Z9\u000b\u0005M\u0014\u0006C\u0001=\u007f\u001b\u0005I(BA\u0013{\u0015\tYH0\u0001\u0003sK\u0006$'BA?)\u0003%\u0019wN\u001c8fGR|'/\u0003\u0002\u0000s\ny1\u000b]1sW\u0012\u000bG/Y*ue\u0016\fW.A\u0006t_V\u00148-Z:`I\u0015\fH\u0003BA\u0003\u0003\u001b\u0001B!a\u0002\u0002\n5\t!+C\u0002\u0002\fI\u0013A!\u00168ji\"A\u0011q\u0002\u0005\u0002\u0002\u0003\u0007a.A\u0002yIE\n\u0001b]8ve\u000e,7\u000f\t\u0015\u0004\u0013\u0005U\u0001\u0003BA\u0004\u0003/I1!!\u0007S\u0005!1x\u000e\\1uS2,\u0017a\u0004;sS\u001e<WM]#yK\u000e,Ho\u001c:\u0016\u0005\u0005}!\u0003CA\u0011\u0003K\tY#!\r\u0007\r\u0005\r\u0002\u0001AA\u0010\u00051a$/\u001a4j]\u0016lWM\u001c;?!\u0011\t9!a\n\n\u0007\u0005%\"KA\u0004Qe>$Wo\u0019;\u0011\u0007M\ni#C\u0002\u00020\u0011\u0012q\u0002\u0016:jO\u001e,'/\u0012=fGV$xN\u001d\t\u0005\u0003g\ti$\u0004\u0002\u00026)!\u0011qGA\u001d\u0003\tIwN\u0003\u0002\u0002<\u0005!!.\u0019<b\u0013\u0011\ty$!\u000e\u0003\u0019M+'/[1mSj\f'\r\\3\u0002!Q\u0014\u0018nZ4fe\u0016CXmY;u_J\u0004\u0013\u0001E<bi\u0016\u0014X.\u0019:l)J\f7m[3s+\t\t9\u0005E\u00024\u0003\u0013J1!a\u0013%\u0005A9\u0016\r^3s[\u0006\u00148\u000e\u0016:bG.,'/\u0001\u000bxCR,'/\\1sWR\u0013\u0018mY6fe~#S-\u001d\u000b\u0005\u0003\u000b\t\t\u0006C\u0005\u0002\u00105\t\t\u00111\u0001\u0002H\u0005\tr/\u0019;fe6\f'o\u001b+sC\u000e\\WM\u001d\u0011\u0002\u00171|w-[2bYBc\u0017M\\\u000b\u0003\u00033\u0002B!a\u0017\u0002f5\u0011\u0011Q\f\u0006\u0005\u0003?\n\t'A\u0004m_\u001eL7-\u00197\u000b\u0007\u0005\r\u0004-A\u0003qY\u0006t7/\u0003\u0003\u0002h\u0005u#a\u0003'pO&\u001c\u0017\r\u001c)mC:\f\u0011$[:DkJ\u0014XM\u001c;CCR\u001c\u0007nQ8ogR\u0014Xo\u0019;fIV\u0011\u0011Q\u000e\t\u0005\u0003\u000f\ty'C\u0002\u0002rI\u0013qAQ8pY\u0016\fg.A\u000fjg\u000e+(O]3oi\n\u000bGo\u00195D_:\u001cHO];di\u0016$w\fJ3r)\u0011\t)!a\u001e\t\u0013\u0005=\u0011#!AA\u0002\u00055\u0014AG5t\u0007V\u0014(/\u001a8u\u0005\u0006$8\r[\"p]N$(/^2uK\u0012\u0004\u0013\u0001B:u_B$\"!!\u0002\u0002\u0019M$\u0018M\u001d;Ue&<w-\u001a:\u0002%I,h.Q2uSZ\fG/\u001a3TiJ,\u0017-\u001c\u000b\u0005\u0003\u000b\t)\t\u0003\u0004\u0002\bV\u0001\raN\u0001\u0016gB\f'o[*fgNLwN\u001c$peN#(/Z1n\u0003Q\u0001x\u000e];mCR,7\u000b^1si>3gm]3ugR!\u0011QAAG\u0011\u0019\tyI\u0006a\u0001o\u0005A2\u000f]1sWN+7o]5p]R{'+\u001e8CCR\u001c\u0007.Z:\u0002%%\u001ch*Z<ECR\f\u0017I^1jY\u0006\u0014G.Z\u0001\u000fO\u0016$8\u000b^1si>3gm]3u)\u0011\t9*!(\u0011\u0007a\fI*C\u0002\u0002\u001cf\u0014aa\u00144gg\u0016$\bBBAP1\u0001\u0007q/\u0001\u0006eCR\f7\u000b\u001e:fC6\f!cY8ogR\u0014Xo\u0019;OKb$()\u0019;dQR!\u0011QNAS\u0011\u001d\t9+\u0007a\u0001\u0003[\nAC\\8ECR\f')\u0019;dQ\u0016\u001cXI\\1cY\u0016$\u0017\u0001\u0003:v]\n\u000bGo\u00195\u0015\t\u0005\u0015\u0011Q\u0016\u0005\u0007\u0003_S\u0002\u0019A\u001c\u0002-M\u0004\u0018M]6TKN\u001c\u0018n\u001c8U_J+hNQ1uG\"\f!c^5uQB\u0013xn\u001a:fgNdunY6fIV!\u0011QWA^)\u0011\t9,!4\u0011\t\u0005e\u00161\u0018\u0007\u0001\t\u001d\til\u0007b\u0001\u0003\u007f\u0013\u0011\u0001V\t\u0005\u0003\u0003\f9\r\u0005\u0003\u0002\b\u0005\r\u0017bAAc%\n9aj\u001c;iS:<\u0007\u0003BA\u0004\u0003\u0013L1!a3S\u0005\r\te.\u001f\u0005\t\u0003\u001f\\B\u00111\u0001\u0002R\u0006\ta\r\u0005\u0004\u0002\b\u0005M\u0017qW\u0005\u0004\u0003+\u0014&\u0001\u0003\u001fcs:\fW.\u001a \u0002%M,\b/\u001a:%gB\f'o[*fgNLwN\\\u000b\u0002o\u0005\u00112/\u001e9fe\u0012\"(/[4hKJ\u001cEn\\2l+\u0005!\u0015aE'jGJ|')\u0019;dQ\u0016CXmY;uS>t\u0007CA\u001a '\ry\u0012Q\u001d\t\u0005\u0003\u000f\t9/C\u0002\u0002jJ\u0013a!\u00118z%\u00164GCAAq\u00031\u0011\u0015\tV\"I?&#ulS#Z+\t\t\t\u0010\u0005\u0003\u0002t\u0006eXBAA{\u0015\u0011\t90!\u000f\u0002\t1\fgnZ\u0005\u00047\u0006U\u0018!\u0004\"B)\u000eCu,\u0013#`\u0017\u0016K\u0006\u0005")
public class MicroBatchExecution
extends StreamExecution {
    private LogicalPlan logicalPlan;
    private scala.collection.immutable.Map<String, String> extraOptions;
    private final WriteToStream plan;
    private volatile Seq<SparkDataStream> sources;
    private final Product triggerExecutor;
    private WatermarkTracker watermarkTracker;
    private boolean isCurrentBatchConstructed;
    private volatile boolean bitmap$0;

    public static String BATCH_ID_KEY() {
        return MicroBatchExecution$.MODULE$.BATCH_ID_KEY();
    }

    public /* synthetic */ SparkSession org$apache$spark$sql$execution$streaming$MicroBatchExecution$$super$sparkSession() {
        return super.sparkSession();
    }

    private /* synthetic */ Clock super$triggerClock() {
        return super.triggerClock();
    }

    @Override
    public Seq<SparkDataStream> sources() {
        return this.sources;
    }

    public void sources_$eq(Seq<SparkDataStream> x$1) {
        this.sources = x$1;
    }

    private Product triggerExecutor() {
        return this.triggerExecutor;
    }

    private WatermarkTracker watermarkTracker() {
        return this.watermarkTracker;
    }

    private void watermarkTracker_$eq(WatermarkTracker x$1) {
        this.watermarkTracker = x$1;
    }

    private LogicalPlan logicalPlan$lzycompute() {
        MicroBatchExecution microBatchExecution = this;
        synchronized (microBatchExecution) {
            if (!this.bitmap$0) {
                LogicalPlan logicalPlan2;
                Predef$.MODULE$.assert(this.queryExecutionThread() == Thread.currentThread(), (Function0 & Serializable)() -> new StringBuilder(56).append("logicalPlan must be initialized in QueryExecutionThread ").append(new StringBuilder(27).append("but the current thread was ").append(Thread.currentThread()).toString()).toString());
                LongRef nextSourceId = LongRef.create((long)0L);
                Map toExecutionRelationMap = (Map)Map$.MODULE$.apply((Seq)Nil$.MODULE$);
                Map v2ToExecutionRelationMap = (Map)Map$.MODULE$.apply((Seq)Nil$.MODULE$);
                Map v2ToRelationMap = (Map)Map$.MODULE$.apply((Seq)Nil$.MODULE$);
                Seq disabledSources = Utils$.MODULE$.stringToSeq(super.sparkSession().sqlContext().conf().disabledV2StreamingMicroBatchReaders());
                LogicalPlan _logicalPlan = (LogicalPlan)this.analyzedPlan().transform((PartialFunction)new Serializable(this, toExecutionRelationMap, nextSourceId, disabledSources, v2ToRelationMap, v2ToExecutionRelationMap){
                    private static final long serialVersionUID = 0L;
                    private final /* synthetic */ MicroBatchExecution $outer;
                    private final Map toExecutionRelationMap$1;
                    private final LongRef nextSourceId$1;
                    private final Seq disabledSources$1;
                    private final Map v2ToRelationMap$1;
                    private final Map v2ToExecutionRelationMap$1;

                    /*
                     * Enabled aggressive block sorting
                     */
                    public final <A1 extends LogicalPlan, B1> B1 applyOrElse(A1 x1, Function1<A1, B1> function1) {
                        Object object;
                        A1 A1 = x1;
                        if (A1 instanceof StreamingRelation) {
                            StreamingRelation streamingRelation = (StreamingRelation)A1;
                            DataSource dataSourceV1 = streamingRelation.dataSource();
                            String sourceName = streamingRelation.sourceName();
                            Seq<Attribute> output = streamingRelation.output();
                            object = this.toExecutionRelationMap$1.getOrElseUpdate((Object)streamingRelation, (Function0 & Serializable)() -> {
                                String metadataPath = new StringBuilder(9).append($this.$outer.resolvedCheckpointRoot()).append("/sources/").append($this.nextSourceId$1.elem).toString();
                                Source source = dataSourceV1.createSource(metadataPath);
                                ++$this.nextSourceId$1.elem;
                                $this.$outer.logInfo((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(45).append("Using Source [").append(source).append("] from DataSourceV1 named '").append(sourceName).append("' [").append(dataSourceV1).append("]").toString());
                                return new StreamingExecutionRelation(source, output, $this.$outer.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$super$sparkSession());
                            });
                            return (B1)object;
                        }
                        if (A1 instanceof StreamingRelationV2) {
                            StreamingRelationV2 streamingRelationV2 = (StreamingRelationV2)A1;
                            Option src = streamingRelationV2.source();
                            String srcName = streamingRelationV2.sourceName();
                            Table table = streamingRelationV2.table();
                            CaseInsensitiveStringMap options = streamingRelationV2.extraOptions();
                            Seq output = streamingRelationV2.output();
                            Option v1 = streamingRelationV2.v1Relation();
                            if (table instanceof SupportsRead) {
                                Object object2;
                                SupportsRead supportsRead = (SupportsRead)table;
                                String dsStr = src.nonEmpty() ? new StringBuilder(2).append("[").append(src.get()).append("]").toString() : "";
                                boolean v2Disabled = this.disabledSources$1.contains((Object)src.getOrElse((Function0 & Serializable)() -> None$.MODULE$).getClass().getCanonicalName());
                                if (!v2Disabled && DataSourceV2Implicits$.MODULE$.TableHelper((Table)supportsRead).supports(TableCapability.MICRO_BATCH_READ)) {
                                    object2 = this.v2ToRelationMap$1.getOrElseUpdate((Object)streamingRelationV2, (Function0 & Serializable)() -> {
                                        String metadataPath = new StringBuilder(9).append($this.$outer.resolvedCheckpointRoot()).append("/sources/").append($this.nextSourceId$1.elem).toString();
                                        ++$this.nextSourceId$1.elem;
                                        $this.$outer.logInfo((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(44).append("Reading table [").append(supportsRead).append("] from DataSourceV2 named '").append(srcName).append("' ").append(dsStr).toString());
                                        Scan scan = supportsRead.newScanBuilder(options).build();
                                        MicroBatchStream stream = scan.toMicroBatchStream(metadataPath);
                                        return new StreamingDataSourceV2Relation(output, scan, (SparkDataStream)stream, StreamingDataSourceV2Relation$.MODULE$.apply$default$4(), StreamingDataSourceV2Relation$.MODULE$.apply$default$5());
                                    });
                                } else {
                                    if (v1.isEmpty()) {
                                        throw QueryExecutionErrors$.MODULE$.microBatchUnsupportedByDataSourceError(srcName);
                                    }
                                    object2 = this.v2ToExecutionRelationMap$1.getOrElseUpdate((Object)streamingRelationV2, (Function0 & Serializable)() -> {
                                        String metadataPath = new StringBuilder(9).append($this.$outer.resolvedCheckpointRoot()).append("/sources/").append($this.nextSourceId$1.elem).toString();
                                        Source source = ((StreamingRelation)v1.get()).dataSource().createSource(metadataPath);
                                        ++$this.nextSourceId$1.elem;
                                        $this.$outer.logInfo((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(43).append("Using Source [").append(source).append("] from DataSourceV2 named '").append(srcName).append("' ").append(dsStr).toString());
                                        return new StreamingExecutionRelation(source, (Seq<Attribute>)output, $this.$outer.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$super$sparkSession());
                                    });
                                }
                                object = object2;
                                return (B1)object;
                            }
                        }
                        object = function1.apply(x1);
                        return (B1)object;
                    }

                    public final boolean isDefinedAt(LogicalPlan x1) {
                        StreamingRelationV2 streamingRelationV2;
                        Table table;
                        LogicalPlan logicalPlan2 = x1;
                        boolean bl = logicalPlan2 instanceof StreamingRelation ? true : logicalPlan2 instanceof StreamingRelationV2 && (table = (streamingRelationV2 = (StreamingRelationV2)logicalPlan2).table()) instanceof SupportsRead;
                        return bl;
                    }
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                        this.toExecutionRelationMap$1 = toExecutionRelationMap$1;
                        this.nextSourceId$1 = nextSourceId$1;
                        this.disabledSources$1 = disabledSources$1;
                        this.v2ToRelationMap$1 = v2ToRelationMap$1;
                        this.v2ToExecutionRelationMap$1 = v2ToExecutionRelationMap$1;
                    }

                    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                        return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$applyOrElse$1(org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1 org.apache.spark.sql.execution.datasources.DataSource java.lang.String scala.collection.immutable.Seq ), $anonfun$applyOrElse$2(org.apache.spark.sql.execution.streaming.Source java.lang.String org.apache.spark.sql.execution.datasources.DataSource ), $anonfun$applyOrElse$3(), $anonfun$applyOrElse$4(org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1 org.apache.spark.sql.connector.catalog.SupportsRead java.lang.String java.lang.String org.apache.spark.sql.util.CaseInsensitiveStringMap scala.collection.immutable.Seq ), $anonfun$applyOrElse$5(org.apache.spark.sql.connector.catalog.SupportsRead java.lang.String java.lang.String ), $anonfun$applyOrElse$6(org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1 scala.Option java.lang.String java.lang.String scala.collection.immutable.Seq ), $anonfun$applyOrElse$7(org.apache.spark.sql.execution.streaming.Source java.lang.String java.lang.String )}, serializedLambda);
                    }
                });
                this.sources_$eq((Seq<SparkDataStream>)_logicalPlan.collect((PartialFunction)new Serializable(null){
                    private static final long serialVersionUID = 0L;

                    public final <A1 extends LogicalPlan, B1> B1 applyOrElse(A1 x2, Function1<A1, B1> function1) {
                        Object object;
                        A1 A1 = x2;
                        if (A1 instanceof StreamingExecutionRelation) {
                            StreamingExecutionRelation streamingExecutionRelation = (StreamingExecutionRelation)A1;
                            object = streamingExecutionRelation.source();
                        } else if (A1 instanceof StreamingDataSourceV2Relation) {
                            StreamingDataSourceV2Relation streamingDataSourceV2Relation = (StreamingDataSourceV2Relation)A1;
                            object = streamingDataSourceV2Relation.stream();
                        } else {
                            object = function1.apply(x2);
                        }
                        return (B1)object;
                    }

                    public final boolean isDefinedAt(LogicalPlan x2) {
                        LogicalPlan logicalPlan2 = x2;
                        boolean bl = logicalPlan2 instanceof StreamingExecutionRelation ? true : logicalPlan2 instanceof StreamingDataSourceV2Relation;
                        return bl;
                    }
                }));
                Product product = this.triggerExecutor();
                scala.collection.immutable.Map map = product instanceof SingleBatchExecutor ? ((IterableOnceOps)((IterableOps)this.sources().distinct()).map((Function1 & Serializable)x0$1 -> {
                    Tuple2 tuple2;
                    SparkDataStream sparkDataStream = x0$1;
                    if (sparkDataStream instanceof SupportsAdmissionControl) {
                        ReadLimit limit;
                        SupportsAdmissionControl supportsAdmissionControl = (SupportsAdmissionControl)sparkDataStream;
                        ReadLimit readLimit = limit = supportsAdmissionControl.getDefaultReadLimit();
                        ReadLimit readLimit2 = ReadLimit.allAvailable();
                        if (readLimit == null ? readLimit2 != null : !readLimit.equals(readLimit2)) {
                            this.logWarning((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(58).append("The read limit ").append(limit).append(" for ").append(supportsAdmissionControl).append(" is ignored when Trigger.Once is used.").toString());
                        }
                        tuple2 = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)supportsAdmissionControl), (Object)ReadLimit.allAvailable());
                    } else {
                        tuple2 = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)sparkDataStream), (Object)ReadLimit.allAvailable());
                    }
                    return tuple2;
                })).toMap((.less.colon.less)$less$colon$less$.MODULE$.refl()) : (product instanceof MultiBatchExecutor ? ((IterableOnceOps)((IterableOps)((IterableOps)this.sources().distinct()).map((Function1 & Serializable)x0$2 -> {
                    SupportsTriggerAvailableNow supportsTriggerAvailableNow;
                    SparkDataStream sparkDataStream = x0$2;
                    if (sparkDataStream instanceof SupportsTriggerAvailableNow) {
                        SupportsTriggerAvailableNow supportsTriggerAvailableNow2;
                        supportsTriggerAvailableNow = supportsTriggerAvailableNow2 = (SupportsTriggerAvailableNow)sparkDataStream;
                    } else if (sparkDataStream instanceof Source) {
                        Source source = (Source)sparkDataStream;
                        supportsTriggerAvailableNow = new AvailableNowSourceWrapper(source);
                    } else if (sparkDataStream instanceof MicroBatchStream) {
                        MicroBatchStream microBatchStream = (MicroBatchStream)sparkDataStream;
                        supportsTriggerAvailableNow = new AvailableNowMicroBatchStreamWrapper(microBatchStream);
                    } else {
                        throw new MatchError((Object)sparkDataStream);
                    }
                    return supportsTriggerAvailableNow;
                })).map((Function1 & Serializable)s -> {
                    s.prepareForTriggerAvailableNow();
                    return Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(s), (Object)s.getDefaultReadLimit());
                })).toMap((.less.colon.less)$less$colon$less$.MODULE$.refl()) : ((IterableOnceOps)((IterableOps)this.sources().distinct()).map((Function1 & Serializable)x0$3 -> {
                    Tuple2 tuple2;
                    SparkDataStream sparkDataStream = x0$3;
                    if (sparkDataStream instanceof SupportsAdmissionControl) {
                        SupportsAdmissionControl supportsAdmissionControl = (SupportsAdmissionControl)sparkDataStream;
                        tuple2 = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)supportsAdmissionControl), (Object)supportsAdmissionControl.getDefaultReadLimit());
                    } else {
                        tuple2 = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)sparkDataStream), (Object)ReadLimit.allAvailable());
                    }
                    return tuple2;
                })).toMap((.less.colon.less)$less$colon$less$.MODULE$.refl()));
                this.uniqueSources_$eq((scala.collection.immutable.Map<SparkDataStream, ReadLimit>)map);
                Table table = this.sink();
                if (table instanceof SupportsWrite) {
                    SupportsWrite supportsWrite = (SupportsWrite)table;
                    Option relationOpt = this.plan.catalogAndIdent().map((Function1 & Serializable)x0$4 -> {
                        Tuple2 tuple2 = x0$4;
                        if (tuple2 == null) {
                            throw new MatchError((Object)tuple2);
                        }
                        TableCatalog catalog = (TableCatalog)tuple2._1();
                        Identifier ident = (Identifier)tuple2._2();
                        DataSourceV2Relation dataSourceV2Relation = DataSourceV2Relation$.MODULE$.create((Table)supportsWrite, (Option)new Some((Object)catalog), (Option)new Some((Object)ident));
                        return dataSourceV2Relation;
                    });
                    logicalPlan2 = new WriteToMicroBatchDataSource((Option<DataSourceV2Relation>)relationOpt, supportsWrite, _logicalPlan, this.id().toString(), this.extraOptions, this.outputMode(), WriteToMicroBatchDataSource$.MODULE$.apply$default$7());
                } else {
                    logicalPlan2 = _logicalPlan;
                }
                this.logicalPlan = logicalPlan2;
                this.bitmap$0 = true;
            }
        }
        this.extraOptions = null;
        return this.logicalPlan;
    }

    @Override
    public LogicalPlan logicalPlan() {
        return !this.bitmap$0 ? this.logicalPlan$lzycompute() : this.logicalPlan;
    }

    private boolean isCurrentBatchConstructed() {
        return this.isCurrentBatchConstructed;
    }

    private void isCurrentBatchConstructed_$eq(boolean x$1) {
        this.isCurrentBatchConstructed = x$1;
    }

    @Override
    public void stop() {
        this.state().set(TERMINATED$.MODULE$);
        if (this.queryExecutionThread().isAlive()) {
            super.sparkSession().sparkContext().cancelJobGroup(this.runId().toString());
            this.interruptAndAwaitExecutionThreadTermination();
            super.sparkSession().sparkContext().cancelJobGroup(this.runId().toString());
        }
        this.logInfo((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(18).append("Query ").append(this.prettyIdString()).append(" was stopped").toString());
    }

    @Override
    public void startTrigger() {
        ProgressReporter.startTrigger$(this);
        StreamingQueryStatus qual$1 = this.currentStatus();
        boolean x$1 = true;
        String x$2 = qual$1.copy$default$1();
        boolean x$3 = qual$1.copy$default$2();
        this.currentStatus_$eq(qual$1.copy(x$2, x$3, true));
    }

    @Override
    public void runActivatedStream(SparkSession sparkSessionForStream) {
        boolean noDataBatchesEnabled = sparkSessionForStream.sessionState().conf().streamingNoDataMicroBatchesEnabled();
        ((TriggerExecutor)this.triggerExecutor()).execute((Function0<Object>)(JFunction0.mcZ.sp & Serializable)() -> {
            if (this.isActive()) {
                BooleanRef currentBatchHasNewData = BooleanRef.create((boolean)false);
                this.startTrigger();
                this.reportTimeTaken("triggerExecution", (JFunction0.mcV.sp & Serializable)() -> {
                    if (this.currentBatchId() < 0L) {
                        AcceptsLatestSeenOffsetHandler$.MODULE$.setLatestSeenOffsetOnSources((Option<OffsetSeq>)this.offsetLog().getLatest().map((Function1 & Serializable)x$2 -> (OffsetSeq)x$2._2()), this.sources());
                        this.populateStartOffsets(sparkSessionForStream);
                        this.logInfo((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(20).append("Stream started from ").append(this.committedOffsets()).toString());
                    }
                    this.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$super$sparkSession().sparkContext().setJobDescription(this.getBatchDescriptionString());
                    if (!this.isCurrentBatchConstructed()) {
                        this.isCurrentBatchConstructed_$eq(this.constructNextBatch(noDataBatchesEnabled));
                    }
                    this.recordTriggerOffsets(this.committedOffsets(), this.availableOffsets(), this.latestOffsets());
                    currentBatchHasNewData$1.elem = this.isNewDataAvailable();
                    StreamingQueryStatus qual$1 = this.currentStatus();
                    boolean x$1 = this.isNewDataAvailable();
                    String x$22 = qual$1.copy$default$1();
                    boolean x$3 = qual$1.copy$default$3();
                    this.currentStatus_$eq(qual$1.copy(x$22, x$1, x$3));
                    if (this.isCurrentBatchConstructed()) {
                        if (currentBatchHasNewData$1.elem) {
                            this.updateStatusMessage("Processing new data");
                        } else {
                            this.updateStatusMessage("No new data but cleaning up state");
                        }
                        this.runBatch(sparkSessionForStream);
                    } else {
                        this.updateStatusMessage("Waiting for data to arrive");
                    }
                });
                this.finishTrigger(currentBatchHasNewData.elem, this.isCurrentBatchConstructed());
                this.withProgressLocked((Function0)(JFunction0.mcV.sp & Serializable)() -> this.awaitProgressLockCondition().signalAll());
                if (this.isCurrentBatchConstructed()) {
                    this.currentBatchId_$eq(this.currentBatchId() + 1L);
                    this.isCurrentBatchConstructed_$eq(false);
                } else if (this.triggerExecutor() instanceof MultiBatchExecutor) {
                    this.logInfo((Function0<String>)(Function0 & Serializable)() -> "Finished processing all available data for the trigger, terminating this Trigger.AvailableNow query");
                    this.state().set(TERMINATED$.MODULE$);
                } else {
                    Thread.sleep(this.pollingDelayMs());
                }
            }
            this.updateStatusMessage("Waiting for next trigger");
            return this.isActive();
        });
    }

    private void populateStartOffsets(SparkSession sparkSessionToRunBatches) {
        Some some;
        Tuple2 tuple2;
        this.sinkCommitProgress_$eq((Option<StreamWriterCommitProgress>)None$.MODULE$);
        Option option = this.offsetLog().getLatest();
        if (option instanceof Some && (tuple2 = (Tuple2)(some = (Some)option).value()) != null) {
            Some some2;
            Tuple2 tuple22;
            long latestBatchId = tuple2._1$mcJ$sp();
            OffsetSeq nextOffsets = (OffsetSeq)tuple2._2();
            this.currentBatchId_$eq(latestBatchId);
            this.isCurrentBatchConstructed_$eq(true);
            this.availableOffsets_$eq(nextOffsets.toStreamProgress(this.sources()));
            if (latestBatchId != 0L) {
                OffsetSeq secondLatestOffsets = (OffsetSeq)this.offsetLog().get(latestBatchId - 1L).getOrElse((Function0 & Serializable)() -> {
                    this.logError((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(263).append(new StringBuilder(41).append("The offset log for batch ").append(latestBatchId - 1L).append(" doesn't exist, ").toString()).append(new StringBuilder(62).append("which is required to restart the query from the latest batch ").append(latestBatchId).append(" ").toString()).append("from the offset log. Please ensure there are two subsequent offset logs ").append("available for the latest batch via manually deleting the offset file(s). ").append("Please also ensure the latest batch for commit log is equal or one batch ").append("earlier than the latest batch for offset log.").toString());
                    throw new IllegalStateException(new StringBuilder(20).append("batch ").append(latestBatchId - 1L).append(" doesn't exist").toString());
                });
                this.committedOffsets_$eq(secondLatestOffsets.toStreamProgress(this.sources()));
            }
            nextOffsets.metadata().foreach((Function1 & Serializable)metadata -> {
                MicroBatchExecution.$anonfun$populateStartOffsets$3(this, sparkSessionToRunBatches, metadata);
                return BoxedUnit.UNIT;
            });
            Option option2 = this.commitLog().getLatest();
            if (option2 instanceof Some && (tuple22 = (Tuple2)(some2 = (Some)option2).value()) != null) {
                BoxedUnit boxedUnit;
                long latestCommittedBatchId = tuple22._1$mcJ$sp();
                CommitMetadata commitMetadata = (CommitMetadata)tuple22._2();
                if (latestBatchId == latestCommittedBatchId) {
                    this.availableOffsets().foreach((Function1 & Serializable)x0$1 -> {
                        void var2_10;
                        Tuple2 tuple2 = x0$1;
                        if (tuple2 != null) {
                            SparkDataStream source = (SparkDataStream)tuple2._1();
                            org.apache.spark.sql.connector.read.streaming.Offset end = (org.apache.spark.sql.connector.read.streaming.Offset)tuple2._2();
                            if (source instanceof Source) {
                                Source source2 = (Source)source;
                                if (end instanceof Offset) {
                                    Offset offset = (Offset)end;
                                    Option start = this.committedOffsets().get(source2).map((Function1 & Serializable)x$3 -> (Offset)((Object)((Object)x$3)));
                                    Dataset<Row> dataset = source2.getBatch((Option<Offset>)start, offset);
                                    return var2_10;
                                }
                            }
                        }
                        BoxedUnit boxedUnit = BoxedUnit.UNIT;
                        return var2_10;
                    });
                    this.currentBatchId_$eq(latestCommittedBatchId + 1L);
                    this.isCurrentBatchConstructed_$eq(false);
                    this.committedOffsets_$eq((StreamProgress)this.committedOffsets().$plus$plus((IterableOnce)this.availableOffsets()));
                    this.watermarkTracker().setWatermark(package$.MODULE$.max(this.watermarkTracker().currentWatermark(), commitMetadata.nextBatchWatermarkMs()));
                    boxedUnit = BoxedUnit.UNIT;
                } else if (latestCommittedBatchId == latestBatchId - 1L) {
                    this.availableOffsets().foreach((Function1 & Serializable)x0$2 -> {
                        Tuple2 tuple2 = x0$2;
                        if (tuple2 == null) return BoxedUnit.UNIT;
                        SparkDataStream source = (SparkDataStream)tuple2._1();
                        org.apache.spark.sql.connector.read.streaming.Offset end = (org.apache.spark.sql.connector.read.streaming.Offset)tuple2._2();
                        if (!(source instanceof Source)) return BoxedUnit.UNIT;
                        Source source2 = (Source)source;
                        if (!(end instanceof Offset)) return BoxedUnit.UNIT;
                        Offset offset = (Offset)end;
                        Option start = this.committedOffsets().get(source2).map((Function1 & Serializable)x$4 -> (Offset)((Object)((Object)x$4)));
                        return BoxesRunTime.unboxToBoolean((Object)start.map((Function1 & Serializable)x$5 -> BoxesRunTime.boxToBoolean((boolean)MicroBatchExecution.$anonfun$populateStartOffsets$8(offset, x$5))).getOrElse((Function0)(JFunction0.mcZ.sp & Serializable)() -> true)) ? source2.getBatch((Option<Offset>)start, offset) : BoxedUnit.UNIT;
                    });
                    boxedUnit = BoxedUnit.UNIT;
                } else if (latestCommittedBatchId < latestBatchId - 1L) {
                    this.logWarning((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(0).append("Batch completion log latest batch id is ").append(new StringBuilder(24).append(latestCommittedBatchId).append(", which is not trailing ").toString()).append(new StringBuilder(15).append("batchid ").append(latestBatchId).append(" by one").toString()).toString());
                    boxedUnit = BoxedUnit.UNIT;
                } else {
                    boxedUnit = BoxedUnit.UNIT;
                }
                BoxedUnit boxedUnit2 = boxedUnit;
            } else if (None$.MODULE$.equals(option2)) {
                this.logInfo((Function0<String>)(Function0 & Serializable)() -> "no commit log present");
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
            } else {
                throw new MatchError(option2);
            }
            this.logInfo((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(0).append(new StringBuilder(42).append("Resuming at batch ").append(this.currentBatchId()).append(" with committed offsets ").toString()).append(new StringBuilder(23).append(this.committedOffsets()).append(" and available offsets ").append(this.availableOffsets()).toString()).toString());
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else if (None$.MODULE$.equals(option)) {
            this.logInfo((Function0<String>)(Function0 & Serializable)() -> "Starting new streaming query.");
            this.currentBatchId_$eq(0L);
            this.watermarkTracker_$eq(WatermarkTracker$.MODULE$.apply(sparkSessionToRunBatches.conf()));
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            throw new MatchError(option);
        }
    }

    private boolean isNewDataAvailable() {
        return this.availableOffsets().exists((Function1<Tuple2<SparkDataStream, org.apache.spark.sql.connector.read.streaming.Offset>, Object>)(Function1 & Serializable)x0$1 -> BoxesRunTime.boxToBoolean((boolean)MicroBatchExecution.$anonfun$isNewDataAvailable$1(this, x0$1)));
    }

    private org.apache.spark.sql.connector.read.streaming.Offset getStartOffset(SparkDataStream dataStream) {
        org.apache.spark.sql.connector.read.streaming.Offset offset2;
        Option<org.apache.spark.sql.connector.read.streaming.Offset> startOffsetOpt = this.availableOffsets().get(dataStream);
        SparkDataStream sparkDataStream = dataStream;
        if (sparkDataStream instanceof Source) {
            offset2 = (org.apache.spark.sql.connector.read.streaming.Offset)startOffsetOpt.orNull((.less.colon.less)$less$colon$less$.MODULE$.refl());
        } else if (sparkDataStream instanceof MicroBatchStream) {
            MicroBatchStream microBatchStream = (MicroBatchStream)sparkDataStream;
            offset2 = (org.apache.spark.sql.connector.read.streaming.Offset)startOffsetOpt.map((Function1 & Serializable)offset -> microBatchStream.deserializeOffset(offset.json())).getOrElse((Function0 & Serializable)() -> microBatchStream.initialOffset());
        } else {
            throw new MatchError((Object)sparkDataStream);
        }
        return offset2;
    }

    private boolean constructNextBatch(boolean noDataBatchesEnabled) {
        boolean bl;
        Object object = new Object();
        try {
            bl = BoxesRunTime.unboxToBoolean(this.withProgressLocked((Function0)(JFunction0.mcZ.sp & Serializable)() -> {
                if (this.isCurrentBatchConstructed()) {
                    throw new NonLocalReturnControl.mcZ.sp(object, true);
                }
                Tuple2 tuple2 = ((IterableOps)this.uniqueSources().toSeq().map((Function1 & Serializable)x0$1 -> {
                    SparkDataStream s;
                    SparkDataStream s2;
                    Tuple2 tuple2 = x0$1;
                    if (tuple2 != null) {
                        SparkDataStream s3 = (SparkDataStream)tuple2._1();
                        ReadLimit limit = (ReadLimit)tuple2._2();
                        if (s3 instanceof AvailableNowDataStreamWrapper) {
                            AvailableNowDataStreamWrapper availableNowDataStreamWrapper = (AvailableNowDataStreamWrapper)s3;
                            this.updateStatusMessage(new StringBuilder(21).append("Getting offsets from ").append(availableNowDataStreamWrapper).toString());
                            SparkDataStream originalSource = availableNowDataStreamWrapper.delegate();
                            return (Tuple2)this.reportTimeTaken("latestOffset", (Function0 & Serializable)() -> {
                                org.apache.spark.sql.connector.read.streaming.Offset next = availableNowDataStreamWrapper.latestOffset(this.getStartOffset(originalSource), limit);
                                org.apache.spark.sql.connector.read.streaming.Offset latest = availableNowDataStreamWrapper.reportLatestOffset();
                                return new Tuple2((Object)new Tuple2((Object)originalSource, (Object)Option$.MODULE$.apply((Object)next)), (Object)new Tuple2((Object)originalSource, (Object)Option$.MODULE$.apply((Object)latest)));
                            });
                        }
                    }
                    if (tuple2 != null) {
                        SparkDataStream s4 = (SparkDataStream)tuple2._1();
                        ReadLimit limit = (ReadLimit)tuple2._2();
                        if (s4 instanceof SupportsAdmissionControl) {
                            SupportsAdmissionControl supportsAdmissionControl = (SupportsAdmissionControl)s4;
                            this.updateStatusMessage(new StringBuilder(21).append("Getting offsets from ").append(supportsAdmissionControl).toString());
                            return (Tuple2)this.reportTimeTaken("latestOffset", (Function0 & Serializable)() -> {
                                org.apache.spark.sql.connector.read.streaming.Offset next = supportsAdmissionControl.latestOffset(this.getStartOffset((SparkDataStream)supportsAdmissionControl), limit);
                                org.apache.spark.sql.connector.read.streaming.Offset latest = supportsAdmissionControl.reportLatestOffset();
                                return new Tuple2((Object)new Tuple2((Object)supportsAdmissionControl, (Object)Option$.MODULE$.apply((Object)next)), (Object)new Tuple2((Object)supportsAdmissionControl, (Object)Option$.MODULE$.apply((Object)latest)));
                            });
                        }
                    }
                    if (tuple2 != null && (s2 = (SparkDataStream)tuple2._1()) instanceof Source) {
                        Source source = (Source)s2;
                        this.updateStatusMessage(new StringBuilder(21).append("Getting offsets from ").append(source).toString());
                        return (Tuple2)this.reportTimeTaken("getOffset", (Function0 & Serializable)() -> {
                            Option<Offset> offset = source.getOffset();
                            return new Tuple2((Object)new Tuple2((Object)source, offset), (Object)new Tuple2((Object)source, offset));
                        });
                    }
                    if (tuple2 != null && (s = (SparkDataStream)tuple2._1()) instanceof MicroBatchStream) {
                        MicroBatchStream microBatchStream = (MicroBatchStream)s;
                        this.updateStatusMessage(new StringBuilder(21).append("Getting offsets from ").append(microBatchStream).toString());
                        return (Tuple2)this.reportTimeTaken("latestOffset", (Function0 & Serializable)() -> {
                            org.apache.spark.sql.connector.read.streaming.Offset latest = microBatchStream.latestOffset();
                            return new Tuple2((Object)new Tuple2((Object)microBatchStream, (Object)Option$.MODULE$.apply((Object)latest)), (Object)new Tuple2((Object)microBatchStream, (Object)Option$.MODULE$.apply((Object)latest)));
                        });
                    }
                    if (tuple2 == null) throw new MatchError((Object)tuple2);
                    SparkDataStream s5 = (SparkDataStream)tuple2._1();
                    throw new IllegalStateException(new StringBuilder(19).append("Unexpected source: ").append(s5).toString());
                })).unzip(Predef$.MODULE$.$conforms());
                if (tuple2 == null) {
                    throw new MatchError((Object)tuple2);
                }
                Seq nextOffsets = (Seq)tuple2._1();
                Seq recentOffsets = (Seq)tuple2._2();
                Tuple2 tuple22 = new Tuple2((Object)nextOffsets, (Object)recentOffsets);
                Tuple2 tuple23 = tuple22;
                Seq nextOffsets2 = (Seq)tuple23._1();
                Seq recentOffsets2 = (Seq)tuple23._2();
                this.availableOffsets_$eq((StreamProgress)this.availableOffsets().$plus$plus((IterableOnce)((IterableOnceOps)((IterableOps)nextOffsets2.filter((Function1 & Serializable)x0$2 -> BoxesRunTime.boxToBoolean((boolean)MicroBatchExecution.$anonfun$constructNextBatch$7(x0$2)))).map((Function1 & Serializable)p -> Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(p._1()), ((Option)p._2()).get()))).toMap((.less.colon.less)$less$colon$less$.MODULE$.refl())));
                this.latestOffsets_$eq((StreamProgress)this.latestOffsets().$plus$plus((IterableOnce)((IterableOnceOps)((IterableOps)recentOffsets2.filter((Function1 & Serializable)x0$3 -> BoxesRunTime.boxToBoolean((boolean)MicroBatchExecution.$anonfun$constructNextBatch$9(x0$3)))).map((Function1 & Serializable)p -> Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(p._1()), ((Option)p._2()).get()))).toMap((.less.colon.less)$less$colon$less$.MODULE$.refl())));
                OffsetSeqMetadata qual$1 = this.offsetSeqMetadata();
                long x$1 = this.watermarkTracker().currentWatermark();
                long x$2 = this.super$triggerClock().getTimeMillis();
                scala.collection.immutable.Map<String, String> x$3 = qual$1.copy$default$3();
                this.offsetSeqMetadata_$eq(qual$1.copy(x$1, x$2, x$3));
                boolean lastExecutionRequiresAnotherBatch = noDataBatchesEnabled && Option$.MODULE$.apply((Object)this.lastExecution()).exists((Function1 & Serializable)x$7 -> BoxesRunTime.boxToBoolean((boolean)x$7.shouldRunAnotherBatch(this.offsetSeqMetadata())));
                boolean shouldConstructNextBatch = this.isNewDataAvailable() || lastExecutionRequiresAnotherBatch;
                this.logTrace((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(0).append(new StringBuilder(25).append("noDataBatchesEnabled = ").append(noDataBatchesEnabled).append(", ").toString()).append(new StringBuilder(38).append("lastExecutionRequiresAnotherBatch = ").append(lastExecutionRequiresAnotherBatch).append(", ").toString()).append(new StringBuilder(23).append("isNewDataAvailable = ").append(this.isNewDataAvailable()).append(", ").toString()).append(new StringBuilder(27).append("shouldConstructNextBatch = ").append(shouldConstructNextBatch).toString()).toString());
                if (shouldConstructNextBatch) {
                    this.updateStatusMessage("Writing offsets to log");
                    this.reportTimeTaken("walCommit", (JFunction0.mcV.sp & Serializable)() -> {
                        block3: {
                            Predef$.MODULE$.assert(this.offsetLog().add(this.currentBatchId(), this.availableOffsets().toOffsetSeq(this.sources(), this.offsetSeqMetadata())), (Function0 & Serializable)() -> new StringBuilder(67).append("Concurrent update to the log. Multiple streaming jobs detected for ").append(this.currentBatchId()).toString());
                            this.logInfo((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(0).append(new StringBuilder(30).append("Committed offsets for batch ").append(this.currentBatchId()).append(". ").toString()).append(new StringBuilder(9).append("Metadata ").append(this.offsetSeqMetadata().toString()).toString()).toString());
                            if (this.currentBatchId() != 0L) {
                                Option<OffsetSeq> prevBatchOff = this.offsetLog().get(this.currentBatchId() - 1L);
                                if (prevBatchOff.isDefined()) {
                                    ((OffsetSeq)prevBatchOff.get()).toStreamProgress(this.sources()).foreach((Function1 & Serializable)x0$4 -> {
                                        MicroBatchExecution.$anonfun$constructNextBatch$16(x0$4);
                                        return BoxedUnit.UNIT;
                                    });
                                } else {
                                    throw new IllegalStateException(new StringBuilder(20).append("batch ").append(this.currentBatchId() - 1L).append(" doesn't exist").toString());
                                }
                            }
                            if ((long)this.minLogEntriesToMaintain() >= this.currentBatchId()) break block3;
                            this.purge(this.currentBatchId() - (long)this.minLogEntriesToMaintain());
                        }
                    });
                    this.noNewData_$eq(false);
                } else {
                    this.noNewData_$eq(true);
                    this.awaitProgressLockCondition().signalAll();
                }
                return shouldConstructNextBatch;
            }));
        }
        catch (NonLocalReturnControl ex) {
            if (ex.key() == object) {
                bl = ex.value$mcZ$sp();
            }
            throw ex;
        }
        return bl;
    }

    private void runBatch(SparkSession sparkSessionToRunBatch) {
        LogicalPlan logicalPlan2;
        this.logDebug((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(14).append("Running batch ").append(this.currentBatchId()).toString());
        this.newData_$eq((scala.collection.immutable.Map<SparkDataStream, LogicalPlan>)((scala.collection.immutable.Map)this.reportTimeTaken("getBatch", (Function0 & Serializable)() -> (scala.collection.immutable.Map)this.availableOffsets().flatMap((Function1 & Serializable)x0$1 -> {
            org.apache.spark.sql.connector.read.streaming.Offset offset;
            Tuple2 tuple2 = x0$1;
            if (tuple2 != null) {
                SparkDataStream source = (SparkDataStream)tuple2._1();
                org.apache.spark.sql.connector.read.streaming.Offset available = (org.apache.spark.sql.connector.read.streaming.Offset)tuple2._2();
                if (source instanceof Source) {
                    Source source2 = (Source)source;
                    if (available instanceof Offset) {
                        Offset offset2 = (Offset)available;
                        if (BoxesRunTime.unboxToBoolean((Object)this.committedOffsets().get(source2).map((Function1 & Serializable)x$8 -> BoxesRunTime.boxToBoolean((boolean)MicroBatchExecution.$anonfun$runBatch$4(offset2, x$8))).getOrElse((Function0)(JFunction0.mcZ.sp & Serializable)() -> true))) {
                            Option current = this.committedOffsets().get(source2).map((Function1 & Serializable)x$9 -> (Offset)((Object)((Object)((Object)x$9))));
                            Dataset<Row> batch = source2.getBatch((Option<Offset>)current, offset2);
                            Predef$.MODULE$.assert(batch.isStreaming(), (Function0 & Serializable)() -> new StringBuilder(0).append(new StringBuilder(67).append("DataFrame returned by getBatch from ").append(source2).append(" did not have isStreaming=true\n").toString()).append(String.valueOf(batch.queryExecution().logical())).toString());
                            this.logDebug((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(27).append("Retrieving data from ").append(source2).append(": ").append(current).append(" -> ").append((Object)offset2).toString());
                            return new Some((Object)Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)source2), (Object)batch.logicalPlan()));
                        }
                    }
                }
            }
            if (tuple2 == null) return None$.MODULE$;
            SparkDataStream stream = (SparkDataStream)tuple2._1();
            org.apache.spark.sql.connector.read.streaming.Offset available = (org.apache.spark.sql.connector.read.streaming.Offset)tuple2._2();
            if (!(stream instanceof MicroBatchStream)) return None$.MODULE$;
            MicroBatchStream microBatchStream = (MicroBatchStream)stream;
            if (!BoxesRunTime.unboxToBoolean((Object)this.committedOffsets().get((SparkDataStream)microBatchStream).map((Function1 & Serializable)x$10 -> BoxesRunTime.boxToBoolean((boolean)MicroBatchExecution.$anonfun$runBatch$9(available, x$10))).getOrElse((Function0)(JFunction0.mcZ.sp & Serializable)() -> true))) return None$.MODULE$;
            Option current = this.committedOffsets().get((SparkDataStream)microBatchStream).map((Function1 & Serializable)off -> microBatchStream.deserializeOffset(off.json()));
            org.apache.spark.sql.connector.read.streaming.Offset offset3 = available;
            if (offset3 instanceof SerializedOffset) {
                SerializedOffset serializedOffset = (SerializedOffset)offset3;
                offset = microBatchStream.deserializeOffset(serializedOffset.json());
            } else {
                org.apache.spark.sql.connector.read.streaming.Offset offset4;
                if (offset3 == null) throw new MatchError((Object)offset3);
                offset = offset4 = offset3;
            }
            org.apache.spark.sql.connector.read.streaming.Offset endOffset = offset;
            org.apache.spark.sql.connector.read.streaming.Offset startOffset = (org.apache.spark.sql.connector.read.streaming.Offset)current.getOrElse((Function0 & Serializable)() -> microBatchStream.initialOffset());
            this.logDebug((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(27).append("Retrieving data from ").append(microBatchStream).append(": ").append(current).append(" -> ").append(endOffset).toString());
            return new Some((Object)Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)microBatchStream), (Object)new OffsetHolder(startOffset, endOffset)));
        }))));
        LogicalPlan newBatchesPlan = (LogicalPlan)this.logicalPlan().transform((PartialFunction)new Serializable(this){
            private static final long serialVersionUID = 0L;
            private final /* synthetic */ MicroBatchExecution $outer;

            public final <A1 extends LogicalPlan, B1> B1 applyOrElse(A1 x1, Function1<A1, B1> function1) {
                Object object;
                A1 A1 = x1;
                if (A1 instanceof StreamingExecutionRelation) {
                    StreamingExecutionRelation streamingExecutionRelation = (StreamingExecutionRelation)A1;
                    SparkDataStream source = streamingExecutionRelation.source();
                    Seq<Attribute> output = streamingExecutionRelation.output();
                    object = this.$outer.newData().get((Object)source).map(arg_0 -> $anonfun$2.$anonfun$applyOrElse$8(output, arg_0)).getOrElse((Function0 & Serializable)() -> {
                        Seq x$7 = output;
                        boolean x$8 = true;
                        Seq x$9 = LocalRelation$.MODULE$.apply$default$2();
                        return new LocalRelation(x$7, x$9, true);
                    });
                } else if (A1 instanceof StreamingDataSourceV2Relation) {
                    StreamingDataSourceV2Relation streamingDataSourceV2Relation = (StreamingDataSourceV2Relation)A1;
                    object = this.$outer.newData().get((Object)streamingDataSourceV2Relation.stream()).map((Function1 & Serializable)x0$3 -> {
                        LogicalPlan logicalPlan2 = x0$3;
                        if (!(logicalPlan2 instanceof OffsetHolder)) {
                            throw new MatchError((Object)logicalPlan2);
                        }
                        OffsetHolder offsetHolder = (OffsetHolder)logicalPlan2;
                        org.apache.spark.sql.connector.read.streaming.Offset start = offsetHolder.start();
                        org.apache.spark.sql.connector.read.streaming.Offset end = offsetHolder.end();
                        Some x$10 = new Some((Object)start);
                        Some x$11 = new Some((Object)end);
                        Seq x$12 = streamingDataSourceV2Relation.copy$default$1();
                        Scan x$13 = streamingDataSourceV2Relation.copy$default$2();
                        SparkDataStream x$14 = streamingDataSourceV2Relation.copy$default$3();
                        StreamingDataSourceV2Relation streamingDataSourceV2Relation = streamingDataSourceV2Relation.copy(x$12, x$13, x$14, (Option)x$10, (Option)x$11);
                        return streamingDataSourceV2Relation;
                    }).getOrElse((Function0 & Serializable)() -> {
                        Seq x$15 = streamingDataSourceV2Relation.output();
                        boolean x$16 = true;
                        Seq x$17 = LocalRelation$.MODULE$.apply$default$2();
                        return new LocalRelation(x$15, x$17, true);
                    });
                } else {
                    object = function1.apply(x1);
                }
                return (B1)object;
            }

            public final boolean isDefinedAt(LogicalPlan x1) {
                LogicalPlan logicalPlan2 = x1;
                boolean bl = logicalPlan2 instanceof StreamingExecutionRelation ? true : logicalPlan2 instanceof StreamingDataSourceV2Relation;
                return bl;
            }

            public static final /* synthetic */ boolean $anonfun$applyOrElse$9(Attribute x0$1) {
                AttributeReference attributeReference;
                Option option;
                Attribute attribute = x0$1;
                boolean bl = attribute instanceof AttributeReference && !(option = FileSourceMetadataAttribute$.MODULE$.unapply(attributeReference = (AttributeReference)attribute)).isEmpty();
                return bl;
            }

            public static final /* synthetic */ String $anonfun$applyOrElse$10(Seq output$3, int maxFields$1, LogicalPlan finalDataPlan$1) {
                return new StringBuilder(0).append(new StringBuilder(19).append("Invalid batch: ").append(org.apache.spark.sql.catalyst.util.package$.MODULE$.truncatedString(output$3, ",", maxFields$1)).append(" != ").toString()).append(String.valueOf(org.apache.spark.sql.catalyst.util.package$.MODULE$.truncatedString(finalDataPlan$1.output(), ",", maxFields$1))).toString();
            }

            public static final /* synthetic */ Alias $anonfun$applyOrElse$11(Tuple2 x0$2) {
                Attribute from;
                Tuple2 tuple2 = x0$2;
                if (tuple2 == null) {
                    throw new MatchError((Object)tuple2);
                }
                Attribute to = (Attribute)tuple2._1();
                Attribute x$1 = from = (Attribute)tuple2._2();
                String x$2 = to.name();
                ExprId x$3 = to.exprId();
                Some x$4 = new Some((Object)from.metadata());
                Seq x$5 = Alias$.MODULE$.apply$default$4((Expression)x$1, x$2);
                Seq x$6 = Alias$.MODULE$.apply$default$6((Expression)x$1, x$2);
                Alias alias = new Alias((Expression)x$1, x$2, x$3, x$5, (Option)x$4, x$6);
                return alias;
            }

            /*
             * Unable to fully structure code
             */
            public static final /* synthetic */ Project $anonfun$applyOrElse$8(Seq output$3, LogicalPlan dataPlan) {
                hasFileMetadata = output$3.exists((Function1)(Function1 & Serializable)LambdaMetafactory.altMetafactory(null, null, null, (Ljava/lang/Object;)Ljava/lang/Object;, $anonfun$applyOrElse$9$adapted(org.apache.spark.sql.catalyst.expressions.Attribute ), (Lorg/apache/spark/sql/catalyst/expressions/Attribute;)Ljava/lang/Object;)());
                var5_3 = dataPlan;
                if (!(var5_3 instanceof LogicalRelation)) ** GOTO lbl-1000
                var6_4 = (LogicalRelation)var5_3;
                if (hasFileMetadata) {
                    var2_5 = var6_4.withMetadataColumns();
                } else lbl-1000:
                // 2 sources

                {
                    var2_5 = dataPlan;
                }
                finalDataPlan = var2_5;
                maxFields = SQLConf$.MODULE$.get().maxToStringFields();
                Predef$.MODULE$.assert(output$3.size() == finalDataPlan.output().size(), (Function0)(Function0 & Serializable)LambdaMetafactory.altMetafactory(null, null, null, ()Ljava/lang/Object;, $anonfun$applyOrElse$10(scala.collection.immutable.Seq int org.apache.spark.sql.catalyst.plans.logical.LogicalPlan ), ()Ljava/lang/String;)((Seq)output$3, (int)maxFields, (LogicalPlan)finalDataPlan));
                aliases = (Seq)((IterableOps)output$3.zip((IterableOnce)finalDataPlan.output())).map((Function1)(Function1 & Serializable)LambdaMetafactory.altMetafactory(null, null, null, (Ljava/lang/Object;)Ljava/lang/Object;, $anonfun$applyOrElse$11(scala.Tuple2 ), (Lscala/Tuple2;)Lorg/apache/spark/sql/catalyst/expressions/Alias;)());
                return new Project(aliases, finalDataPlan);
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
            }

            public static final /* synthetic */ Object $anonfun$applyOrElse$9$adapted(Attribute x0$1) {
                return BoxesRunTime.boxToBoolean((boolean)$anonfun$2.$anonfun$applyOrElse$9(x0$1));
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$applyOrElse$10(scala.collection.immutable.Seq int org.apache.spark.sql.catalyst.plans.logical.LogicalPlan ), $anonfun$applyOrElse$11(scala.Tuple2 ), $anonfun$applyOrElse$12(scala.collection.immutable.Seq ), $anonfun$applyOrElse$13(org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation org.apache.spark.sql.catalyst.plans.logical.LogicalPlan ), $anonfun$applyOrElse$14(org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation ), $anonfun$applyOrElse$8(scala.collection.immutable.Seq org.apache.spark.sql.catalyst.plans.logical.LogicalPlan ), $anonfun$applyOrElse$9$adapted(org.apache.spark.sql.catalyst.expressions.Attribute )}, serializedLambda);
            }
        });
        LogicalPlan newAttributePlan = newBatchesPlan.transformAllExpressionsWithPruning((Function1 & Serializable)x$11 -> BoxesRunTime.boxToBoolean((boolean)x$11.containsPattern(TreePattern$.MODULE$.CURRENT_LIKE())), newBatchesPlan.transformAllExpressionsWithPruning$default$2(), (PartialFunction)new Serializable(this){
            private static final long serialVersionUID = 0L;
            private final /* synthetic */ MicroBatchExecution $outer;

            public final <A1 extends Expression, B1> B1 applyOrElse(A1 x2, Function1<A1, B1> function1) {
                Object object;
                A1 A1 = x2;
                if (A1 instanceof CurrentTimestamp) {
                    CurrentTimestamp currentTimestamp = (CurrentTimestamp)A1;
                    object = new CurrentBatchTimestamp(this.$outer.offsetSeqMetadata().batchTimestampMs(), currentTimestamp.dataType(), (Option)new Some((Object)"Dummy TimeZoneId"));
                } else if (A1 instanceof LocalTimestamp) {
                    LocalTimestamp localTimestamp = (LocalTimestamp)A1;
                    object = new CurrentBatchTimestamp(this.$outer.offsetSeqMetadata().batchTimestampMs(), localTimestamp.dataType(), localTimestamp.timeZoneId());
                } else if (A1 instanceof CurrentDate) {
                    CurrentDate currentDate = (CurrentDate)A1;
                    object = new CurrentBatchTimestamp(this.$outer.offsetSeqMetadata().batchTimestampMs(), currentDate.dataType(), currentDate.timeZoneId());
                } else {
                    object = function1.apply(x2);
                }
                return (B1)object;
            }

            public final boolean isDefinedAt(Expression x2) {
                Expression expression = x2;
                boolean bl = expression instanceof CurrentTimestamp ? true : (expression instanceof LocalTimestamp ? true : expression instanceof CurrentDate);
                return bl;
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
            }
        });
        Table table = this.sink();
        if (table instanceof Sink) {
            logicalPlan2 = newAttributePlan;
        } else if (table instanceof SupportsWrite) {
            logicalPlan2 = ((WriteToMicroBatchDataSource)newAttributePlan).withNewBatchId(this.currentBatchId());
        } else {
            throw new IllegalArgumentException(new StringBuilder(22).append("unknown sink type for ").append(this.sink()).toString());
        }
        LogicalPlan triggerLogicalPlan = logicalPlan2;
        sparkSessionToRunBatch.sparkContext().setLocalProperty(MicroBatchExecution$.MODULE$.BATCH_ID_KEY(), Long.toString(this.currentBatchId()));
        sparkSessionToRunBatch.sparkContext().setLocalProperty(StreamExecution$.MODULE$.IS_CONTINUOUS_PROCESSING(), Boolean.toString(false));
        this.reportTimeTaken("queryPlanning", (Function0 & Serializable)() -> {
            this.lastExecution_$eq(new IncrementalExecution(sparkSessionToRunBatch, triggerLogicalPlan, this.outputMode(), this.checkpointFile("state"), this.id(), this.runId(), this.currentBatchId(), this.offsetSeqMetadata()));
            return this.lastExecution().executedPlan();
        });
        Dataset nextBatch = new Dataset(this.lastExecution(), RowEncoder$.MODULE$.apply(this.lastExecution().analyzed().schema()));
        Option batchSinkProgress = (Option)this.reportTimeTaken("addBatch", (Function0 & Serializable)() -> (Option)SQLExecution$.MODULE$.withNewExecutionId(this.lastExecution(), SQLExecution$.MODULE$.withNewExecutionId$default$2(), (Function0 & Serializable)() -> {
            Option<StreamWriterCommitProgress> option;
            Object object;
            Table table = this.sink();
            if (table instanceof Sink) {
                Sink sink = (Sink)table;
                sink.addBatch(this.currentBatchId(), nextBatch);
                object = BoxedUnit.UNIT;
            } else if (table instanceof SupportsWrite) {
                object = nextBatch.collect();
            } else {
                throw new MatchError((Object)table);
            }
            SparkPlan sparkPlan = this.lastExecution().executedPlan();
            if (sparkPlan instanceof WriteToDataSourceV2Exec) {
                WriteToDataSourceV2Exec writeToDataSourceV2Exec = (WriteToDataSourceV2Exec)sparkPlan;
                option = writeToDataSourceV2Exec.commitProgress();
            } else {
                option = None$.MODULE$;
            }
            return option;
        }));
        this.withProgressLocked((Function0)(JFunction0.mcV.sp & Serializable)() -> {
            this.sinkCommitProgress_$eq((Option<StreamWriterCommitProgress>)batchSinkProgress);
            this.watermarkTracker().updateWatermark(this.lastExecution().executedPlan());
            Predef$.MODULE$.assert(this.commitLog().add(this.currentBatchId(), new CommitMetadata(this.watermarkTracker().currentWatermark())), (Function0 & Serializable)() -> new StringBuilder(74).append("Concurrent update to the commit log. Multiple streaming jobs detected for ").append(String.valueOf(BoxesRunTime.boxToLong((long)this.currentBatchId()))).toString());
            this.committedOffsets_$eq((StreamProgress)this.committedOffsets().$plus$plus((IterableOnce)this.availableOffsets()));
        });
        this.logDebug((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(16).append("Completed batch ").append(this.currentBatchId()).toString());
    }

    public <T> T withProgressLocked(Function0<T> f) {
        Object object;
        this.awaitProgressLock().lock();
        try {
            object = f.apply();
        }
        finally {
            this.awaitProgressLock().unlock();
        }
        return (T)object;
    }

    public static final /* synthetic */ void $anonfun$populateStartOffsets$3(MicroBatchExecution $this, SparkSession sparkSessionToRunBatches$1, OffsetSeqMetadata metadata) {
        OffsetSeqMetadata$.MODULE$.setSessionConf(metadata, sparkSessionToRunBatches$1.conf());
        $this.offsetSeqMetadata_$eq(OffsetSeqMetadata$.MODULE$.apply(metadata.batchWatermarkMs(), metadata.batchTimestampMs(), sparkSessionToRunBatches$1.conf()));
        $this.watermarkTracker_$eq(WatermarkTracker$.MODULE$.apply(sparkSessionToRunBatches$1.conf()));
        $this.watermarkTracker().setWatermark(metadata.batchWatermarkMs());
    }

    public static final /* synthetic */ boolean $anonfun$populateStartOffsets$8(Offset x3$1, Offset x$5) {
        Offset offset = x$5;
        Offset offset2 = x3$1;
        return !(offset != null ? !((Object)((Object)offset)).equals((Object)offset2) : offset2 != null);
    }

    public static final /* synthetic */ boolean $anonfun$isNewDataAvailable$2(org.apache.spark.sql.connector.read.streaming.Offset available$1, org.apache.spark.sql.connector.read.streaming.Offset committed) {
        org.apache.spark.sql.connector.read.streaming.Offset offset = committed;
        org.apache.spark.sql.connector.read.streaming.Offset offset2 = available$1;
        return offset == null ? offset2 != null : !offset.equals(offset2);
    }

    public static final /* synthetic */ boolean $anonfun$isNewDataAvailable$1(MicroBatchExecution $this, Tuple2 x0$1) {
        Tuple2 tuple2 = x0$1;
        if (tuple2 == null) {
            throw new MatchError((Object)tuple2);
        }
        SparkDataStream source = (SparkDataStream)tuple2._1();
        org.apache.spark.sql.connector.read.streaming.Offset available = (org.apache.spark.sql.connector.read.streaming.Offset)tuple2._2();
        boolean bl = BoxesRunTime.unboxToBoolean((Object)$this.committedOffsets().get(source).map((Function1 & Serializable)committed -> BoxesRunTime.boxToBoolean((boolean)MicroBatchExecution.$anonfun$isNewDataAvailable$2(available, committed))).getOrElse((Function0)(JFunction0.mcZ.sp & Serializable)() -> true));
        return bl;
    }

    public static final /* synthetic */ boolean $anonfun$constructNextBatch$7(Tuple2 x0$2) {
        Tuple2 tuple2 = x0$2;
        if (tuple2 == null) {
            throw new MatchError((Object)tuple2);
        }
        Option o = (Option)tuple2._2();
        boolean bl = o.nonEmpty();
        return bl;
    }

    public static final /* synthetic */ boolean $anonfun$constructNextBatch$9(Tuple2 x0$3) {
        Tuple2 tuple2 = x0$3;
        if (tuple2 == null) {
            throw new MatchError((Object)tuple2);
        }
        Option o = (Option)tuple2._2();
        boolean bl = o.nonEmpty();
        return bl;
    }

    /*
     * Enabled aggressive block sorting
     */
    public static final /* synthetic */ void $anonfun$constructNextBatch$16(Tuple2 x0$4) {
        Tuple2 tuple2 = x0$4;
        if (tuple2 != null) {
            SparkDataStream src = (SparkDataStream)tuple2._1();
            org.apache.spark.sql.connector.read.streaming.Offset off = (org.apache.spark.sql.connector.read.streaming.Offset)tuple2._2();
            if (src instanceof Source) {
                Source source = (Source)src;
                if (off instanceof Offset) {
                    Offset offset = (Offset)off;
                    source.commit(offset);
                    BoxedUnit boxedUnit = BoxedUnit.UNIT;
                    return;
                }
            }
        }
        if (tuple2 != null) {
            SparkDataStream stream = (SparkDataStream)tuple2._1();
            org.apache.spark.sql.connector.read.streaming.Offset off = (org.apache.spark.sql.connector.read.streaming.Offset)tuple2._2();
            if (stream instanceof MicroBatchStream) {
                MicroBatchStream microBatchStream = (MicroBatchStream)stream;
                microBatchStream.commit(microBatchStream.deserializeOffset(off.json()));
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
                return;
            }
        }
        if (tuple2 == null) throw new MatchError((Object)tuple2);
        SparkDataStream src = (SparkDataStream)tuple2._1();
        throw new IllegalArgumentException(new StringBuilder(47).append("Unknown source is found at constructNextBatch: ").append(src).toString());
    }

    public static final /* synthetic */ boolean $anonfun$runBatch$4(Offset x3$4, org.apache.spark.sql.connector.read.streaming.Offset x$8) {
        org.apache.spark.sql.connector.read.streaming.Offset offset = x$8;
        Offset offset2 = x3$4;
        return offset == null ? offset2 != null : !offset.equals((Object)offset2);
    }

    public static final /* synthetic */ boolean $anonfun$runBatch$9(org.apache.spark.sql.connector.read.streaming.Offset available$2, org.apache.spark.sql.connector.read.streaming.Offset x$10) {
        org.apache.spark.sql.connector.read.streaming.Offset offset = x$10;
        org.apache.spark.sql.connector.read.streaming.Offset offset2 = available$2;
        return offset == null ? offset2 != null : !offset.equals(offset2);
    }

    public MicroBatchExecution(SparkSession sparkSession, Trigger trigger, Clock triggerClock, scala.collection.immutable.Map<String, String> extraOptions, WriteToStream plan) {
        Product product;
        this.extraOptions = extraOptions;
        this.plan = plan;
        super(sparkSession, plan.name(), plan.resolvedCheckpointLocation(), plan.inputQuery(), plan.sink(), trigger, triggerClock, plan.outputMode(), plan.deleteCheckpointOnStop());
        this.sources = (Seq)scala.package$.MODULE$.Seq().empty();
        Trigger trigger2 = super.trigger();
        if (trigger2 instanceof ProcessingTimeTrigger) {
            ProcessingTimeTrigger processingTimeTrigger = (ProcessingTimeTrigger)trigger2;
            product = new ProcessingTimeExecutor(processingTimeTrigger, super.triggerClock());
        } else if (OneTimeTrigger$.MODULE$.equals(trigger2)) {
            product = new SingleBatchExecutor();
        } else if (AvailableNowTrigger$.MODULE$.equals(trigger2)) {
            product = new MultiBatchExecutor();
        } else {
            throw new IllegalStateException(new StringBuilder(25).append("Unknown type of trigger: ").append(super.trigger()).toString());
        }
        this.triggerExecutor = product;
        this.isCurrentBatchConstructed = false;
    }
}

