package com.couchbase.client.core.transaction.cleanup;

import com.couchbase.client.core.Core;
import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.cnc.Event;
import com.couchbase.client.core.cnc.RequestTracer;
import com.couchbase.client.core.cnc.TracingIdentifiers;
import com.couchbase.client.core.cnc.events.transaction.TransactionCleanupAttemptEvent;
import com.couchbase.client.core.io.CollectionIdentifier;
import com.couchbase.client.core.msg.kv.CodecFlags;
import com.couchbase.client.core.msg.kv.SubdocCommandType;
import com.couchbase.client.core.msg.kv.SubdocGetResponse;
import com.couchbase.client.core.msg.kv.SubdocMutateRequest;
import com.couchbase.client.core.transaction.CoreTransactionGetResult;
import com.couchbase.client.core.transaction.components.ActiveTransactionRecordEntry;
import com.couchbase.client.core.transaction.components.ActiveTransactionRecordUtil;
import com.couchbase.client.core.transaction.components.DocRecord;
import com.couchbase.client.core.transaction.components.DocumentGetter;
import com.couchbase.client.core.transaction.components.DurabilityLevelUtil;
import com.couchbase.client.core.transaction.error.internal.ErrorClass;
import com.couchbase.client.core.transaction.forwards.CoreTransactionsSupportedExtensions;
import com.couchbase.client.core.transaction.forwards.ForwardCompatibility;
import com.couchbase.client.core.transaction.forwards.ForwardCompatibilityStage;
import com.couchbase.client.core.transaction.log.CoreTransactionLogger;
import com.couchbase.client.core.transaction.support.AttemptState;
import com.couchbase.client.core.transaction.support.OptionsUtil;
import com.couchbase.client.core.transaction.support.SpanWrapper;
import com.couchbase.client.core.transaction.support.SpanWrapperUtil;
import com.couchbase.client.core.transaction.support.TransactionFields;
import com.couchbase.client.core.transaction.util.DebugUtil;
import com.couchbase.client.core.transaction.util.MeteringUnits;
import com.couchbase.client.core.transaction.util.TransactionKVHandler;
import com.couchbase.client.core.transaction.util.TriFunction;
import com.couchbase.client.core.util.Bytes;
import com.couchbase.client.core.util.CbPreconditions;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.annotation.Nullable;
import reactor.util.function.Tuple2;

/**
 * Cleans up what a transaction attempt left behind: the staged changes on the documents it touched
 * and the attempt's entry in its Active Transaction Record (ATR) document.
 *
 * <p>The entry point is {@link #performCleanup(CleanupRequest, boolean, SpanWrapper)}. It never emits an
 * error for a failed cleanup; instead it publishes (and emits) a {@link TransactionCleanupAttemptEvent}
 * flagged as unsuccessful.
 */
@Stability.Internal
public class TransactionsCleaner {
    /**
     * Age of an ATR entry, in minutes, from which a failed cleanup is published at WARN severity
     * (with an extra diagnostic message) instead of DEBUG.
     */
    private static final int BEGIN_LOGGING_FAILED_CLEANUPS_AT_WARN_AFTER_X_MINUTES = 120;

    private final Core core;
    private final CleanerHooks hooks;
    private final CoreTransactionsSupportedExtensions supported;

    /**
     * Creates a cleaner.
     *
     * @param core the core used for all KV operations, configuration lookups and event publishing
     * @param cleanerHooks hooks invoked before each cleanup step
     * @param coreTransactionsSupportedExtensions passed to the forward-compatibility check
     * @throws NullPointerException if any argument is {@code null}
     */
    public TransactionsCleaner(Core core, CleanerHooks cleanerHooks, CoreTransactionsSupportedExtensions coreTransactionsSupportedExtensions) {
        this.core = Objects.requireNonNull(core);
        this.hooks = Objects.requireNonNull(cleanerHooks);
        this.supported = Objects.requireNonNull(coreTransactionsSupportedExtensions);
    }

    /** Timeout applied to every mutating KV operation issued by the cleaner. */
    private Duration kvDurableTimeout() {
        return core.context().environment().timeoutConfig().kvDurableTimeout();
    }

