package org.apache.spark.streaming.scheduler;

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.Logging;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkException;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.util.WriteAheadLogManager;
import org.apache.spark.streaming.util.WriteAheadLogManager$;
import org.apache.spark.util.Clock;
import org.slf4j.Logger;
import scala.Function0;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Map;
import scala.collection.mutable.HashMap;
import scala.collection.mutable.Queue;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: ReceivedBlockTracker.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005ef!B\u0001\u0003\u0001\u0011a!\u0001\u0006*fG\u0016Lg/\u001a3CY>\u001c7\u000e\u0016:bG.,'O\u0003\u0002\u0004\t\u0005I1o\u00195fIVdWM\u001d\u0006\u0003\u000b\u0019\t\u0011b\u001d;sK\u0006l\u0017N\\4\u000b\u0005\u001dA\u0011!B:qCJ\\'BA\u0005\u000b\u0003\u0019\t\u0007/Y2iK*\t1\"A\u0002pe\u001e\u001c2\u0001A\u0007\u0014!\tq\u0011#D\u0001\u0010\u0015\u0005\u0001\u0012!B:dC2\f\u0017B\u0001\n\u0010\u0005\u0019\te.\u001f*fMB\u0011A#F\u0007\u0002\r%\u0011aC\u0002\u0002\b\u0019><w-\u001b8h\u0011!A\u0002A!A!\u0002\u0013Q\u0012\u0001B2p]\u001a\u001c\u0001\u0001\u0005\u0002\u00157%\u0011AD\u0002\u0002\n'B\f'o[\"p]\u001aD\u0001B\b\u0001\u0003\u0002\u0003\u0006IaH\u0001\u000bQ\u0006$wn\u001c9D_:4\u0007C\u0001\u0011%\u001b\u0005\t#B\u0001\r#\u0015\t\u0019\u0003\"\u0001\u0004iC\u0012|w\u000e]\u0005\u0003K\u0005\u0012QbQ8oM&<WO]1uS>t\u0007\u0002C\u0014\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u0015\u0002\u0013M$(/Z1n\u0013\u0012\u001c\bcA\u00152i9\u0011!f\f\b\u0003W9j\u0011\u0001\f\u0006\u0003[e\ta\u0001\u0010:p_Rt\u0014\"\u0001\t\n\u0005Az\u0011a\u00029bG.\fw-Z\u0005\u0003eM\u00121aU3r\u0015\t\u0001t\u0002\u0005\u0002\u000fk%\u0011ag\u0004\u0002\u0004\u0013:$\b\u0002\u0003\u001d\u0001\u0005\u0003\u0005\u000b\u0011B\u001d\u0002\u000b\rdwnY6\u0011\u0005ijT\"A\u001e\u000b\u0005q2\u0011\u0001B;uS2L!AP\u001e\u0003\u000b\rcwnY6\t\u0011\u0001\u0003!\u0011!Q\u0001\n\u0005\u000b1c\u00195fG.\u0004x.\u001b8u\t&\u0014x\n\u001d;j_:\u00042A\u0004\"E\u0013\t\u0019uB\u0001\u0004PaRLwN\u001c\t\u0003\u000b\"s!A\u0004$\n\u0005\u001d{\u0011A\u0002)sK\u0012,g-\u0003\u0002J\u0015\n11\u000b\u001e:j]\u001eT!aR\b\t\u000b1\u0003A\u0011A'\u0002\rqJg.\u001b;?)\u0019q\u0005+\u0015*T)B\u0011q\nA\u0007\u0002\u0005!)\u0001d\u0013a\u00015!)ad\u0013a\u0001?!)qe\u0013a\u0001Q!)\u0001h\u0013a\u0001s!)\u0001i\u0013a\u0001\u0003\u0016!a\u000b\u0001\u0003X\u0005I\u0011VmY3jm\u0016$'\t\\8dWF+X-^3\u0011\u0007akv,D\u0001Z\u0015\tQ6,A\u0004nkR\f'\r\\3\u000b\u0005q{\u0011AC2pY2,7\r^5p]&\u0011a,\u0017\u0002\u0006#V,W/\u001a\t\u0003\u001f\u0002L!!\u0019\u0002\u0003#I+7-Z5wK\u0012\u0014En\\2l\u0013:4w\u000eC\u0004d\u0001\t\u0007I\u0011\u00023\u0002AM$(/Z1n\u0013\u0012$v.\u00168bY2|7-\u0019;fI\ncwnY6Rk\u0016,Xm]\u000b\u0002KB!\u0001L\u001a\u001bi\u0013\t9\u0017LA\u0004ICNDW*\u00199\u0011\u0005%,V\"\u0001\u0001\t\r-\u0004\u0001\u0015!\u0003f\u0003\u0005\u001aHO]3b[&#Gk\\+oC2dwnY1uK\u0012\u0014En\\2l#V,W/Z:!\u0011\u001di\u0007A1A\u0005\n9\fQ\u0003^5nKR{\u0017\t\u001c7pG\u0006$X\r\u001a\"m_\u000e\\7/F\u0001p!\u0011Af\r\u001d;\u0011\u0005E\u0014X\"\u0001\u0003\n\u0005M$!\u0001\u0002+j[\u0016\u0004\"aT;\n\u0005Y\u0014!aD!mY>\u001c\u0017\r^3e\u00052|7m[:\t\ra\u0004\u0001\u0015!\u0003p\u0003Y!\u0018.\\3U_\u0006cGn\\2bi\u0016$'\t\\8dWN\u0004\u0003b\u0002>\u0001\u0005\u0004%Ia_\u0001\u0011Y><W*\u00198bO\u0016\u0014x\n\u001d;j_:,\u0012\u0001 \t\u0004\u001d\tk\bc\u0001@\u0002\u00025\tqP\u0003\u0002=\t%\u0019\u00111A@\u0003)]\u0013\u0018\u000e^3BQ\u0016\fG\rT8h\u001b\u0006t\u0017mZ3s\u0011\u001d\t9\u0001\u0001Q\u0001\nq\f\u0011\u0003\\8h\u001b\u0006t\u0017mZ3s\u001fB$\u0018n\u001c8!\u0011%\tY\u0001\u0001a\u0001\n\u0013\ti!\u0001\fmCN$\u0018\t\u001c7pG\u0006$X\r\u001a\"bi\u000eDG+[7f+\u0005\u0001\b\"CA\t\u0001\u0001\u0007I\u0011BA\n\u0003ia\u0017m\u001d;BY2|7-\u0019;fI\n\u000bGo\u00195US6,w\fJ3r)\u0011\t)\"a\u0007\u0011\u00079\t9\"C\u0002\u0002\u001a=\u0011A!\u00168ji\"I\u0011QDA\b\u0003\u0003\u0005\r\u0001]\u0001\u0004q\u0012\n\u0004bBA\u0011\u0001\u0001\u0006K\u0001]\u0001\u0018Y\u0006\u001cH/\u00117m_\u000e\fG/\u001a3CCR\u001c\u0007\u000eV5nK\u0002Bq!!\n\u0001\t\u0003\t9#\u0001\u0005bI\u0012\u0014En\\2l)\u0011\tI#a\f\u0011\u00079\tY#C\u0002\u0002.=\u0011qAQ8pY\u0016\fg\u000eC\u0004\u00022\u0005\r\u0002\u0019A0\u0002#I,7-Z5wK\u0012\u0014En\\2l\u0013:4w\u000eC\u0004\u00026\u0001!\t!a\u000e\u0002+\u0005dGn\\2bi\u0016\u0014En\\2lgR{')\u0019;dQR!\u0011QCA\u001d\u0011\u001d\tY$a\rA\u0002A\f\u0011BY1uG\"$\u0016.\\3\t\u000f\u0005}\u0002\u0001\"\u0001\u0002B\u0005\u0001r-\u001a;CY>\u001c7n](g\u0005\u0006$8\r\u001b\u000b\u0005\u0003\u0007\nY\u0005\u0005\u0004F\u0003\u000b\"\u0014\u0011J\u0005\u0004\u0003\u000fR%aA'baB\u0019\u0011&M0\t\u000f\u0005m\u0012Q\ba\u0001a\"9\u0011q\n\u0001\u0005\u0002\u0005E\u0013!G4fi\ncwnY6t\u001f\u001a\u0014\u0015\r^2i\u0003:$7\u000b\u001e:fC6$b!!\u0013\u0002T\u0005U\u0003bBA\u001e\u0003\u001b\u0002\r\u0001\u001d\u0005\b\u0003/\ni\u00051\u00015\u0003!\u0019HO]3b[&#\u0007bBA.\u0001\u0011\u0005\u0011QL\u0001\u001dQ\u0006\u001cXK\\1mY>\u001c\u0017\r^3e%\u0016\u001cW-\u001b<fI\ncwnY6t+\t\tI\u0003C\u0004\u0002b\u0001!\t!a\u0019\u0002)\u001d,G/\u00168bY2|7-\u0019;fI\ncwnY6t)\u0011\tI%!\u001a\t\u000f\u0005]\u0013q\fa\u0001i!9\u0011\u0011\u000e\u0001\u0005\u0002\u0005-\u0014!E2mK\u0006tW\u000f](mI\n\u000bGo\u00195fgR1\u0011QCA7\u0003cBq!a\u001c\u0002h\u0001\u0007\u0001/A\tdY\u0016\fg.\u001e9UQJ,7\u000f\u001b+j[\u0016D\u0001\"a\u001d\u0002h\u0001\u0007\u0011\u0011F\u0001\u0012o\u0006LGOR8s\u0007>l\u0007\u000f\\3uS>t\u0007bBA<\u0001\u0011\u0005\u0011\u0011P\u0001\u0005gR|\u0007\u000f\u0006\u0002\u0002\u0016!9\u0011Q\u0010\u0001\u0005\n\u0005e\u0014!\u0007:fG>4XM\u001d$s_6<&/\u001b;f\u0003\",\u0017\r\u001a'pONDq!!!\u0001\t\u0013\t\u0019)\u0001\u0006xe&$X\rV8M_\u001e$B!!\u0006\u0002\u0006\"A\u0011qQA@\u0001\u0004\tI)\u0001\u0004sK\u000e|'\u000f\u001a\t\u0004\u001f\u0006-\u0015bAAG\u0005\ta\"+Z2fSZ,GM\u00117pG.$&/Y2lKJdunZ#wK:$\bbBAI\u0001\u0011%\u00111S\u0001\u0016O\u0016$(+Z2fSZ,GM\u00117pG.\fV/Z;f)\rA\u0017Q\u0013\u0005\b\u0003/\ny\t1\u00015\u0011\u001d\tI\n\u0001C\u0005\u00037\u000b\u0001c\u0019:fCR,Gj\\4NC:\fw-\u001a:\u0015\u0003qD\u0001\"a(\u0001\t\u0003!\u0011QL\u0001\u0014SNdunZ'b]\u0006<WM]#oC\ndW\rZ\u0004\t\u0003G\u0013\u0001\u0012\u0001\u0003\u0002&\u0006!\"+Z2fSZ,GM\u00117pG.$&/Y2lKJ\u00042aTAT\r\u001d\t!\u0001#\u0001\u0005\u0003S\u001b2!a*\u000e\u0011\u001da\u0015q\u0015C\u0001\u0003[#\"!!*\t\u0011\u0005E\u0016q\u0015C\u0001\u0003g\u000bQc\u00195fG.\u0004x.\u001b8u\t&\u0014Hk\u001c'pO\u0012K'\u000fF\u0002E\u0003kCq!a.\u00020\u0002\u0007A)A\u0007dQ\u0016\u001c7\u000e]8j]R$\u0015N\u001d")
/* loaded from: input_file:org/apache/spark/streaming/scheduler/ReceivedBlockTracker.class */
public class ReceivedBlockTracker implements Logging {
    private final SparkConf conf;
    private final Configuration hadoopConf;
    private final Seq<Object> streamIds;
    private final Clock clock;
    public final Option<String> org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$checkpointDirOption;
    private final HashMap<Object, Queue<ReceivedBlockInfo>> streamIdToUnallocatedBlockQueues;
    private final HashMap<Time, AllocatedBlocks> timeToAllocatedBlocks;
    private final Option<WriteAheadLogManager> logManagerOption;
    private Time lastAllocatedBatchTime;
    private transient Logger org$apache$spark$Logging$$log_;

