package com.azure.storage.blob.changefeed;

import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.changefeed.implementation.models.BlobChangefeedEventWrapper;
import com.azure.storage.blob.changefeed.implementation.models.ChangefeedCursor;
import com.azure.storage.blob.changefeed.implementation.util.DownloadUtils;
import com.azure.storage.blob.changefeed.implementation.util.TimeUtils;
import com.azure.storage.blob.models.ListBlobsOptions;
import java.net.MalformedURLException;
import java.net.URL;
import java.time.OffsetDateTime;
import java.util.Objects;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuples;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/azure/storage/blob/changefeed/Changefeed.class */
/**
 * Reads change feed events out of a blob container, restricted to a time window and optionally
 * resumed from a caller-supplied cursor.
 *
 * <p>The start and end of the window are passed through
 * {@link TimeUtils#roundDownToNearestHour} and {@link TimeUtils#roundUpToNearestHour} respectively
 * before being stored. Events are produced lazily as a {@link Flux}; nothing is downloaded until
 * the result of {@link #getEvents()} is subscribed to.
 *
 * <p>Decompiler note: the class, its constructor and {@link #getEvents()} were package-private in
 * the original class file; the access modifiers were widened by the decompiler.
 */
public class Changefeed {
    private static final ClientLogger LOGGER = new ClientLogger(Changefeed.class);

    /** Blob name prefix under which segments are listed. */
    private static final String SEGMENT_PREFIX = "idx/segments/";

    /** Path of the JSON blob that carries the {@code lastConsumable} timestamp. */
    private static final String METADATA_SEGMENT_PATH = "meta/segments.json";

    private final BlobContainerAsyncClient client;
    private final OffsetDateTime startTime;
    private final OffsetDateTime endTime;

    /** Cursor owned by this instance, built from the container URL host and the end time. */
    private final ChangefeedCursor changefeedCursor;

    /** Cursor handed in by the caller; {@code null} when no cursor was supplied. */
    private final ChangefeedCursor userCursor;

    private final SegmentFactory segmentFactory;

    /**
     * Creates a change feed reader.
     *
     * @param blobContainerAsyncClient client for the container that holds the change feed
     * @param offsetDateTime start of the window; stored after rounding down to the nearest hour
     * @param offsetDateTime2 end of the window; stored after rounding up to the nearest hour
     * @param changefeedCursor cursor supplied by the caller, or {@code null} for none
     * @param segmentFactory factory used to build a segment reader for each segment path
     * @throws IllegalArgumentException if a cursor is supplied whose version is not {@code 1} or
     *     whose URL host differs from the container URL host
     * @throws RuntimeException if the container URL cannot be parsed as a URL
     */
    public Changefeed(BlobContainerAsyncClient blobContainerAsyncClient, OffsetDateTime offsetDateTime, OffsetDateTime offsetDateTime2, ChangefeedCursor changefeedCursor, SegmentFactory segmentFactory) {
        this.client = blobContainerAsyncClient;
        this.startTime = TimeUtils.roundDownToNearestHour(offsetDateTime);
        this.endTime = TimeUtils.roundUpToNearestHour(offsetDateTime2);
        this.userCursor = changefeedCursor;
        this.segmentFactory = segmentFactory;

        final String containerHost = resolveContainerHost(blobContainerAsyncClient);
        this.changefeedCursor = new ChangefeedCursor(containerHost, this.endTime);

        // The internal cursor is created first; only then is the caller's cursor checked.
        if (changefeedCursor != null) {
            requireCompatibleCursor(changefeedCursor, containerHost);
        }
    }

    /**
     * Extracts the host portion of the container URL.
     *
     * @param containerClient client whose container URL is parsed
     * @return the host of the container URL
     * @throws RuntimeException wrapping the {@link MalformedURLException} if the URL is invalid
     */
    private static String resolveContainerHost(BlobContainerAsyncClient containerClient) {
        try {
            return new URL(containerClient.getBlobContainerUrl()).getHost();
        } catch (MalformedURLException e) {
            throw LOGGER.logExceptionAsError(new RuntimeException(e));
        }
    }

    /**
     * Rejects a caller-supplied cursor that this reader cannot resume from.
     *
     * @param suppliedCursor the non-null cursor handed in by the caller
     * @param containerHost host of the container URL the cursor must match
     * @throws IllegalArgumentException if the version is not {@code 1} or the hosts differ
     */
    private static void requireCompatibleCursor(ChangefeedCursor suppliedCursor, String containerHost) {
        if (suppliedCursor.getCursorVersion() != 1) {
            throw LOGGER.logExceptionAsError(new IllegalArgumentException("Unsupported cursor version."));
        }
        if (!Objects.equals(containerHost, suppliedCursor.getUrlHost())) {
            throw LOGGER.logExceptionAsError(new IllegalArgumentException("Cursor URL host does not match container URL host."));
        }
    }

