package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.disk;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
import org.apache.flink.runtime.io.network.api.EndOfSegmentEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileReader;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileWriter;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyConnectionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyConnectionWriter;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyServiceProducer;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageNettyService;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgent.class */
/**
 * Producer-side agent of the disk tier.
 *
 * <p>Buffers accepted by {@link #tryWrite} are appended to a {@link DiskCacheManager}; consumer
 * connections reported through the {@link NettyServiceProducer} callbacks are delegated to a
 * {@link DiskIOScheduler}. On construction the agent registers itself with the supplied netty
 * service and registers its release hook with the supplied resource registry.
 */
public class DiskTierProducerAgent implements TierProducerAgent, NettyServiceProducer {
    private final TieredStoragePartitionId partitionId;

    /** Number of buffers that fit into one segment (segment bytes / buffer bytes). */
    private final int numBuffersPerSegment;

    private final int bufferSizeBytes;

    /** File whose readability and disk-space figures are checked by this agent. */
    private final Path dataFilePath;

    /** Fraction of the total disk space that must stay free after a new segment is started. */
    private final float minReservedDiskSpaceFraction;

    private final TieredStorageMemoryManager memoryManager;
    private final DiskCacheManager diskCacheManager;

    /**
     * Per-subpartition lookup from the first buffer index of a segment to that segment's id. The
     * list index is the subpartition id.
     */
    private final List<Map<Integer, Integer>> firstBufferIndexInSegment;

    /** Number of buffers written to the current segment, indexed by subpartition id. */
    private final int[] currentSubpartitionWriteBuffers;

    private final DiskIOScheduler diskIOScheduler;

    /** Set once {@link #releaseResources()} has run, so that later calls are no-ops. */
    private volatile boolean isReleased;