    /** Timeout applied when fetching a document prior to cleaning it. */
    private Duration kvNonMutatingTimeout() {
        return core.context().environment().timeoutConfig().kvTimeout();
    }

    /**
     * Cleans up the documents recorded against an attempt, according to the attempt's state.
     *
     * <ul>
     *   <li>{@code COMMITTED}: staged inserts are committed, then staged replaces are committed, then
     *       documents staged for removal are removed.</li>
     *   <li>{@code ABORTED}: staged inserts are removed, then the transaction links are stripped from
     *       staged replaces and staged removes.</li>
     *   <li>Any other state: nothing is done to the documents; a DEBUG line is logged.</li>
     * </ul>
     *
     * @param coreTransactionLogger logger collecting the diagnostics of this cleanup
     * @param cleanupRequest describes the attempt being cleaned up
     * @param spanWrapper tracing span handed to every KV operation
     * @return a Mono completing once all documents have been processed
     */
    Mono<Void> cleanupDocs(CoreTransactionLogger coreTransactionLogger, CleanupRequest cleanupRequest, SpanWrapper spanWrapper) {
        String attemptId = cleanupRequest.attemptId();
        switch (cleanupRequest.state()) {
            case COMMITTED:
                return commitDocs(coreTransactionLogger, attemptId, cleanupRequest.stagedInserts(), cleanupRequest, spanWrapper)
                        .then(commitDocs(coreTransactionLogger, attemptId, cleanupRequest.stagedReplaces(), cleanupRequest, spanWrapper))
                        .then(removeDocsStagedForRemoval(coreTransactionLogger, attemptId, cleanupRequest.stagedRemoves(), cleanupRequest, spanWrapper));

            case ABORTED:
                return removeDocs(coreTransactionLogger, attemptId, cleanupRequest.stagedInserts(), cleanupRequest, spanWrapper)
                        .then(removeTxnLinks(coreTransactionLogger, attemptId, cleanupRequest.stagedReplaces(), cleanupRequest, spanWrapper))
                        .then(removeTxnLinks(coreTransactionLogger, attemptId, cleanupRequest.stagedRemoves(), cleanupRequest, spanWrapper));

            case PENDING:
                coreTransactionLogger.logDefer(cleanupRequest.attemptId(), "No docs cleanup possible as txn in state {}, just removing", Event.Severity.DEBUG, cleanupRequest.state());
                return Mono.empty();

            case COMPLETED:
            case ROLLED_BACK:
            case NOT_STARTED:
            default:
                coreTransactionLogger.logDefer(cleanupRequest.attemptId(), "No docs cleanup to do as txn in state {}, just removing", Event.Severity.DEBUG, cleanupRequest.state());
                return Mono.empty();
        }
    }

    /**
     * Commits the staged content of each listed document: the staged content becomes the document body
     * and the transaction links are removed. A document that is currently a tombstone is written with an
     * insert; any other document is updated with a single sub-document mutation.
     *
     * <p>Documents are only touched when their current CRC32 matches the one recorded at staging time
     * (see {@link #doPerDocGotDoc}).
     */
    private Mono<Void> commitDocs(CoreTransactionLogger logger, String attemptId, List<DocRecord> docs, CleanupRequest request, SpanWrapper span) {
        return doPerDoc(logger, attemptId, docs, span, true, (collection, doc, lookupResponse) -> {
            CbPreconditions.check(doc.links() != null);
            CbPreconditions.check(doc.links().isDocumentInTransaction());
            CbPreconditions.check(doc.links().stagedContentJsonOrBinary().isPresent());
            byte[] stagedContent = doc.links().stagedContentJsonOrBinary().get();

            return hooks.beforeCommitDoc.apply(doc.id())
                    .then(Mono.defer(() -> {
                        boolean isTombstone = lookupResponse.isDeleted();
                        int userFlags = doc.links().stagedUserFlags().orElse(CodecFlags.JSON_COMMON_FLAGS);

                        if (isTombstone) {
                            return TransactionKVHandler.insert(core, collection, doc.id(), stagedContent, userFlags,
                                    kvDurableTimeout(), request.durabilityLevel(),
                                    OptionsUtil.createClientContext("Cleaner::commitDocsInsert"), span);
                        } else {
                            return TransactionKVHandler.mutateIn(core, collection, doc.id(), kvDurableTimeout(),
                                    false, false, false, isTombstone, false, doc.cas(), userFlags,
                                    request.durabilityLevel(), OptionsUtil.createClientContext("Cleaner::commitDocs"), span,
                                    Arrays.asList(
                                            new SubdocMutateRequest.Command(SubdocCommandType.DELETE, TransactionFields.TRANSACTION_INTERFACE_PREFIX_ONLY, null, false, true, false, 0),
                                            new SubdocMutateRequest.Command(SubdocCommandType.SET_DOC, "", stagedContent, false, false, false, 1)));
                        }
                    }))
                    .doOnSubscribe(subscription -> {
                        logger.logDefer(attemptId, "removing txn links and writing content to doc {}", Event.Severity.DEBUG, DebugUtil.docId(doc));
                    })
                    .then();
        });
    }

