/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.core.transaction.cleanup;

import com.couchbase.client.core.Core;
import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.cnc.RequestTracer;
import com.couchbase.client.core.deps.com.fasterxml.jackson.core.JsonProcessingException;
import com.couchbase.client.core.deps.com.fasterxml.jackson.databind.JsonNode;
import com.couchbase.client.core.error.CouchbaseException;
import com.couchbase.client.core.error.DecodingFailureException;
import com.couchbase.client.core.error.EncodingFailureException;
import com.couchbase.client.core.io.CollectionIdentifier;
import com.couchbase.client.core.json.Mapper;
import com.couchbase.client.core.logging.RedactableArgument;
import com.couchbase.client.core.msg.ResponseStatus;
import com.couchbase.client.core.msg.kv.SubdocCommandType;
import com.couchbase.client.core.msg.kv.SubdocGetRequest;
import com.couchbase.client.core.msg.kv.SubdocGetResponse;
import com.couchbase.client.core.msg.kv.SubdocMutateRequest;
import com.couchbase.client.core.retry.reactor.Retry;
import com.couchbase.client.core.transaction.CoreTransactionsReactive;
import com.couchbase.client.core.transaction.cleanup.AccessErrorException;
import com.couchbase.client.core.transaction.cleanup.ClientRecordDetails;
import com.couchbase.client.core.transaction.cleanup.CoreTransactionsCleanup;
import com.couchbase.client.core.transaction.components.ActiveTransactionRecord;
import com.couchbase.client.core.transaction.config.CoreTransactionsConfig;
import com.couchbase.client.core.transaction.error.internal.ErrorClass;
import com.couchbase.client.core.transaction.log.SimpleEventBusLogger;
import com.couchbase.client.core.transaction.support.OptionsUtil;
import com.couchbase.client.core.transaction.support.SpanWrapper;
import com.couchbase.client.core.transaction.support.SpanWrapperUtil;
import com.couchbase.client.core.transaction.util.DebugUtil;
import com.couchbase.client.core.transaction.util.TransactionKVHandler;
import com.couchbase.client.core.util.Bytes;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.annotation.Nullable;

@Stability.Internal
public class ClientRecord {
    private final Core core;
    private final SimpleEventBusLogger LOGGER;
    public static String CLIENT_RECORD_DOC_ID = "_txn:client-record";
    private static final String FIELD_HEARTBEAT = "heartbeat_ms";
    private static final String FIELD_EXPIRES = "expires_ms";
    private static final String FIELD_NUM_ATRS = "num_atrs";
    private static final String FIELD_HOST = "host";
    private static final String FIELD_IMPLEMENTATION = "implementation";
    private static final String FIELD_VERSION = "version";
    private static final String FIELD_PROCESS_ID = "process_id";
    public static final String FIELD_RECORDS = "records";
    public static final String FIELD_CLIENTS = "clients";
    public static final String FIELD_OVERRIDE = "override";
    public static final String FIELD_OVERRIDE_ENABLED = "enabled";
    public static final String FIELD_OVERRIDE_EXPIRES = "expires";
    private static final int SAFETY_MARGIN_EXPIRY_MILLIS = 20000;
    private static final Duration TIMEOUT = Duration.ofMillis(500L);
    private static final Duration BACKOFF_START = Duration.ofMillis(10L);
    private static final Duration BACKOFF_END = Duration.ofMillis(250L);

    public ClientRecord(Core core) {
        this.LOGGER = new SimpleEventBusLogger(core.context().environment().eventBus(), CoreTransactionsCleanup.CATEGORY_CLIENT_RECORD);
        this.core = Objects.requireNonNull(core);
    }

    public Flux<Void> removeClientFromClientRecord(String clientUuid, Set<CollectionIdentifier> cleanupSet) {
        return this.removeClientFromClientRecord(clientUuid, TIMEOUT, cleanupSet);
    }

