package org.apache.spark.streaming.receiver;

import java.nio.ByteBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.Logging;
import org.apache.spark.SparkConf;
import org.apache.spark.storage.BlockManager;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.storage.StorageLevel$;
import org.apache.spark.storage.StreamBlockId;
import org.apache.spark.streaming.util.WriteAheadLog;
import org.apache.spark.streaming.util.WriteAheadLogRecordHandle;
import org.apache.spark.streaming.util.WriteAheadLogUtils$;
import org.apache.spark.util.Clock;
import org.apache.spark.util.ThreadUtils$;
import org.slf4j.Logger;
import scala.Function0;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.StringContext;
import scala.collection.mutable.ArrayBuffer;
import scala.concurrent.Await$;
import scala.concurrent.ExecutionContext$;
import scala.concurrent.ExecutionContextExecutorService;
import scala.concurrent.Future$;
import scala.concurrent.duration.Cpackage;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.duration.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

/* compiled from: ReceivedBlockHandler.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005%d!B\u0001\u0003\u0001\u0011a!AH,sSR,\u0017\t[3bI2{wMQ1tK\u0012\u0014En\\2l\u0011\u0006tG\r\\3s\u0015\t\u0019A!\u0001\u0005sK\u000e,\u0017N^3s\u0015\t)a!A\u0005tiJ,\u0017-\\5oO*\u0011q\u0001C\u0001\u0006gB\f'o\u001b\u0006\u0003\u0013)\ta!\u00199bG\",'\"A\u0006\u0002\u0007=\u0014xm\u0005\u0003\u0001\u001bM9\u0002C\u0001\b\u0012\u001b\u0005y!\"\u0001\t\u0002\u000bM\u001c\u0017\r\\1\n\u0005Iy!AB!osJ+g\r\u0005\u0002\u0015+5\t!!\u0003\u0002\u0017\u0005\t!\"+Z2fSZ,GM\u00117pG.D\u0015M\u001c3mKJ\u0004\"\u0001G\r\u000e\u0003\u0019I!A\u0007\u0004\u0003\u000f1{wmZ5oO\"AA\u0004\u0001B\u0001B\u0003%a$\u0001\u0007cY>\u001c7.T1oC\u001e,'o\u0001\u0001\u0011\u0005}\u0011S\"\u0001\u0011\u000b\u0005\u00052\u0011aB:u_J\fw-Z\u0005\u0003G\u0001\u0012AB\u00117pG.l\u0015M\\1hKJD\u0001\"\n\u0001\u0003\u0002\u0003\u0006IAJ\u0001\tgR\u0014X-Y7JIB\u0011abJ\u0005\u0003Q=\u00111!\u00138u\u0011!Q\u0003A!A!\u0002\u0013Y\u0013\u0001D:u_J\fw-\u001a'fm\u0016d\u0007CA\u0010-\u0013\ti\u0003E\u0001\u0007Ti>\u0014\u0018mZ3MKZ,G\u000e\u0003\u00050\u0001\t\u0005\t\u0015!\u00031\u0003\u0011\u0019wN\u001c4\u0011\u0005a\t\u0014B\u0001\u001a\u0007\u0005%\u0019\u0006/\u0019:l\u0007>tg\r\u0003\u00055\u0001\t\u0005\t\u0015!\u00036\u0003)A\u0017\rZ8pa\u000e{gN\u001a\t\u0003mij\u0011a\u000e\u0006\u0003_aR!!\u000f\u0005\u0002\r!\fGm\\8q\u0013\tYtGA\u0007D_:4\u0017nZ;sCRLwN\u001c\u0005\t{\u0001\u0011\t\u0011)A\u0005}\u0005i1\r[3dWB|\u0017N\u001c;ESJ\u0004\"a\u0010\"\u000f\u00059\u0001\u0015BA!\u0010\u0003\u0019\u0001&/\u001a3fM&\u00111\t\u0012\u0002\u0007'R\u0014\u0018N\\4\u000b\u0005\u0005{\u0001\u0002\u0003$\u0001\u0005\u0003\u0005\u000b\u0011B$\u0002\u000b\rdwnY6\u0011\u0005![U\"A%\u000b\u0005)3\u0011\u0001B;uS2L!\u0001T%\u0003\u000b\rcwnY6\t\u000b9\u0003A\u0011A(\u0002\rqJg.\u001b;?)!\u0001\u0016KU*U+Z;\u0006C\u0001\u000b\u0001\u0011\u0015aR\n1\u0001\u001f\u0011\u0015)S\n1\u0001'\u0011\u0015QS\n1\u0001,\u0011\u0015yS\n1\u00011\u0011\u0015!T\n1\u00016\u0011\u0015iT\n1\u0
001?\u0011\u001d1U\n%AA\u0002\u001dCq!\u0017\u0001C\u0002\u0013%!,A\tcY>\u001c7n\u0015;pe\u0016$\u0016.\\3pkR,\u0012a\u0017\t\u00039\u0006l\u0011!\u0018\u0006\u0003=~\u000b\u0001\u0002Z;sCRLwN\u001c\u0006\u0003A>\t!bY8oGV\u0014(/\u001a8u\u0013\t\u0011WL\u0001\bGS:LG/\u001a#ve\u0006$\u0018n\u001c8\t\r\u0011\u0004\u0001\u0015!\u0003\\\u0003I\u0011Gn\\2l'R|'/\u001a+j[\u0016|W\u000f\u001e\u0011\t\u000f\u0019\u0004!\u0019!C\u0005O\u0006)RM\u001a4fGRLg/Z*u_J\fw-\u001a'fm\u0016dW#A\u0016\t\r%\u0004\u0001\u0015!\u0003,\u0003Y)gMZ3di&4Xm\u0015;pe\u0006<W\rT3wK2\u0004\u0003bB6\u0001\u0005\u0004%I\u0001\\\u0001\u000eoJLG/Z!iK\u0006$Gj\\4\u0016\u00035\u0004\"A\u001c9\u000e\u0003=T!A\u0013\u0003\n\u0005E|'!D,sSR,\u0017\t[3bI2{w\r\u0003\u0004t\u0001\u0001\u0006I!\\\u0001\u000foJLG/Z!iK\u0006$Gj\\4!\u0011\u001d)\bA1A\u0005\fY\f\u0001#\u001a=fGV$\u0018n\u001c8D_:$X\r\u001f;\u0016\u0003]\u0004\"\u0001_=\u000e\u0003}K!A_0\u0003?\u0015CXmY;uS>t7i\u001c8uKb$X\t_3dkR|'oU3sm&\u001cW\r\u0003\u0004}\u0001\u0001\u0006Ia^\u0001\u0012Kb,7-\u001e;j_:\u001cuN\u001c;fqR\u0004\u0003\"\u0002@\u0001\t\u0003y\u0018AC:u_J,'\t\\8dWR1\u0011\u0011AA\u0004\u0003#\u00012\u0001FA\u0002\u0013\r\t)A\u0001\u0002\u0019%\u0016\u001cW-\u001b<fI\ncwnY6Ti>\u0014XMU3tk2$\bbBA\u0005{\u0002\u0007\u00111B\u0001\bE2|7m[%e!\ry\u0012QB\u0005\u0004\u0003\u001f\u0001#!D*ue\u0016\fWN\u00117pG.LE\rC\u0004\u0002\u0014u\u0004\r!!\u0006\u0002\u000b\tdwnY6\u0011\u0007Q\t9\"C\u0002\u0002\u001a\t\u0011QBU3dK&4X\r\u001a\"m_\u000e\\\u0007bBA\u000f\u0001\u0011\u0005\u0011qD\u0001\u0011G2,\u0017M\\;q\u001f2$'\t\\8dWN$B!!\t\u0002(A\u0019a\"a\t\n\u0007\u0005\u0015rB\u0001\u0003V]&$\b\u0002CA\u0015\u00037\u0001\r!a\u000b\u0002\u0015QD'/Z:i)&lW\rE\u0002\u000f\u0003[I1!a\f\u0010\u0005\u0011auN\\4\t\u000f\u0005M\u0002\u0001\"\u0001\u00026\u0005!1\u000f^8q)\t\t\tc\u0002\u0005\u0002:\tA\t\u0001BA\u001e\u0003y9&/\u001b;f\u0003\",\u0017\r\u001a'pO\n\u000b7/\u001a3CY>\u001c7\u000eS1oI2,'\u000fE\u0002\u0015\u0003{1q!\u0001\u0002\t\u0002\u0011\tyd
E\u0002\u0002>5AqATA\u001f\t\u0003\t\u0019\u0005\u0006\u0002\u0002<!A\u0011qIA\u001f\t\u0003\tI%A\u000bdQ\u0016\u001c7\u000e]8j]R$\u0015N\u001d+p\u0019><G)\u001b:\u0015\u000by\nY%!\u0014\t\ru\n)\u00051\u0001?\u0011\u0019)\u0013Q\ta\u0001M!Q\u0011\u0011KA\u001f#\u0003%\t!a\u0015\u00027\u0011bWm]:j]&$He\u001a:fCR,'\u000f\n3fM\u0006,H\u000e\u001e\u00138+\t\t)FK\u0002H\u0003/Z#!!\u0017\u0011\t\u0005m\u0013QM\u0007\u0003\u0003;RA!a\u0018\u0002b\u0005IQO\\2iK\u000e\\W\r\u001a\u0006\u0004\u0003Gz\u0011AC1o]>$\u0018\r^5p]&!\u0011qMA/\u0005E)hn\u00195fG.,GMV1sS\u0006t7-\u001a")
/* loaded from: input_file:org/apache/spark/streaming/receiver/WriteAheadLogBasedBlockHandler.class */
/*
 * ReceivedBlockHandler implementation (decompiled from Scala) that, for every received block:
 *   1. turns the block into a ByteBuffer (serializing through the BlockManager when needed),
 *   2. submits two asynchronous tasks on a small private thread pool, and
 *   3. waits up to blockStoreTimeout for the resulting WriteAheadLogRecordHandle.
 *
 * NOTE(review): the bodies of the closure classes WriteAheadLogBasedBlockHandler$$anonfun$N are
 * compiled separately and are not visible in this file. Judging by their constructor arguments,
 * $$anonfun$1 presumably stores the bytes in the block manager and $$anonfun$5 presumably appends
 * them to the write-ahead log -- confirm against the Scala source.
 *
 * The '$'-mangled member names are compiler generated and must be kept verbatim; the closure
 * classes presumably link against them by name.
 */
