package com.azure.storage.blob.changefeed;

import com.azure.core.util.Context;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.paging.ContinuablePagedFlux;
import com.azure.storage.blob.changefeed.implementation.models.ChangefeedCursor;
import com.azure.storage.blob.changefeed.models.BlobChangefeedEvent;
import com.azure.storage.common.implementation.StorageImplUtils;
import java.time.OffsetDateTime;
import java.util.List;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/azure/storage/blob/changefeed/BlobChangefeedPagedFlux.class */
public final class BlobChangefeedPagedFlux extends ContinuablePagedFlux<String, BlobChangefeedEvent, BlobChangefeedPagedResponse> {
    private final Changefeed changefeed;
    private Context context;
    private static final ClientLogger LOGGER = new ClientLogger((Class<?>) BlobChangefeedPagedFlux.class);
    private static final Integer DEFAULT_PAGE_SIZE = 5000;

    /* JADX INFO: Access modifiers changed from: package-private */
    public BlobChangefeedPagedFlux(ChangefeedFactory changefeedFactory, OffsetDateTime offsetDateTime, OffsetDateTime offsetDateTime2) {
        StorageImplUtils.assertNotNull("changefeedFactory", changefeedFactory);
        this.changefeed = changefeedFactory.getChangefeed(offsetDateTime, offsetDateTime2);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public BlobChangefeedPagedFlux(ChangefeedFactory changefeedFactory, String str) {
        StorageImplUtils.assertNotNull("changefeedFactory", changefeedFactory);
        this.changefeed = changefeedFactory.getChangefeed(str);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public BlobChangefeedPagedFlux setSubscriberContext(Context context) {
        this.context = context;
        return this;
    }

    @Override // com.azure.core.util.paging.ContinuablePagedFlux
    public Flux<BlobChangefeedPagedResponse> byPage() {
        return byPage((String) null, DEFAULT_PAGE_SIZE.intValue());
    }

    @Override // com.azure.core.util.paging.ContinuablePagedFlux
    public Flux<BlobChangefeedPagedResponse> byPage(String str) {
        return byPage(str, DEFAULT_PAGE_SIZE.intValue());
    }

    @Override // com.azure.core.util.paging.ContinuablePagedFlux
    public Flux<BlobChangefeedPagedResponse> byPage(int i) {
        return byPage((String) null, i);
    }

    @Override // com.azure.core.util.paging.ContinuablePagedFlux
    public Flux<BlobChangefeedPagedResponse> byPage(String str, int i) {
        if (str != null) {
            return FluxUtil.pagedFluxError(LOGGER, new UnsupportedOperationException("continuationToken not supported. Use client.getEvents(String) to pass in a cursor."));
        }
        if (i <= 0) {
            return FluxUtil.pagedFluxError(LOGGER, new IllegalArgumentException("preferredPageSize > 0 required but provided: " + i));
        }
        return this.changefeed.getEvents().window(Integer.min(i, DEFAULT_PAGE_SIZE.intValue())).concatMap(flux -> {
            Flux cache = flux.cache();
            return Mono.zip(cache.map((v0) -> {
                return v0.getEvent();
            }).collectList(), cache.last().map((v0) -> {
                return v0.getCursor();
            }));
        }).map(tuple2 -> {
            return new BlobChangefeedPagedResponse((List) tuple2.getT1(), (ChangefeedCursor) tuple2.getT2());
        }).subscriberContext(FluxUtil.toReactorContext(this.context));
    }

    @Override // reactor.core.publisher.Flux, reactor.core.CorePublisher
    public void subscribe(CoreSubscriber<? super BlobChangefeedEvent> coreSubscriber) {
        this.changefeed.getEvents().map((v0) -> {
            return v0.getEvent();
        }).subscribe((CoreSubscriber) coreSubscriber);
    }
}