    /**
     * Removes the transaction links from each listed document, leaving the document body as it is.
     * No CRC32 comparison is requested for these documents.
     */
    private Mono<Void> removeTxnLinks(CoreTransactionLogger logger, String attemptId, List<DocRecord> docs, CleanupRequest request, SpanWrapper span) {
        return doPerDoc(logger, attemptId, docs, span, false, (collection, doc, lookupResponse) -> {
            return hooks.beforeRemoveLinks.apply(doc.id())
                    .then(TransactionKVHandler.mutateIn(core, collection, doc.id(), kvDurableTimeout(),
                            false, false, false, lookupResponse.isDeleted(), false, doc.cas(), doc.userFlags(),
                            request.durabilityLevel(), OptionsUtil.createClientContext("Cleaner::removeTxnLinks"), span,
                            Arrays.asList(
                                    new SubdocMutateRequest.Command(SubdocCommandType.DELETE, TransactionFields.TRANSACTION_INTERFACE_PREFIX_ONLY, Bytes.EMPTY_BYTE_ARRAY, false, true, false, 0))))
                    .doOnSubscribe(subscription -> {
                        logger.logDefer(attemptId, "removing txn links from doc {}", Event.Severity.DEBUG, DebugUtil.docId(doc));
                    })
                    .then();
        });
    }

    /**
     * Removes each listed document, provided its links still say it is being removed. A document without
     * that indication is skipped with a debug line. Documents are only touched when their current CRC32
     * matches the one recorded at staging time (see {@link #doPerDocGotDoc}).
     */
    private Mono<Void> removeDocsStagedForRemoval(CoreTransactionLogger logger, String attemptId, List<DocRecord> docs, CleanupRequest request, SpanWrapper span) {
        return doPerDoc(logger, attemptId, docs, span, true, (collection, doc, lookupResponse) -> {
            if (doc.links().isDocumentBeingRemoved()) {
                return hooks.beforeRemoveDocStagedForRemoval.apply(doc.id())
                        .then(TransactionKVHandler.remove(core, collection, doc.id(), kvDurableTimeout(), doc.cas(),
                                request.durabilityLevel(), OptionsUtil.createClientContext("Cleaner::removeDoc"), span))
                        .doOnSubscribe(subscription -> {
                            // Uses DebugUtil.docId like every other log line in this class.
                            logger.debug(attemptId, "removing doc {}", DebugUtil.docId(doc));
                        })
                        .then();
            }

            return Mono.create(sink -> {
                logger.debug(attemptId, "doc {} does not have expected remove indication, skipping", DebugUtil.docId(doc));
                sink.success();
            });
        });
    }