    public Flux<Void> removeClientFromClientRecord(String clientUuid, Duration timeout, Set<CollectionIdentifier> collections) {
        return Flux.fromIterable(collections).subscribeOn(this.core.context().environment().transactionsSchedulers().schedulerCleanup()).doOnNext(v -> this.LOGGER.info("{} removing from client record on collection {}", clientUuid, RedactableArgument.redactUser(v))).concatMap(collection -> this.beforeRemoveClient(this).then(TransactionKVHandler.mutateIn(this.core, collection, CLIENT_RECORD_DOC_ID, this.mutatingTimeout(), false, false, false, false, false, 0L, Optional.empty(), OptionsUtil.createClientContext("Cleaner::removeClientFromCleanupSet"), null, Arrays.asList(new SubdocMutateRequest.Command(SubdocCommandType.DELETE, "records.clients." + clientUuid, Bytes.EMPTY_BYTE_ARRAY, false, true, false, 0)))).onErrorResume(err -> {
            switch (ErrorClass.classify(err)) {
                case FAIL_DOC_NOT_FOUND: {
                    this.LOGGER.info(String.format("%s/%s remove skipped as client record does not exist", RedactableArgument.redactUser(collection), clientUuid), new Object[0]);
                    return Mono.empty();
                }
                case FAIL_PATH_NOT_FOUND: {
                    this.LOGGER.info(String.format("%s/%s remove skipped as client record entry does not exist", RedactableArgument.redactUser(collection), clientUuid), new Object[0]);
                    return Mono.empty();
                }
            }
            this.LOGGER.info(String.format("%s/%s got error while removing client from client record: %s", RedactableArgument.redactUser(collection), clientUuid, DebugUtil.dbg(err)), new Object[0]);
            return Mono.error(err);
        }).retryWhen(Retry.any().exponentialBackoff(BACKOFF_START, BACKOFF_END).doOnRetry(v -> this.LOGGER.info(String.format("%s/%s retrying removing client from record on error %s", RedactableArgument.redactUser(collection), clientUuid, DebugUtil.dbg(v.exception())), new Object[0])).toReactorRetry()).timeout(timeout).doOnNext(v -> this.LOGGER.info(String.format("%s/%s removed from client record", RedactableArgument.redactUser(collection), clientUuid), new Object[0])).doOnError(err -> this.LOGGER.info(String.format("got error while removing client record '%s'", err), new Object[0])).then());
    }

    private Duration mutatingTimeout() {
        return this.core.context().environment().timeoutConfig().kvDurableTimeout();
    }

    private Duration nonMutatingTimeout() {
        return this.core.context().environment().timeoutConfig().kvTimeout();
    }

    public static ClientRecordDetails parseClientRecord(SubdocGetResponse clientRecord, String clientUuid) {
        try {
            JsonNode records = Mapper.reader().readValue(clientRecord.values()[0].value(), JsonNode.class);
            JsonNode hlcRaw = Mapper.reader().readValue(clientRecord.values()[1].value(), JsonNode.class);
            ActiveTransactionRecord.ParsedHLC parsedHLC = new ActiveTransactionRecord.ParsedHLC(hlcRaw);
            JsonNode clients = records.get(FIELD_CLIENTS);
            ArrayList<String> expiredClientIds = new ArrayList<String>();
            ArrayList<String> activeClientIds = new ArrayList<String>();
            Iterator<String> iterator = clients.fieldNames();
            while (iterator.hasNext()) {
                boolean out;
                int expiresMsecs;
                long heartbeatMillis;
                String otherClientId = iterator.next();
                JsonNode cl = clients.get(otherClientId);
                long casMillis = parsedHLC.nowInNanos() / 1000000L;
                long expiredPeriod = casMillis - (heartbeatMillis = ActiveTransactionRecord.parseMutationCAS(cl.get(FIELD_HEARTBEAT).textValue()));
                boolean hasExpired = expiredPeriod >= (long)(expiresMsecs = cl.get(FIELD_EXPIRES).intValue());
                boolean bl = out = hasExpired && !otherClientId.equals(clientUuid);
                if (out) {
                    expiredClientIds.add(otherClientId);
                    continue;
                }
                activeClientIds.add(otherClientId);
            }
            if (!activeClientIds.contains(clientUuid)) {
                activeClientIds.add(clientUuid);
            }
            List sortedActiveClientIds = activeClientIds.stream().sorted().collect(Collectors.toList());
            int indexOfThisClient = sortedActiveClientIds.indexOf(clientUuid);
            int numExpiredClients = expiredClientIds.size();
            int numActiveClients = sortedActiveClientIds.size();
            int numExistingClients = numExpiredClients + numActiveClients;
            boolean alreadyContainsClient = clients.has(clientUuid);
            boolean overrideEnabled = false;
            long overrideExpiresCas = 0L;
            JsonNode override = records.get(FIELD_OVERRIDE);
            if (override != null) {
                overrideEnabled = override.get(FIELD_OVERRIDE_ENABLED).asBoolean();
                overrideExpiresCas = override.get(FIELD_OVERRIDE_EXPIRES).asLong();
            }
            return new ClientRecordDetails(numActiveClients, indexOfThisClient, !alreadyContainsClient, expiredClientIds, numExistingClients, numExpiredClients, overrideEnabled, overrideExpiresCas, parsedHLC.nowInNanos());
        }
        catch (IOException e) {
            throw new DecodingFailureException(e);
        }
    }