public class WriteAheadLogBasedBlockHandler implements ReceivedBlockHandler, Logging {
    /** Block manager used to serialize incoming blocks (constructor argument). */
    public final BlockManager org$apache$spark$streaming$receiver$WriteAheadLogBasedBlockHandler$$blockManager;
    /** Storage level requested by the caller (constructor argument). */
    public final StorageLevel org$apache$spark$streaming$receiver$WriteAheadLogBasedBlockHandler$$storageLevel;
    /** Clock supplied by the caller; not read inside this class itself. */
    public final Clock org$apache$spark$streaming$receiver$WriteAheadLogBasedBlockHandler$$clock;
    /** Maximum time storeBlock waits for the asynchronous tasks to complete. */
    private final FiniteDuration blockStoreTimeout;
    /** Storage level derived from the requested one in the constructor. */
    private final StorageLevel org$apache$spark$streaming$receiver$WriteAheadLogBasedBlockHandler$$effectiveStorageLevel;
    /** Write-ahead log created for this receiver stream in the constructor. */
    private final WriteAheadLog org$apache$spark$streaming$receiver$WriteAheadLogBasedBlockHandler$$writeAheadLog;
    /** Execution context on which the asynchronous tasks of storeBlock run. */
    private final ExecutionContextExecutorService executionContext;
    /** Backing field for the Logging trait's logger. */
    private transient Logger org$apache$spark$Logging$$log_;