    /**
     * Removes each listed document. A document that is currently a tombstone only has its transaction
     * links deleted; any other document is removed outright. No CRC32 comparison is requested.
     */
    private Mono<Void> removeDocs(CoreTransactionLogger logger, String attemptId, List<DocRecord> docs, CleanupRequest request, SpanWrapper span) {
        return doPerDoc(logger, attemptId, docs, span, false, (collection, doc, lookupResponse) -> {
            return hooks.beforeRemoveDoc.apply(doc.id())
                    .then(Mono.defer(() -> {
                        if (lookupResponse.isDeleted()) {
                            // Labelled as removeDocs (previously mislabelled "Cleaner::commitDocs").
                            return TransactionKVHandler.mutateIn(core, collection, doc.id(), kvDurableTimeout(),
                                    false, false, false, true, false, doc.cas(), doc.userFlags(),
                                    request.durabilityLevel(), OptionsUtil.createClientContext("Cleaner::removeDocs"), span,
                                    Collections.singletonList(
                                            new SubdocMutateRequest.Command(SubdocCommandType.DELETE, TransactionFields.TRANSACTION_INTERFACE_PREFIX_ONLY, Bytes.EMPTY_BYTE_ARRAY, false, true, false, 0)));
                        } else {
                            return TransactionKVHandler.remove(core, collection, doc.id(), kvDurableTimeout(), doc.cas(),
                                    request.durabilityLevel(), OptionsUtil.createClientContext("Cleaner::removeDocs"), span);
                        }
                    }))
                    .doOnSubscribe(subscription -> {
                        logger.debug(attemptId, "removing doc {}", DebugUtil.docId(doc));
                    })
                    .then();
        });
    }

    /**
     * Runs {@code perDoc} against every document record, one at a time and in list order, on the
     * transactions cleanup scheduler.
     *
     * @param requireCrc32ToMatchStaging when {@code true}, a document whose CRC32 differs from the one
     *                                   recorded at staging time is skipped
     * @param perDoc the action to run for each document that passes the checks in {@link #doPerDocGotDoc}
     */
    private Mono<Void> doPerDoc(CoreTransactionLogger logger, String attemptId, List<DocRecord> docs, SpanWrapper span, boolean requireCrc32ToMatchStaging,
                                TriFunction<CollectionIdentifier, CoreTransactionGetResult, SubdocGetResponse, Mono<Void>> perDoc) {
        return Flux.fromIterable(docs)
                .publishOn(core.context().environment().transactionsSchedulers().schedulerCleanup())
                .concatMap(docRecord -> {
                    return hooks.beforeDocGet.apply(docRecord.id())
                            .then(doPerDocGotDoc(logger, attemptId, span, requireCrc32ToMatchStaging, perDoc, docRecord,
                                    new CollectionIdentifier(docRecord.bucketName(), Optional.of(docRecord.scopeName()), Optional.of(docRecord.collectionName())),
                                    new MeteringUnits.MeteringUnitsBuilder()));
                })
                .then();
    }