    /**
     * Creates the agent and registers it with the netty service and the resource registry.
     *
     * @param partitionId id of the partition this agent serves
     * @param numSubpartitions number of subpartitions; sizes the per-subpartition bookkeeping
     * @param numBytesPerSegment size of one segment in bytes; must be at least {@code
     *     bufferSizeBytes}
     * @param bufferSizeBytes size of one buffer in bytes
     * @param maxCachedBytesBeforeFlush forwarded unchanged to the {@link DiskCacheManager}
     *     (presumably its flush threshold; confirm against that class)
     * @param dataFilePath path of the data file used for readability and disk-space checks
     * @param minReservedDiskSpaceFraction fraction of total disk space to keep free
     * @param isBroadcastOnly when {@code true} the {@link DiskCacheManager} is created for a single
     *     subpartition instead of {@code numSubpartitions}
     * @param partitionFileWriter writer handed to the {@link DiskCacheManager}
     * @param partitionFileReader reader handed to the {@link DiskIOScheduler}
     * @param memoryManager memory manager used to transfer buffer ownership to this agent
     * @param nettyService service this agent registers itself with as a producer
     * @param resourceRegistry registry that receives this agent's release hook
     * @param bufferPool read buffer pool handed to the {@link DiskIOScheduler}
     * @param ioExecutor executor handed to the {@link DiskIOScheduler}
     * @param maxRequestedBuffers forwarded unchanged to the {@link DiskIOScheduler} (presumably a
     *     cap on requested read buffers; confirm against that class)
     * @param bufferRequestTimeout forwarded unchanged to the {@link DiskIOScheduler}
     * @throws IllegalArgumentException if {@code numBytesPerSegment < bufferSizeBytes}
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public DiskTierProducerAgent(TieredStoragePartitionId partitionId, int numSubpartitions, int numBytesPerSegment, int bufferSizeBytes, int maxCachedBytesBeforeFlush, Path dataFilePath, float minReservedDiskSpaceFraction, boolean isBroadcastOnly, PartitionFileWriter partitionFileWriter, PartitionFileReader partitionFileReader, TieredStorageMemoryManager memoryManager, TieredStorageNettyService nettyService, TieredStorageResourceRegistry resourceRegistry, BatchShuffleReadBufferPool bufferPool, ScheduledExecutorService ioExecutor, int maxRequestedBuffers, Duration bufferRequestTimeout) {
        Preconditions.checkArgument(numBytesPerSegment >= bufferSizeBytes, "One segment should contain at least one buffer.");
        this.partitionId = partitionId;
        this.numBuffersPerSegment = numBytesPerSegment / bufferSizeBytes;
        this.bufferSizeBytes = bufferSizeBytes;
        this.dataFilePath = dataFilePath;
        this.minReservedDiskSpaceFraction = minReservedDiskSpaceFraction;
        this.memoryManager = memoryManager;
        this.firstBufferIndexInSegment = new ArrayList<>();
        this.currentSubpartitionWriteBuffers = new int[numSubpartitions];
        for (int subpartition = 0; subpartition < numSubpartitions; subpartition++) {
            this.firstBufferIndexInSegment.add(new ConcurrentHashMap<>());
        }
        this.diskCacheManager = new DiskCacheManager(partitionId, isBroadcastOnly ? 1 : numSubpartitions, maxCachedBytesBeforeFlush, memoryManager, partitionFileWriter);
        this.diskIOScheduler = new DiskIOScheduler(partitionId, bufferPool, ioExecutor, maxRequestedBuffers, bufferRequestTimeout, this::firstBufferIndexToSegmentId, partitionFileReader);
        // Registration happens last so that all fields are assigned before "this" is published.
        nettyService.registerProducer(partitionId, this);
        resourceRegistry.registerResource(partitionId, this::releaseResources);
    }

    /**
     * Starts a new segment for the subpartition if enough disk space would remain afterwards.
     *
     * <p>The segment is started only when the usable space, minus the bytes of {@code
     * max(numBuffersPerSegment, minNumBuffers)} buffers, is strictly greater than the reserved
     * share of the total space.
     *
     * @param subpartitionId subpartition the segment belongs to
     * @param segmentId id of the segment to start
     * @param minNumBuffers lower bound on the number of buffers to account for
     * @return {@code true} if the segment was started, {@code false} otherwise
     */
    @Override // org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent
    public boolean tryStartNewSegment(TieredStorageSubpartitionId subpartitionId, int segmentId, int minNumBuffers) {
        File dataFile = this.dataFilePath.toFile();
        long requiredBytes = (long) Math.max(this.numBuffersPerSegment, minNumBuffers) * this.bufferSizeBytes;
        // long * float is evaluated in float arithmetic, exactly as in the original expression.
        long reservedBytes = (long) (dataFile.getTotalSpace() * this.minReservedDiskSpaceFraction);
        boolean canStartNewSegment = dataFile.getUsableSpace() - requiredBytes > reservedBytes;
        if (canStartNewSegment) {
            int subpartitionIndex = subpartitionId.getSubpartitionId();
            // Remember which segment begins at the cache manager's current buffer index.
            this.firstBufferIndexInSegment.get(subpartitionIndex).put(this.diskCacheManager.getBufferIndex(subpartitionIndex), segmentId);
            this.diskCacheManager.startSegment(subpartitionIndex, segmentId);
        }
        return canStartNewSegment;
    }