    /**
     * Static forwarder to the companion object's {@code checkpointDirToLogDir}.
     *
     * @param str checkpoint directory
     * @param i   stream id
     * @return whatever the companion object computes for these arguments
     */
    public static String checkpointDirToLogDir(String str, int i) {
        return WriteAheadLogBasedBlockHandler$.MODULE$.checkpointDirToLogDir(str, i);
    }

    // ---------------------------------------------------------------------------------------
    // Logging trait plumbing: each method simply delegates to the trait implementation class.
    // ---------------------------------------------------------------------------------------

    @Override // org.apache.spark.Logging
    public Logger org$apache$spark$Logging$$log_() {
        return this.org$apache$spark$Logging$$log_;
    }

    @Override // org.apache.spark.Logging
    public void org$apache$spark$Logging$$log__$eq(Logger logger) {
        this.org$apache$spark$Logging$$log_ = logger;
    }

    @Override // org.apache.spark.Logging
    public String logName() {
        return Logging.Cclass.logName(this);
    }

    @Override // org.apache.spark.Logging
    public Logger log() {
        return Logging.Cclass.log(this);
    }

    @Override // org.apache.spark.Logging
    public void logInfo(Function0<String> function0) {
        Logging.Cclass.logInfo(this, function0);
    }

    @Override // org.apache.spark.Logging
    public void logDebug(Function0<String> function0) {
        Logging.Cclass.logDebug(this, function0);
    }