    /**
     * Fetches one document and, if it is still staged by this attempt, applies {@code perDoc} to it.
     *
     * <p>The document is skipped (the returned Mono completes empty) when:
     * <ul>
     *   <li>it cannot be fetched;</li>
     *   <li>it is no longer in a transaction;</li>
     *   <li>it is staged by a different attempt;</li>
     *   <li>{@code requireCrc32ToMatchStaging} is set, a staging CRC32 is recorded, and the document's
     *       current CRC32 differs from it.</li>
     * </ul>
     *
     * <p>Any error is logged and then propagated unchanged, so the surrounding cleanup attempt fails.
     */
    private Mono<Void> doPerDocGotDoc(CoreTransactionLogger logger, String attemptId, SpanWrapper span, boolean requireCrc32ToMatchStaging,
                                      TriFunction<CollectionIdentifier, CoreTransactionGetResult, SubdocGetResponse, Mono<Void>> perDoc,
                                      DocRecord docRecord, CollectionIdentifier collection, MeteringUnits.MeteringUnitsBuilder units) {
        return DocumentGetter.justGetDoc(core, collection, docRecord.id(), kvNonMutatingTimeout(), span, true, logger, units)
                .flatMap(maybeDoc -> {
                    if (!maybeDoc.isPresent()) {
                        logger.debug(attemptId, "could not get doc {}, skipping", DebugUtil.docId(collection, docRecord.id()));
                        return Mono.empty();
                    }

                    Tuple2<CoreTransactionGetResult, SubdocGetResponse> fetched = maybeDoc.get();
                    CoreTransactionGetResult doc = fetched.getT1();
                    SubdocGetResponse lookupResponse = fetched.getT2();

                    logger.debug(attemptId, "handling doc {} with cas {} and links {}, isTombstone={}{}",
                            DebugUtil.docId(doc), doc.cas(), doc.links(), lookupResponse.isDeleted(), DebugUtil.dbg(units.build()));

                    if (!doc.links().isDocumentInTransaction()) {
                        logger.debug(attemptId, "no staged content for doc {}, assuming it was committed and skipping", DebugUtil.docId(doc));
                        return Mono.empty();
                    }

                    if (!doc.links().stagedAttemptId().get().equals(attemptId)) {
                        logger.debug(attemptId, "for doc {}, staged version is for a different attempt {}, skipping",
                                DebugUtil.docId(doc), doc.links().stagedAttemptId().get());
                        return Mono.empty();
                    }

                    if (requireCrc32ToMatchStaging && doc.links().crc32OfStaging().isPresent()) {
                        String crc32AtStaging = doc.links().crc32OfStaging().get();
                        // NOTE(review): assumes the fetch always supplies the current CRC32; get() throws otherwise.
                        String crc32Now = doc.crc32OfGet().get();
                        logger.debug(attemptId, "checking whether document {} has changed since staging, crc32 then {} now {}",
                                DebugUtil.docId(doc), crc32AtStaging, crc32Now);
                        if (!crc32Now.equals(crc32AtStaging)) {
                            logger.warn(attemptId, "document {} has changed since staging, ignoring it to avoid data loss", DebugUtil.docId(doc));
                            return Mono.empty();
                        }
                    }

                    return perDoc.apply(collection, doc, lookupResponse);
                })
                .onErrorResume(err -> {
                    ErrorClass errorClass = ErrorClass.classify(err);
                    logger.debug(attemptId, "got exception while handling doc {}: {}", DebugUtil.docId(collection, docRecord.id()), DebugUtil.dbg(err));
                    // A CAS mismatch only earns an extra log line; every error is propagated as-is.
                    if (errorClass == ErrorClass.FAIL_CAS_MISMATCH) {
                        logger.debug(attemptId, "got CAS mismatch while cleaning up doc {}, failing this cleanup attempt (it will be retried)",
                                DebugUtil.docId(collection, docRecord.id()));
                    }
                    return Mono.error(err);
                });
    }

    /** The request tracer configured on the core environment. */
    private RequestTracer tracer() {
        return core.context().environment().requestTracer();
    }

    /**
     * Cleans up a single ATR entry by building a {@link CleanupRequest} from it and delegating to
     * {@link #performCleanup(CleanupRequest, boolean, SpanWrapper)} with no parent span.
     *
     * @param collectionIdentifier the collection holding the ATR document
     * @param str not used by this method
     * @param str2 not used by this method
     * @param activeTransactionRecordEntry the ATR entry to clean up
     * @param z forwarded unchanged as the {@code z} flag of {@code performCleanup}
     * @return the event describing the outcome of the cleanup
     */
    public Mono<TransactionCleanupAttemptEvent> cleanupATREntry(CollectionIdentifier collectionIdentifier, String str, String str2, ActiveTransactionRecordEntry activeTransactionRecordEntry, boolean z) {
        return performCleanup(CleanupRequest.fromAtrEntry(collectionIdentifier, activeTransactionRecordEntry), z, null);
    }