    /**
     * Builds the event stream for the configured window.
     *
     * <p>The pipeline first checks that the container exists, then reads the effective end time
     * from the metadata blob, lists the year entries, lists the segments of each year, and finally
     * reads the events of each segment. Years and segments are chained with {@code concatMap}.
     *
     * @return a {@link Flux} of wrapped change feed events
     */
    public Flux<BlobChangefeedEventWrapper> getEvents() {
        return validateChangefeed()
            .then(populateLastConsumable())
            // Carry the effective end time alongside each year so the next stage can filter with it.
            .flatMapMany(effectiveEnd -> listYears(effectiveEnd).map(yearPath -> Tuples.of(effectiveEnd, yearPath)))
            .concatMap(endAndYear -> listSegmentsForYear(endAndYear.getT1(), endAndYear.getT2()))
            .concatMap(this::getEventsForSegment);
    }

    /**
     * Checks that the container backing the change feed exists.
     *
     * @return a {@link Mono} emitting {@code true} when the container exists, or an error
     *     signalling that the change feed is not enabled when it does not
     */
    private Mono<Boolean> validateChangefeed() {
        return this.client.exists().flatMap(exists -> {
            if (exists != null && exists.booleanValue()) {
                return Mono.just(true);
            }
            return FluxUtil.monoError(LOGGER, new RuntimeException("Changefeed has not been enabled for this account."));
        });
    }

    /**
     * Downloads the metadata blob and derives the end time to use when listing.
     *
     * <p>If the {@code lastConsumable} timestamp is before the configured end time, the result is
     * {@code lastConsumable} plus one hour; otherwise it is the configured end time.
     *
     * @return a {@link Mono} emitting the end time to use when filtering years and segments
     */
    private Mono<OffsetDateTime> populateLastConsumable() {
        return DownloadUtils.downloadToByteArray(this.client, METADATA_SEGMENT_PATH)
            .flatMap(DownloadUtils::parseJson)
            .flatMap(metadata -> {
                OffsetDateTime lastConsumable = OffsetDateTime.parse(metadata.get("lastConsumable").asText());
                // NOTE(review): the extra hour presumably compensates for an exclusive end bound - confirm.
                return Mono.just(lastConsumable.isBefore(this.endTime) ? lastConsumable.plusHours(1L) : this.endTime);
            });
    }

    /**
     * Lists the entries directly under the segment prefix and keeps those accepted by
     * {@link TimeUtils#validYear} for the window.
     *
     * @param effectiveEnd end time to filter against
     * @return a {@link Flux} of the accepted entry names
     */
    private Flux<String> listYears(OffsetDateTime effectiveEnd) {
        return this.client.listBlobsByHierarchy(SEGMENT_PREFIX)
            .map(entry -> entry.getName())
            .filter(yearPath -> TimeUtils.validYear(yearPath, this.startTime, effectiveEnd));
    }

    /**
     * Lists the blobs under one year prefix and keeps those accepted by
     * {@link TimeUtils#validSegment} for the window.
     *
     * @param effectiveEnd end time to filter against
     * @param yearPath prefix of the year whose segments are listed
     * @return a {@link Flux} of the accepted segment paths
     */
    private Flux<String> listSegmentsForYear(OffsetDateTime effectiveEnd, String yearPath) {
        ListBlobsOptions underYear = new ListBlobsOptions().setPrefix(yearPath);
        return this.client.listBlobs(underYear)
            .map(entry -> entry.getName())
            .filter(segmentPath -> TimeUtils.validSegment(segmentPath, this.startTime, effectiveEnd));
    }

    /**
     * Reads the events of a single segment.
     *
     * <p>The caller's segment cursor is forwarded only when a user cursor exists and the segment's
     * time equals the start time; every other segment is read with no cursor.
     *
     * @param segmentPath path of the segment to read
     * @return a {@link Flux} of the segment's events
     */
    private Flux<BlobChangefeedEventWrapper> getEventsForSegment(String segmentPath) {
        // The path is only converted to a time when there is a user cursor to apply.
        if (this.userCursor != null && TimeUtils.convertPathToTime(segmentPath).isEqual(this.startTime)) {
            return this.segmentFactory
                .getSegment(segmentPath, this.changefeedCursor.toSegmentCursor(segmentPath, this.userCursor.getCurrentSegmentCursor()), this.userCursor.getCurrentSegmentCursor())
                .getEvents();
        }
        return this.segmentFactory
            .getSegment(segmentPath, this.changefeedCursor.toSegmentCursor(segmentPath, null), null)
            .getEvents();
    }
}
