/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.core.transaction.util;

import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.transaction.log.CoreTransactionLogger;
import java.util.Objects;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.util.annotation.Nullable;

@Stability.Internal
public class MonoBridge<T> {
    private boolean done;
    private final Sinks.One<T> actual = Sinks.one();
    private final Mono<T> external = this.actual.asMono();

    public MonoBridge(Mono<T> feedFrom, String dbg, Object syncer, @Nullable CoreTransactionLogger logger) {
        Objects.requireNonNull(feedFrom);
        Objects.requireNonNull(dbg);
        Objects.requireNonNull(syncer);
        feedFrom.onErrorResume(err -> {
            Object object = syncer;
            synchronized (object) {
                if (!this.done) {
                    if (logger != null) {
                        logger.info("", "MB: [{}] propagating err {}", dbg, err.toString());
                    }
                    this.actual.tryEmitError((Throwable)err).orThrow();
                } else if (logger != null) {
                    logger.info("", "MB: [{}] skipping err propagating as done", dbg);
                }
                return Mono.empty();
            }
        }).subscribe(next -> {
            if (!this.done) {
                if (logger != null) {
                    logger.info("", "MB: [{}] propagating next", dbg);
                }
                this.actual.tryEmitValue(next).orThrow();
            } else if (logger != null) {
                logger.info("", "MB: [{}] skipping next propagating as done", dbg);
            }
        }, err -> {
            throw new IllegalStateException("Should not reach MonoBridge error producer");
        }, () -> {
            if (!this.done) {
                if (logger != null) {
                    logger.info("", "MB: [{}] propagating complete", dbg);
                }
                this.actual.tryEmitEmpty();
            } else if (logger != null) {
                logger.info("", "MB: [{}] skipping complete propagating as done", dbg);
            }
        });
        this.external.doOnCancel(() -> {
            if (logger != null) {
                logger.info("", "MB: [{}] is cancelled", dbg);
            }
            this.done = true;
        }).doOnTerminate(() -> {
            if (logger != null) {
                logger.info("", "MB: [{}] is errored or complete", dbg);
            }
            this.done = true;
        });
    }

    public Mono<T> external() {
        return this.external;
    }
}