    /**
     * Performs a full cleanup of one attempt: forward-compatibility check, document cleanup, then removal
     * of the attempt's ATR entry.
     *
     * <p>On success a DEBUG {@link TransactionCleanupAttemptEvent} marked successful is published on the
     * event bus and emitted. On any failure the error is logged, and an event marked unsuccessful is
     * published and emitted instead of the error; its severity is WARN once the entry is at least
     * {@value #BEGIN_LOGGING_FAILED_CLEANUPS_AT_WARN_AFTER_X_MINUTES} minutes old, DEBUG otherwise.
     *
     * @param cleanupRequest describes the attempt to clean up
     * @param z recorded in the log line as {@code isRegular} and passed through to the published event
     * @param spanWrapper optional parent tracing span
     * @return a Mono emitting the published event
     */
    public Mono<TransactionCleanupAttemptEvent> performCleanup(CleanupRequest cleanupRequest, boolean z, @Nullable SpanWrapper spanWrapper) {
        final boolean isRegularCleanup = z;

        // The span is created when this method is called, not when the returned Mono is subscribed.
        final SpanWrapper span = SpanWrapperUtil.createOp(null, tracer(), cleanupRequest.atrCollection(), cleanupRequest.atrId(), TracingIdentifiers.TRANSACTION_CLEANUP, spanWrapper)
                .attribute(TracingIdentifiers.ATTR_TRANSACTION_ATTEMPT_ID, cleanupRequest.attemptId())
                .attribute(TracingIdentifiers.ATTR_TRANSACTION_AGE, Long.valueOf(cleanupRequest.ageMillis()))
                .attribute(TracingIdentifiers.ATTR_TRANSACTION_STATE, cleanupRequest.state());
        cleanupRequest.durabilityLevel().ifPresent(level -> {
            span.lowCardinalityAttribute(TracingIdentifiers.ATTR_DURABILITY, DurabilityLevelUtil.convertDurabilityLevel(level));
        });

        return Mono.defer(() -> {
            CollectionIdentifier atrCollection = cleanupRequest.atrCollection();
            String atrId = cleanupRequest.atrId();
            String attemptId = cleanupRequest.attemptId();
            CoreTransactionLogger logger = new CoreTransactionLogger(core.context().environment().eventBus(),
                    ActiveTransactionRecordUtil.getAtrDebug(atrCollection, atrId).toString());

            logger.logDefer(attemptId, "Cleaning up ATR entry (isRegular={}) {}", Event.Severity.DEBUG, isRegularCleanup, cleanupRequest);

            return ForwardCompatibility.check(core, ForwardCompatibilityStage.CLEANUP_ENTRY, cleanupRequest.forwardCompatibility(), logger, supported)
                    .then(cleanupDocs(logger, cleanupRequest, span))
                    .then(removeATREntry(cleanupRequest.state(), atrCollection, atrId, attemptId, logger, span, cleanupRequest))
                    .then(Mono.fromCallable(() -> {
                        TransactionCleanupAttemptEvent event = new TransactionCleanupAttemptEvent(Event.Severity.DEBUG, true, isRegularCleanup,
                                logger.logs(), attemptId, atrId, atrCollection, cleanupRequest, "");
                        core.context().environment().eventBus().publish(event);
                        return event;
                    }))
                    .onErrorResume(err -> {
                        long ageInMinutes = TimeUnit.MILLISECONDS.toMinutes(cleanupRequest.ageMillis());
                        logger.logDefer(attemptId, "error while attempting to cleanup ATR entry {}, entry is {} mins old, cleanup will retry later: {}",
                                Event.Severity.WARN, ActiveTransactionRecordUtil.getAtrDebug(atrCollection, atrId), ageInMinutes, DebugUtil.dbg(err));

                        Event.Severity severity = Event.Severity.DEBUG;
                        String additionalDebug = "";
                        if (ageInMinutes >= BEGIN_LOGGING_FAILED_CLEANUPS_AT_WARN_AFTER_X_MINUTES) {
                            severity = Event.Severity.WARN;
                            additionalDebug = "despite being " + ageInMinutes + " mins old which could indicate a serious error - please raise with support.  Diagnostics: ";
                        }

                        TransactionCleanupAttemptEvent event = new TransactionCleanupAttemptEvent(severity, false, isRegularCleanup,
                                logger.logs(), attemptId, atrId, atrCollection, cleanupRequest, additionalDebug);
                        core.context().environment().eventBus().publish(event);
                        return Mono.just(event);
                    })
                    // NOTE(review): on an error signal both callbacks below run, so the span is finished
                    // twice; confirm SpanWrapper tolerates that before changing it.
                    .doOnError(err -> {
                        span.finish(err);
                    })
                    .doOnTerminate(() -> {
                        span.finish();
                    });
        });
    }

