package org.apache.ignite.internal.management.cdc;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.TreeSet;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
import org.apache.ignite.internal.pagemem.wal.record.CdcDataRecord;
import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.util.lang.GridIterator;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.visor.VisorJob;
import org.apache.ignite.internal.visor.VisorMultiNodeTask;
import org.apache.ignite.internal.visor.VisorTaskArgument;
import org.apache.ignite.resources.LoggerResource;
import org.jetbrains.annotations.Nullable;

@GridInternal
/* loaded from: input_file:org/apache/ignite/internal/management/cdc/CdcCacheDataResendTask.class */
public class CdcCacheDataResendTask extends VisorMultiNodeTask<CdcResendCommandArg, Void, Void> {
    private static final long serialVersionUID = 0;
    private AffinityTopologyVersion topVer;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/ignite/internal/management/cdc/CdcCacheDataResendTask$CdcCacheDataResendJob.class */
    public static class CdcCacheDataResendJob extends VisorJob<CdcResendCommandArg, Void> {
        private static final long serialVersionUID = 0;

        @LoggerResource
        protected IgniteLogger log;
        private IgniteWriteAheadLogManager wal;
        private GridCachePartitionExchangeManager<Object, Object> exchange;
        private final AffinityTopologyVersion topVer;
        private GridDhtPartitionsExchangeFuture lastFut;