    public Mono<SubdocGetResponse> getClientRecord(CollectionIdentifier collection, @Nullable SpanWrapper span) {
        return TransactionKVHandler.lookupIn(this.core, collection, CLIENT_RECORD_DOC_ID, this.nonMutatingTimeout(), false, OptionsUtil.createClientContext("ClientRecord::getClientRecord"), span, Arrays.asList(new SubdocGetRequest.Command(SubdocCommandType.GET, FIELD_RECORDS, true, 0), new SubdocGetRequest.Command(SubdocCommandType.GET, "$vbucket.HLC", true, 1)));
    }

    private RequestTracer tracer() {
        return this.core.context().environment().requestTracer();
    }

    public Mono<ClientRecordDetails> processClient(String clientUuid, CollectionIdentifier collection, CoreTransactionsConfig config, @Nullable SpanWrapper pspan) {
        return Mono.defer(() -> {
            SpanWrapper span = SpanWrapperUtil.createOp(null, this.tracer(), collection, CLIENT_RECORD_DOC_ID, "transaction_cleanup_client", pspan).attribute("db.couchbase.transactions.cleanup.client_id", clientUuid);
            String bp = collection.bucket() + "/" + collection.scope().orElse("-") + "/" + collection.collection().orElse("-") + "/" + clientUuid;
            return this.beforeGetRecord(this).then(this.getClientRecord(collection, span)).flatMap(clientRecord -> {
                ClientRecordDetails cr = ClientRecord.parseClientRecord(clientRecord, clientUuid);
                this.LOGGER.debug(String.format("%s found %d existing clients including this (%s active, %d expired), included this=%s, index of this=%d, override={enabled=%s,expires=%d,now=%d,active=%s}", bp, cr.numExistingClients(), cr.numActiveClients(), cr.numExpiredClients(), !cr.clientIsNew(), cr.indexOfThisClient(), cr.overrideEnabled(), cr.overrideExpires(), cr.casNow(), cr.overrideActive()), new Object[0]);
                if (cr.overrideActive()) {
                    return Mono.just(cr);
                }
                ArrayList<SubdocMutateRequest.Command> specs = new ArrayList<SubdocMutateRequest.Command>();
                String field = "records.clients." + clientUuid;
                String host = "unavailable";
                try {
                    host = InetAddress.getLocalHost().getHostAddress();
                }
                catch (Throwable throwable) {
                    // empty catch block
                }
                long pid = 0L;
                String name = ManagementFactory.getRuntimeMXBean().getName();
                try {
                    pid = Long.parseLong(name.split("@")[0]);
                }
                catch (Throwable err) {
                    this.LOGGER.debug(String.format("Discarding error %s while trying to parse PID %s", err.getMessage(), name), new Object[0]);
                }
                byte[] toWrite = Mapper.encodeAsBytes(Mapper.createObjectNode().put(FIELD_EXPIRES, config.cleanupConfig().cleanupWindow().toMillis() + 20000L).put(FIELD_NUM_ATRS, config.numAtrs()).put(FIELD_IMPLEMENTATION, "java").put(FIELD_VERSION, CoreTransactionsReactive.class.getPackage().getImplementationVersion()).put(FIELD_HOST, host).put(FIELD_PROCESS_ID, pid));
                specs.add(new SubdocMutateRequest.Command(SubdocCommandType.DICT_UPSERT, field, toWrite, true, true, false, 0));
                try {
                    specs.add(new SubdocMutateRequest.Command(SubdocCommandType.DICT_UPSERT, field + "." + FIELD_HEARTBEAT, Mapper.writer().writeValueAsBytes("${Mutation.CAS}"), false, true, true, 1));
                }
                catch (JsonProcessingException e) {
                    throw new EncodingFailureException(e);
                }
                cr.expiredClientIds().stream().limit(16 - specs.size() - 1).forEach(expiredClientId -> {
                    this.LOGGER.debug(String.format("%s removing expired client %s", bp, expiredClientId), new Object[0]);
                    specs.add(new SubdocMutateRequest.Command(SubdocCommandType.DELETE, "records.clients." + expiredClientId, null, false, true, false, specs.size()));
                });
                specs.add(new SubdocMutateRequest.Command(SubdocCommandType.SET_DOC, "", new byte[]{0}, false, false, false, specs.size()));
                return this.beforeUpdateRecord(this).then(TransactionKVHandler.mutateIn(this.core, collection, CLIENT_RECORD_DOC_ID, this.mutatingTimeout(), false, false, false, false, false, 0L, Optional.empty(), OptionsUtil.createClientContext("ClientRecord::processClient"), span, specs)).thenReturn(cr);
            }).onErrorResume(err -> {
                ErrorClass ec = ErrorClass.classify(err);
                this.LOGGER.debug(String.format("%s got error processing client record: %s", bp, DebugUtil.dbg(err)), new Object[0]);
                if (ec == ErrorClass.FAIL_DOC_NOT_FOUND) {
                    return this.createClientRecord(clientUuid, collection, span).then(this.processClient(clientUuid, collection, config, pspan));
                }
                if (err instanceof CouchbaseException && ((CouchbaseException)err).context() != null && ((CouchbaseException)err).context().responseStatus() == ResponseStatus.NO_ACCESS) {
                    return Mono.error(new AccessErrorException());
                }
                return Mono.error(err);
            }).doOnError(err -> span.finish((Throwable)err)).doOnTerminate(() -> span.finish());
        });
    }