    /**
     * Deletes the attempt's entry from the ATR document.
     *
     * <p>For a {@code PENDING} attempt the delete is preceded, in the same mutation, by a
     * {@code DICT_ADD} on a field of the entry; the error handling below treats a "path already exists"
     * failure as the entry having moved from PENDING to COMMITTED.
     *
     * <p>Error handling: "path not found" is logged and swallowed (the Mono completes empty); every other
     * error, including "path already exists", is logged and propagated.
     *
     * @param attemptState state of the attempt; only {@code PENDING} changes the commands sent
     * @param collectionIdentifier the collection holding the ATR document
     * @param str id of the ATR document
     * @param str2 id of the attempt whose entry is removed
     * @param coreTransactionLogger logger collecting the diagnostics of this cleanup
     * @param spanWrapper tracing span handed to the KV operation
     * @param cleanupRequest supplies the durability level for the mutation
     * @return a Mono emitting the mutation response, or completing empty if the entry was already gone
     */
    Mono<Object> removeATREntry(AttemptState attemptState, CollectionIdentifier collectionIdentifier, String str, String str2, CoreTransactionLogger coreTransactionLogger, SpanWrapper spanWrapper, CleanupRequest cleanupRequest) {
        final String atrId = str;
        final String attemptId = str2;
        final String attemptPath = "attempts." + attemptId;

        List<SubdocMutateRequest.Command> commands = new ArrayList<>();
        if (attemptState == AttemptState.PENDING) {
            // The value is the single ASCII byte '0' (48).
            commands.add(new SubdocMutateRequest.Command(SubdocCommandType.DICT_ADD,
                    attemptPath + "." + TransactionFields.ATR_FIELD_COMMIT_ONLY_IF_NOT_ABORTED,
                    new byte[] {'0'}, false, true, false, 0));
        }
        // The last argument is this command's position in the list: 0, or 1 when the DICT_ADD precedes it.
        commands.add(new SubdocMutateRequest.Command(SubdocCommandType.DELETE, attemptPath, Bytes.EMPTY_BYTE_ARRAY, false, true, false, commands.size()));

        return hooks.beforeAtrRemove.get()
                .then(TransactionKVHandler.mutateIn(core, collectionIdentifier, atrId, kvDurableTimeout(),
                        false, false, false, false, false, 0L, CodecFlags.BINARY_COMMON_FLAGS,
                        cleanupRequest.durabilityLevel(), OptionsUtil.createClientContext("Cleaner::removeATREntry"), spanWrapper, commands))
                .doOnNext(response -> {
                    coreTransactionLogger.debug(attemptId, "successfully removed ATR entry");
                })
                .onErrorResume(err -> {
                    ErrorClass errorClass = ErrorClass.classify(err);
                    coreTransactionLogger.debug(attemptId, "got exception while removing ATR entry {}: {}", atrId, DebugUtil.dbg(err));

                    if (errorClass == ErrorClass.FAIL_PATH_NOT_FOUND) {
                        coreTransactionLogger.logDefer(attemptId, "failed to remove {} as entry isn't there, likely due to concurrent cleanup",
                                Event.Severity.DEBUG, ActiveTransactionRecordUtil.getAtrDebug(collectionIdentifier, atrId));
                        return Mono.empty();
                    }

                    if (errorClass == ErrorClass.FAIL_PATH_ALREADY_EXISTS) {
                        coreTransactionLogger.logDefer(attemptId, "not removing {} as it has changed from PENDING to COMMITTED",
                                Event.Severity.DEBUG, ActiveTransactionRecordUtil.getAtrDebug(collectionIdentifier, atrId));
                    }
                    return Mono.error(err);
                })
                // Identity mapping; it only widens the element type to match the Mono<Object> return type.
                .map(response -> {
                    return response;
                });
    }

    /** Returns the hooks this cleaner invokes before each cleanup step. */
    public CleanerHooks hooks() {
        return hooks;
    }
}