    public static String checkpointDirToLogDir(String str) {
        return ReceivedBlockTracker$.MODULE$.checkpointDirToLogDir(str);
    }

    @Override // org.apache.spark.Logging
    public Logger org$apache$spark$Logging$$log_() {
        return this.org$apache$spark$Logging$$log_;
    }

    @Override // org.apache.spark.Logging
    public void org$apache$spark$Logging$$log__$eq(Logger logger) {
        this.org$apache$spark$Logging$$log_ = logger;
    }

    @Override // org.apache.spark.Logging
    public String logName() {
        return Logging.Cclass.logName(this);
    }

    @Override // org.apache.spark.Logging
    public Logger log() {
        return Logging.Cclass.log(this);
    }

    @Override // org.apache.spark.Logging
    public void logInfo(Function0<String> function0) {
        Logging.Cclass.logInfo(this, function0);
    }

    @Override // org.apache.spark.Logging
    public void logDebug(Function0<String> function0) {
        Logging.Cclass.logDebug(this, function0);
    }

    @Override // org.apache.spark.Logging
    public void logTrace(Function0<String> function0) {
        Logging.Cclass.logTrace(this, function0);
    }

    @Override // org.apache.spark.Logging
    public void logWarning(Function0<String> function0) {
        Logging.Cclass.logWarning(this, function0);
    }