    @Override // org.apache.spark.Logging
    public void logTrace(Function0<String> function0) {
        Logging.Cclass.logTrace(this, function0);
    }

    @Override // org.apache.spark.Logging
    public void logWarning(Function0<String> function0) {
        Logging.Cclass.logWarning(this, function0);
    }

    @Override // org.apache.spark.Logging
    public void logError(Function0<String> function0) {
        Logging.Cclass.logError(this, function0);
    }

    @Override // org.apache.spark.Logging
    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.Cclass.logInfo(this, function0, th);
    }

    @Override // org.apache.spark.Logging
    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.Cclass.logDebug(this, function0, th);
    }

    @Override // org.apache.spark.Logging
    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.Cclass.logTrace(this, function0, th);
    }

    @Override // org.apache.spark.Logging
    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.Cclass.logWarning(this, function0, th);
    }

    @Override // org.apache.spark.Logging
    public void logError(Function0<String> function0, Throwable th) {
        Logging.Cclass.logError(this, function0, th);
    }

    @Override // org.apache.spark.Logging
    public boolean isTraceEnabled() {
        return Logging.Cclass.isTraceEnabled(this);
    }

    // ---------------------------------------------------------------------------------------
    // Accessors
    // ---------------------------------------------------------------------------------------

    /** @return the maximum time storeBlock waits for its asynchronous tasks */
    private FiniteDuration blockStoreTimeout() {
        return this.blockStoreTimeout;
    }

    /** @return the storage level derived in the constructor */
    public StorageLevel org$apache$spark$streaming$receiver$WriteAheadLogBasedBlockHandler$$effectiveStorageLevel() {
        return this.org$apache$spark$streaming$receiver$WriteAheadLogBasedBlockHandler$$effectiveStorageLevel;
    }

    /** @return the write-ahead log created in the constructor */
    public WriteAheadLog org$apache$spark$streaming$receiver$WriteAheadLogBasedBlockHandler$$writeAheadLog() {
        return this.org$apache$spark$streaming$receiver$WriteAheadLogBasedBlockHandler$$writeAheadLog;
    }

    /** @return the execution context used for the asynchronous tasks */
    private ExecutionContextExecutorService executionContext() {
        return this.executionContext;
    }

    // ---------------------------------------------------------------------------------------
    // ReceivedBlockHandler
    // ---------------------------------------------------------------------------------------

    /**
     * Converts the received block to bytes, runs the two asynchronous storage tasks and blocks
     * until both complete or {@code blockStoreTimeout} elapses.
     *
     * @param streamBlockId id of the block being stored
     * @param receivedBlock the data; must be an ArrayBufferBlock, IteratorBlock or ByteBufferBlock
     * @return a WriteAheadLogBasedStoreResult carrying the block id, the record count when it is
     *         known (None for ByteBufferBlock) and the write-ahead log record handle
     * @throws IllegalArgumentException if {@code receivedBlock} is of any other type
     */
    @Override // org.apache.spark.streaming.receiver.ReceivedBlockHandler
    public ReceivedBlockStoreResult storeBlock(StreamBlockId streamBlockId, ReceivedBlock receivedBlock) {
        final BlockManager blockManager = this.org$apache$spark$streaming$receiver$WriteAheadLogBasedBlockHandler$$blockManager;
        // Stays None unless the block type lets us count its records.
        Option numRecords = None$.MODULE$;
        final ByteBuffer serializedBlock;

        if (receivedBlock instanceof ArrayBufferBlock) {
            ArrayBuffer<?> records = ((ArrayBufferBlock) receivedBlock).arrayBuffer();
            numRecords = new Some(BoxesRunTime.boxToLong(records.size()));
            serializedBlock = blockManager.dataSerialize(streamBlockId, records.iterator(), blockManager.dataSerialize$default$3());
        } else if (receivedBlock instanceof IteratorBlock) {
            CountingIterator countingIterator = new CountingIterator(((IteratorBlock) receivedBlock).iterator());
            ByteBuffer serialized = blockManager.dataSerialize(streamBlockId, countingIterator, blockManager.dataSerialize$default$3());
            // The count is deliberately read only after dataSerialize has run -- presumably
            // serialization is what drains the iterator; keep this ordering.
            numRecords = countingIterator.count();
            serializedBlock = serialized;
        } else if (receivedBlock instanceof ByteBufferBlock) {
            // Already bytes: nothing to serialize and no record count available.
            serializedBlock = ((ByteBufferBlock) receivedBlock).byteBuffer();
        } else {
            // Unchecked and specific instead of a raw checked java.lang.Exception; it is still an
            // Exception, so existing catch sites keep working. The message text is unchanged.
            throw new IllegalArgumentException(
                    "Could not push " + streamBlockId + " to block manager, unexpected block type");
        }

        // Two tasks are started on the private execution context, zipped into a single future and
        // mapped by $$anonfun$6 before we block on the result.
        // NOTE(review): $$anonfun$6 presumably keeps only the write-ahead log handle -- confirm.
        WriteAheadLogRecordHandle walRecordHandle = (WriteAheadLogRecordHandle) Await$.MODULE$.result(
                Future$.MODULE$.apply(new WriteAheadLogBasedBlockHandler$$anonfun$1(this, streamBlockId, serializedBlock), executionContext())
                        .zip(Future$.MODULE$.apply(new WriteAheadLogBasedBlockHandler$$anonfun$5(this, serializedBlock), executionContext()))
                        .map(new WriteAheadLogBasedBlockHandler$$anonfun$6(this), executionContext()),
                blockStoreTimeout());

        return new WriteAheadLogBasedStoreResult(streamBlockId, numRecords, walRecordHandle);
    }

    /**
     * Asks the write-ahead log to clean data older than the given threshold.
     *
     * @param j threshold time passed straight to {@code WriteAheadLog.clean}; the second argument
     *          is always {@code false} (presumably "do not wait for completion" -- confirm)
     */
    @Override // org.apache.spark.streaming.receiver.ReceivedBlockHandler
    public void cleanupOldBlocks(long j) {
        org$apache$spark$streaming$receiver$WriteAheadLogBasedBlockHandler$$writeAheadLog().clean(j, false);
    }

    /**
     * Closes the write-ahead log and shuts down the private executor. The executor is shut down
     * even when closing the log throws, so its threads are never leaked; any exception from
     * {@code close()} still propagates to the caller.
     */
    public void stop() {
        try {
            org$apache$spark$streaming$receiver$WriteAheadLogBasedBlockHandler$$writeAheadLog().close();
        } finally {
            executionContext().shutdown();
        }
    }

    /**
     * Creates the handler, its write-ahead log and its private thread pool.
     *
     * @param blockManager  block manager used to serialize blocks
     * @param i             stream id; combined with {@code str} to derive the log directory
     * @param storageLevel  requested storage level; must not be null (it is dereferenced)
     * @param sparkConf     configuration; {@code spark.streaming.receiver.blockStoreTimeout}
     *                      (seconds, default 30) is read from it
     * @param configuration Hadoop configuration handed to the write-ahead log factory
     * @param str           checkpoint directory
     * @param clock         clock stored for use by collaborators
     */
    public WriteAheadLogBasedBlockHandler(BlockManager blockManager, int i, StorageLevel storageLevel, SparkConf sparkConf, Configuration configuration, String str, Clock clock) {
        this.org$apache$spark$streaming$receiver$WriteAheadLogBasedBlockHandler$$blockManager = blockManager;
        this.org$apache$spark$streaming$receiver$WriteAheadLogBasedBlockHandler$$storageLevel = storageLevel;
        this.org$apache$spark$streaming$receiver$WriteAheadLogBasedBlockHandler$$clock = clock;
        // Logging trait initialisation: the logger field starts out null.
        org$apache$spark$Logging$$log__$eq(null);

        int timeoutSeconds = sparkConf.getInt("spark.streaming.receiver.blockStoreTimeout", 30);
        this.blockStoreTimeout = new Cpackage.DurationInt(package$.MODULE$.DurationInt(timeoutSeconds)).seconds();

        // Warn about requested settings that are overridden below (messages live in the closures).
        if (storageLevel.deserialized()) {
            logWarning(new WriteAheadLogBasedBlockHandler$$anonfun$2(this));
        }
        if (storageLevel.replication() > 1) {
            logWarning(new WriteAheadLogBasedBlockHandler$$anonfun$3(this));
        }

        // Same disk/memory/off-heap flags as requested; the last two arguments are pinned to
        // false and 1 (presumably "deserialized" and "replication", matching the checks above).
        StorageLevel effectiveLevel = StorageLevel$.MODULE$.apply(storageLevel.useDisk(), storageLevel.useMemory(), storageLevel.useOffHeap(), false, 1);
        this.org$apache$spark$streaming$receiver$WriteAheadLogBasedBlockHandler$$effectiveStorageLevel = effectiveLevel;
        // storageLevel was already dereferenced above, so it cannot be null at this point and a
        // plain equals() comparison is sufficient.
        if (!storageLevel.equals(effectiveLevel)) {
            logWarning(new WriteAheadLogBasedBlockHandler$$anonfun$4(this));
        }

        String logDirectory = WriteAheadLogBasedBlockHandler$.MODULE$.checkpointDirToLogDir(str, i);
        this.org$apache$spark$streaming$receiver$WriteAheadLogBasedBlockHandler$$writeAheadLog = WriteAheadLogUtils$.MODULE$.createLogForReceiver(sparkConf, logDirectory, configuration);
        // Pool size 2 matches the two tasks storeBlock submits per block.
        this.executionContext = ExecutionContext$.MODULE$.fromExecutorService(ThreadUtils$.MODULE$.newDaemonFixedThreadPool(2, getClass().getSimpleName()));
    }
}