    private Mono<Void> createClientRecord(String clientUuid, CollectionIdentifier collection, SpanWrapper pspan) {
        String bp = collection.bucket() + "/" + collection.scope().orElse("-") + "/" + collection.collection().orElse("-") + "/" + clientUuid;
        return this.beforeCreateRecord(this).then(TransactionKVHandler.mutateIn(this.core, collection, CLIENT_RECORD_DOC_ID, this.mutatingTimeout(), true, false, false, false, false, 0L, Optional.empty(), OptionsUtil.createClientContext("ClientRecord::createClientRecord"), pspan, Arrays.asList(new SubdocMutateRequest.Command(SubdocCommandType.DICT_ADD, "records.clients", "{}".getBytes(StandardCharsets.UTF_8), false, true, false, 0), new SubdocMutateRequest.Command(SubdocCommandType.SET_DOC, "", new byte[]{0}, false, false, false, 1)))).doOnSubscribe(v -> this.LOGGER.debug(String.format("%s found client record does not exist, creating and retrying", bp), new Object[0])).onErrorResume(e -> {
            if (ErrorClass.FAIL_DOC_ALREADY_EXISTS == ErrorClass.classify(e)) {
                this.LOGGER.debug(String.format("%s found client record exists after retry, another client must have created it, continuing", bp), new Object[0]);
                return Mono.empty();
            }
            if (e instanceof CouchbaseException && ((CouchbaseException)e).context().responseStatus() == ResponseStatus.NO_ACCESS) {
                return Mono.error(new AccessErrorException());
            }
            this.LOGGER.info(String.format("got error while creating client record '%s'", e), new Object[0]);
            return Mono.error(e);
        }).then();
    }

    protected Mono<Integer> beforeCreateRecord(ClientRecord self) {
        return Mono.just(1);
    }

    protected Mono<Integer> beforeRemoveClient(ClientRecord self) {
        return Mono.just(1);
    }

    @Deprecated
    protected Mono<Integer> beforeUpdateCAS(ClientRecord self) {
        return Mono.just(1);
    }

    protected Mono<Integer> beforeGetRecord(ClientRecord self) {
        return Mono.just(1);
    }

    protected Mono<Integer> beforeUpdateRecord(ClientRecord self) {
        return Mono.just(1);
    }
}