    /**
     * Tries to append a buffer to the current segment of the subpartition.
     *
     * <p>If the segment already holds buffers and this buffer together with the remaining
     * consecutive ones would exceed {@code numBuffersPerSegment}, an end-of-segment event is
     * emitted, the counter is reset and the buffer is rejected.
     *
     * @param subpartitionId subpartition to write to
     * @param finishedBuffer buffer or event to append
     * @param bufferOwner current owner, passed to the memory manager when ownership is transferred
     * @param numRemainingConsecutiveBuffers number of buffers that must follow in the same segment
     * @return {@code true} if the buffer was appended, {@code false} if the segment was ended
     */
    @Override // org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent
    public boolean tryWrite(TieredStorageSubpartitionId subpartitionId, Buffer finishedBuffer, Object bufferOwner, int numRemainingConsecutiveBuffers) {
        int subpartitionIndex = subpartitionId.getSubpartitionId();
        int writtenBuffers = this.currentSubpartitionWriteBuffers[subpartitionIndex];
        if (writtenBuffers != 0 && writtenBuffers + 1 + numRemainingConsecutiveBuffers > this.numBuffersPerSegment) {
            emitEndOfSegmentEvent(subpartitionIndex);
            this.currentSubpartitionWriteBuffers[subpartitionIndex] = 0;
            return false;
        }
        // Only data buffers change owner; events are appended without an ownership transfer.
        if (finishedBuffer.isBuffer()) {
            this.memoryManager.transferBufferOwnership(bufferOwner, this, finishedBuffer);
        }
        this.currentSubpartitionWriteBuffers[subpartitionIndex]++;
        emitBuffer(finishedBuffer, subpartitionIndex, numRemainingConsecutiveBuffers == 0);
        return true;
    }

    /**
     * Hands a newly established consumer connection to the {@link DiskIOScheduler}.
     *
     * @param subpartitionId subpartition the connection consumes
     * @param nettyConnectionWriter writer of the established connection
     * @throws RuntimeException wrapping a {@link PartitionNotFoundException} if the data file is
     *     not readable
     */
    @Override // org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyServiceProducer
    public void connectionEstablished(TieredStorageSubpartitionId subpartitionId, NettyConnectionWriter nettyConnectionWriter) {
        if (!Files.isReadable(this.dataFilePath)) {
            throw new RuntimeException(new PartitionNotFoundException(TieredStorageIdMappingUtils.convertId(this.partitionId)));
        }
        this.diskIOScheduler.connectionEstablished(subpartitionId, nettyConnectionWriter);
    }

    /**
     * Notifies the {@link DiskIOScheduler} that a consumer connection was broken.
     *
     * @param connectionId id of the broken connection
     */
    @Override // org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyServiceProducer
    public void connectionBroken(NettyConnectionId connectionId) {
        this.diskIOScheduler.connectionBroken(connectionId);
    }

    /** Closes the {@link DiskCacheManager}; the {@link DiskIOScheduler} is not touched here. */
    @Override // org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent, java.lang.AutoCloseable
    public void close() {
        this.diskCacheManager.close();
    }

    /** Appends a serialized {@link EndOfSegmentEvent} to the given subpartition. */
    private void emitEndOfSegmentEvent(int subpartitionIndex) {
        try {
            this.diskCacheManager.appendEndOfSegmentEvent(EventSerializer.toSerializedEvent(EndOfSegmentEvent.INSTANCE), subpartitionIndex);
        } catch (IOException e) {
            ExceptionUtils.rethrow(e, "Failed to emit end of segment event.");
        }
    }

    /** Appends the buffer to the given subpartition, forwarding the {@code flush} flag as is. */
    private void emitBuffer(Buffer buffer, int subpartitionIndex, boolean flush) {
        this.diskCacheManager.append(buffer, subpartitionIndex, flush);
    }

    /**
     * Release hook registered with the resource registry: clears the segment lookup maps and
     * releases the cache manager and the IO scheduler. Calls after the first completed one return
     * immediately.
     */
    private void releaseResources() {
        if (this.isReleased) {
            return;
        }
        this.firstBufferIndexInSegment.forEach(Map::clear);
        this.diskCacheManager.release();
        this.diskIOScheduler.release();
        this.isReleased = true;
    }

    /**
     * Looks up the id of the segment that starts at the given buffer index.
     *
     * @return the segment id, or {@code null} if the subpartition id is beyond the tracked
     *     subpartitions or no segment starts at that buffer index
     */
    private Integer firstBufferIndexToSegmentId(int subpartitionId, int bufferIndex) {
        if (this.firstBufferIndexInSegment.size() > subpartitionId) {
            return this.firstBufferIndexInSegment.get(subpartitionId).get(bufferIndex);
        }
        return null;
    }
}