    @Override // org.apache.spark.Logging
    public void logError(Function0<String> function0) {
        Logging.Cclass.logError(this, function0);
    }

    @Override // org.apache.spark.Logging
    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.Cclass.logInfo(this, function0, th);
    }

    @Override // org.apache.spark.Logging
    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.Cclass.logDebug(this, function0, th);
    }

    @Override // org.apache.spark.Logging
    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.Cclass.logTrace(this, function0, th);
    }

    @Override // org.apache.spark.Logging
    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.Cclass.logWarning(this, function0, th);
    }

    @Override // org.apache.spark.Logging
    public void logError(Function0<String> function0, Throwable th) {
        Logging.Cclass.logError(this, function0, th);
    }

    @Override // org.apache.spark.Logging
    public boolean isTraceEnabled() {
        return Logging.Cclass.isTraceEnabled(this);
    }

    private HashMap<Object, Queue<ReceivedBlockInfo>> streamIdToUnallocatedBlockQueues() {
        return this.streamIdToUnallocatedBlockQueues;
    }

    private HashMap<Time, AllocatedBlocks> timeToAllocatedBlocks() {
        return this.timeToAllocatedBlocks;
    }

    private Option<WriteAheadLogManager> logManagerOption() {
        return this.logManagerOption;
    }

    private Time lastAllocatedBatchTime() {
        return this.lastAllocatedBatchTime;
    }

    private void lastAllocatedBatchTime_$eq(Time time) {
        this.lastAllocatedBatchTime = time;
    }

    public synchronized boolean addBlock(ReceivedBlockInfo receivedBlockInfo) {
        try {
            writeToLog(new BlockAdditionEvent(receivedBlockInfo));
            org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$getReceivedBlockQueue(receivedBlockInfo.streamId()).$plus$eq2((Queue<ReceivedBlockInfo>) receivedBlockInfo);
            logDebug(new ReceivedBlockTracker$$anonfun$addBlock$1(this, receivedBlockInfo));
            return true;
        } catch (Exception e) {
            logError(new ReceivedBlockTracker$$anonfun$addBlock$2(this, receivedBlockInfo), e);
            return false;
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v15 */
    public void allocateBlocksToBatch(Time time) {
        ?? r0 = this;
        synchronized (r0) {
            if (lastAllocatedBatchTime() == null || time.$greater(lastAllocatedBatchTime())) {
                AllocatedBlocks allocatedBlocks = new AllocatedBlocks(((TraversableOnce) this.streamIds.map(new ReceivedBlockTracker$$anonfun$1(this), Seq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.conforms()));
                writeToLog(new BatchAllocationEvent(time, allocatedBlocks));
                timeToAllocatedBlocks().update(time, allocatedBlocks);
                lastAllocatedBatchTime_$eq(time);
            } else {
                logInfo(new ReceivedBlockTracker$$anonfun$allocateBlocksToBatch$1(this, time));
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
            }
            r0 = r0;
        }
    }

    public synchronized Map<Object, Seq<ReceivedBlockInfo>> getBlocksOfBatch(Time time) {
        return (Map) timeToAllocatedBlocks().get(time).map(new ReceivedBlockTracker$$anonfun$getBlocksOfBatch$1(this)).getOrElse(new ReceivedBlockTracker$$anonfun$getBlocksOfBatch$2(this));
    }

    public synchronized Seq<ReceivedBlockInfo> getBlocksOfBatchAndStream(Time time, int i) {
        return (Seq) timeToAllocatedBlocks().get(time).map(new ReceivedBlockTracker$$anonfun$getBlocksOfBatchAndStream$1(this, i)).getOrElse(new ReceivedBlockTracker$$anonfun$getBlocksOfBatchAndStream$2(this));
    }

    public synchronized boolean hasUnallocatedReceivedBlocks() {
        return !streamIdToUnallocatedBlockQueues().values().forall(new ReceivedBlockTracker$$anonfun$hasUnallocatedReceivedBlocks$1(this));
    }

    public synchronized Seq<ReceivedBlockInfo> getUnallocatedBlocks(int i) {
        return org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$getReceivedBlockQueue(i).toSeq();
    }

    public synchronized void cleanupOldBatches(Time time, boolean z) {
        Predef$.MODULE$.m11915assert(time.milliseconds() < this.clock.getTimeMillis());
        Seq seq = timeToAllocatedBlocks().keys().filter(new ReceivedBlockTracker$$anonfun$2(this, time)).toSeq();
        logInfo(new ReceivedBlockTracker$$anonfun$cleanupOldBatches$1(this, seq));
        writeToLog(new BatchCleanupEvent(seq));
        timeToAllocatedBlocks().$minus$minus$eq(seq);
        logManagerOption().foreach(new ReceivedBlockTracker$$anonfun$cleanupOldBatches$2(this, time, z));
    }

    public void stop() {
        logManagerOption().foreach(new ReceivedBlockTracker$$anonfun$stop$1(this));
    }

    private synchronized void recoverFromWriteAheadLogs() {
        logManagerOption().foreach(new ReceivedBlockTracker$$anonfun$recoverFromWriteAheadLogs$1(this));
    }

    private void writeToLog(ReceivedBlockTrackerLogEvent receivedBlockTrackerLogEvent) {
        if (isLogManagerEnabled()) {
            logDebug(new ReceivedBlockTracker$$anonfun$writeToLog$1(this, receivedBlockTrackerLogEvent));
            logManagerOption().foreach(new ReceivedBlockTracker$$anonfun$writeToLog$2(this, receivedBlockTrackerLogEvent));
        }
    }

    public Queue<ReceivedBlockInfo> org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$getReceivedBlockQueue(int i) {
        return streamIdToUnallocatedBlockQueues().getOrElseUpdate(BoxesRunTime.boxToInteger(i), new ReceivedBlockTracker$$anonfun$org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$getReceivedBlockQueue$1(this));
    }

    private Option<WriteAheadLogManager> createLogManager() {
        if (!this.conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)) {
            return None$.MODULE$;
        }
        if (this.org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$checkpointDirOption.isEmpty()) {
            throw new SparkException("Cannot enable receiver write-ahead log without checkpoint directory set. Please use streamingContext.checkpoint() to set the checkpoint directory. See documentation for more details.");
        }
        String checkpointDirToLogDir = ReceivedBlockTracker$.MODULE$.checkpointDirToLogDir(this.org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$checkpointDirOption.get());
        int i = this.conf.getInt("spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", 60);
        return new Some(new WriteAheadLogManager(checkpointDirToLogDir, this.hadoopConf, i, WriteAheadLogManager$.MODULE$.$lessinit$greater$default$4(), "ReceivedBlockHandlerMaster", this.clock));
    }

    public boolean isLogManagerEnabled() {
        return logManagerOption().nonEmpty();
    }

    public final void org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$insertAddedBlock$1(ReceivedBlockInfo receivedBlockInfo) {
        logTrace(new ReceivedBlockTracker$$anonfun$org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$insertAddedBlock$1$1(this, receivedBlockInfo));
        org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$getReceivedBlockQueue(receivedBlockInfo.streamId()).$plus$eq2((Queue<ReceivedBlockInfo>) receivedBlockInfo);
    }

    public final void org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$insertAllocatedBatch$1(Time time, AllocatedBlocks allocatedBlocks) {
        logTrace(new ReceivedBlockTracker$$anonfun$org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$insertAllocatedBatch$1$1(this, time, allocatedBlocks));
        streamIdToUnallocatedBlockQueues().values().foreach(new ReceivedBlockTracker$$anonfun$org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$insertAllocatedBatch$1$2(this));
        lastAllocatedBatchTime_$eq(time);
        timeToAllocatedBlocks().put(time, allocatedBlocks);
    }

    public final void org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$cleanupBatches$1(Seq seq) {
        logTrace(new ReceivedBlockTracker$$anonfun$org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$cleanupBatches$1$1(this, seq));
        timeToAllocatedBlocks().$minus$minus$eq(seq);
    }

    public ReceivedBlockTracker(SparkConf sparkConf, Configuration configuration, Seq<Object> seq, Clock clock, Option<String> option) {
        this.conf = sparkConf;
        this.hadoopConf = configuration;
        this.streamIds = seq;
        this.clock = clock;
        this.org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$checkpointDirOption = option;
        org$apache$spark$Logging$$log__$eq(null);
        this.streamIdToUnallocatedBlockQueues = new HashMap<>();
        this.timeToAllocatedBlocks = new HashMap<>();
        this.logManagerOption = createLogManager();
        this.lastAllocatedBatchTime = null;
        recoverFromWriteAheadLogs();
    }
}
