/*
 * Decompiled with CFR 0.152.
 */
package com.azure.storage.blob.changefeed;

import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.changefeed.SegmentFactory;
import com.azure.storage.blob.changefeed.implementation.models.BlobChangefeedEventWrapper;
import com.azure.storage.blob.changefeed.implementation.models.ChangefeedCursor;
import com.azure.storage.blob.changefeed.implementation.util.DownloadUtils;
import com.azure.storage.blob.changefeed.implementation.util.TimeUtils;
import com.azure.storage.blob.models.BlobItem;
import com.azure.storage.blob.models.ListBlobsOptions;
import java.net.MalformedURLException;
import java.net.URL;
import java.time.OffsetDateTime;
import java.util.Objects;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuples;

class Changefeed {
    private static final ClientLogger LOGGER = new ClientLogger(Changefeed.class);
    private static final String SEGMENT_PREFIX = "idx/segments/";
    private static final String METADATA_SEGMENT_PATH = "meta/segments.json";
    private final BlobContainerAsyncClient client;
    private final OffsetDateTime startTime;
    private final OffsetDateTime endTime;
    private final ChangefeedCursor changefeedCursor;
    private final ChangefeedCursor userCursor;
    private final SegmentFactory segmentFactory;

    Changefeed(BlobContainerAsyncClient client, OffsetDateTime startTime, OffsetDateTime endTime, ChangefeedCursor userCursor, SegmentFactory segmentFactory) {
        this.client = client;
        this.startTime = TimeUtils.roundDownToNearestHour(startTime);
        this.endTime = TimeUtils.roundUpToNearestHour(endTime);
        this.userCursor = userCursor;
        this.segmentFactory = segmentFactory;
        String urlHost = null;
        try {
            urlHost = new URL(client.getBlobContainerUrl()).getHost();
        }
        catch (MalformedURLException e) {
            throw LOGGER.logExceptionAsError(new RuntimeException(e));
        }
        this.changefeedCursor = new ChangefeedCursor(urlHost, this.endTime);
        if (userCursor != null) {
            if (userCursor.getCursorVersion() != 1) {
                throw LOGGER.logExceptionAsError(new IllegalArgumentException("Unsupported cursor version."));
            }
            if (!Objects.equals(urlHost, userCursor.getUrlHost())) {
                throw LOGGER.logExceptionAsError(new IllegalArgumentException("Cursor URL host does not match container URL host."));
            }
        }
    }

    Flux<BlobChangefeedEventWrapper> getEvents() {
        return this.validateChangefeed().then(this.populateLastConsumable()).flatMapMany(safeEndTime -> this.listYears((OffsetDateTime)safeEndTime).map(str -> Tuples.of(safeEndTime, str))).concatMap(tuple2 -> {
            OffsetDateTime safeEndTime = (OffsetDateTime)tuple2.getT1();
            String year = (String)tuple2.getT2();
            return this.listSegmentsForYear(safeEndTime, year);
        }).concatMap(this::getEventsForSegment);
    }

    private Mono<Boolean> validateChangefeed() {
        return this.client.exists().flatMap(exists -> {
            if (exists == null || !exists.booleanValue()) {
                return FluxUtil.monoError(LOGGER, new RuntimeException("Changefeed has not been enabled for this account."));
            }
            return Mono.just(true);
        });
    }

    private Mono<OffsetDateTime> populateLastConsumable() {
        return DownloadUtils.downloadToByteArray(this.client, METADATA_SEGMENT_PATH).flatMap(DownloadUtils::parseJson).flatMap(jsonNode -> {
            OffsetDateTime lastConsumableTime = OffsetDateTime.parse(jsonNode.get("lastConsumable").asText());
            OffsetDateTime safeEndTime = this.endTime;
            if (lastConsumableTime.isBefore(this.endTime)) {
                safeEndTime = lastConsumableTime.plusHours(1L);
            }
            return Mono.just(safeEndTime);
        });
    }

    private Flux<String> listYears(OffsetDateTime safeEndTime) {
        return this.client.listBlobsByHierarchy(SEGMENT_PREFIX).map(BlobItem::getName).filter(yearPath -> TimeUtils.validYear(yearPath, this.startTime, safeEndTime));
    }

    private Flux<String> listSegmentsForYear(OffsetDateTime safeEndTime, String year) {
        return this.client.listBlobs(new ListBlobsOptions().setPrefix(year)).map(BlobItem::getName).filter(segmentPath -> TimeUtils.validSegment(segmentPath, this.startTime, safeEndTime));
    }

    private Flux<BlobChangefeedEventWrapper> getEventsForSegment(String segment) {
        OffsetDateTime segmentTime = TimeUtils.convertPathToTime(segment);
        if (this.userCursor != null && segmentTime.isEqual(this.startTime)) {
            return this.segmentFactory.getSegment(segment, this.changefeedCursor.toSegmentCursor(segment, this.userCursor.getCurrentSegmentCursor()), this.userCursor.getCurrentSegmentCursor()).getEvents();
        }
        return this.segmentFactory.getSegment(segment, this.changefeedCursor.toSegmentCursor(segment, null), null).getEvents();
    }
}