        protected CdcCacheDataResendJob(CdcResendCommandArg cdcResendCommandArg, AffinityTopologyVersion affinityTopologyVersion) {
            super(cdcResendCommandArg, false);
            this.topVer = affinityTopologyVersion;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // org.apache.ignite.internal.visor.VisorJob
        public Void run(CdcResendCommandArg cdcResendCommandArg) throws IgniteException {
            if (F.isEmpty(cdcResendCommandArg.caches())) {
                throw new IllegalArgumentException("Caches are not specified.");
            }
            ArrayList arrayList = new ArrayList();
            for (String str : cdcResendCommandArg.caches()) {
                IgniteInternalCache cache = this.ignite.context().cache().cache(str);
                if (cache == null) {
                    throw new IgniteException("Cache does not exist [cacheName=" + str + "]");
                }
                if (!cache.context().dataRegion().config().isCdcEnabled()) {
                    throw new IgniteException("CDC is not enabled for given cache [cacheName=" + str + ", dataRegionName=" + cache.context().dataRegion().config().getName() + "]");
                }
                arrayList.add(cache);
            }
            if (this.log.isInfoEnabled()) {
                this.log.info("CDC cache data resend started [caches=" + String.join(", ", cdcResendCommandArg.caches()) + "]");
            }
            this.wal = this.ignite.context().cache().context().wal(true);
            this.exchange = this.ignite.context().cache().context().exchange();
            try {
                Iterator it = arrayList.iterator();
                while (it.hasNext() && !isCancelled()) {
                    resendCacheData((IgniteInternalCache) it.next());
                }
                this.wal.flush(null, true);
                if (this.log.isInfoEnabled()) {
                    this.log.info("CDC cache data resend " + (isCancelled() ? "cancelled" : "finished") + " [caches=" + String.join(", ", cdcResendCommandArg.caches()) + "]");
                }
                return null;
            } catch (IgniteCheckedException e) {
                throw new IgniteException(e);
            }
        }

        /* JADX WARN: Type inference failed for: r6v1, types: [org.apache.ignite.internal.processors.cache.CacheObject] */
        private void resendCacheData(IgniteInternalCache<?, ?> igniteInternalCache) throws IgniteCheckedException {
            if (this.log.isInfoEnabled()) {
                this.log.info("CDC cache data resend started [cacheName=" + igniteInternalCache.name() + "]");
            }
            GridCacheContext<?, ?> context = igniteInternalCache.context();
            GridIterator<CacheDataRow> cacheIterator = context.offheap().cacheIterator(context.cacheId(), true, false, AffinityTopologyVersion.NONE, null);
            long j = 0;
            TreeSet treeSet = new TreeSet();
            for (CacheDataRow cacheDataRow : cacheIterator) {
                if (isCancelled()) {
                    break;
                }
                ensureTopologyNotChanged();
                KeyCacheObject key = cacheDataRow.key();
                if (this.log.isTraceEnabled()) {
                    this.log.trace("Resend key: " + key);
                }
                GridCacheVersion version = cacheDataRow.version();
                if (version instanceof GridCacheVersionEx) {
                    version = new GridCacheVersion(version.topologyVersion(), version.order(), version.nodeOrder(), version.clusterId());
                }
                int cacheId = context.cacheId();
                ?? value = cacheDataRow.value();
                this.wal.log(new CdcDataRecord(new DataEntry(cacheId, key, value, GridCacheOperation.CREATE, null, version, cacheDataRow.expireTime(), key.partition(), -1L, DataEntry.flags(true))));
                treeSet.add(Integer.valueOf(key.partition()));
                long j2 = j + 1;
                j = value;
                if (j2 % 1000 == 0 && this.log.isDebugEnabled()) {
                    this.log.debug("Resend entries count: " + j);
                }
            }
            if (this.log.isInfoEnabled()) {
                if (isCancelled()) {
                    this.log.info("CDC cache data resend cancelled.");
                } else {
                    IgniteLogger igniteLogger = this.log;
                    igniteLogger.info("CDC cache data resend finished [cacheName=" + igniteInternalCache.name() + ", entriesCnt=" + j + ", parts=" + igniteLogger + "]");
                }
            }
        }

        private void ensureTopologyNotChanged() {
            GridDhtPartitionsExchangeFuture lastFinishedFuture = this.exchange.lastFinishedFuture();
            if (this.lastFut != lastFinishedFuture) {
                if (!this.topVer.equals(this.exchange.lastAffinityChangedTopologyVersion(lastFinishedFuture.topologyVersion()))) {
                    throw new IgniteException("CDC cache data resend cancelled. Topology changed during resend [startTopVer=" + this.topVer + ", currentTopVer=" + lastFinishedFuture.topologyVersion() + "]");
                }
                this.lastFut = lastFinishedFuture;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.ignite.internal.visor.VisorMultiNodeTask
    public VisorJob<CdcResendCommandArg, Void> job(CdcResendCommandArg cdcResendCommandArg) {
        return new CdcCacheDataResendJob(cdcResendCommandArg, this.topVer);
    }

    @Override // org.apache.ignite.internal.visor.VisorMultiNodeTask
    protected Collection<UUID> jobNodes(VisorTaskArgument<CdcResendCommandArg> visorTaskArgument) {
        GridDhtPartitionsExchangeFuture lastFinishedFuture = this.ignite.context().cache().context().exchange().lastFinishedFuture();
        if (!lastFinishedFuture.rebalanced()) {
            throw new IgniteException("CDC cache data resend cancelled. Rebalance sheduled [topVer=" + lastFinishedFuture.topologyVersion() + "]");
        }
        this.topVer = this.ignite.context().cache().context().exchange().lastAffinityChangedTopologyVersion(lastFinishedFuture.topologyVersion());
        return F.nodeIds(this.ignite.cluster().forServers().nodes());
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // org.apache.ignite.internal.visor.VisorMultiNodeTask
    @Nullable
    protected Void reduce0(List<ComputeJobResult> list) throws IgniteException {
        for (ComputeJobResult computeJobResult : list) {
            if (computeJobResult.getException() != null) {
                throw new IgniteException("CDC cache data resend cancelled. Failed to resend cache data on the node [nodeId=" + computeJobResult.getNode().id() + "]", computeJobResult.getException());
            }
        }
        return null;
    }

    @Override // org.apache.ignite.internal.visor.VisorMultiNodeTask
    @Nullable
    protected /* bridge */ /* synthetic */ Void reduce0(List list) throws IgniteException {
        return reduce0((List<ComputeJobResult>) list);
    }
}
