package org.apache.ignite.internal.processors.datastreamer;

import java.util.Collection;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.DelayQueue;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.communication.GridIoManager;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.stream.StreamReceiver;
import org.apache.ignite.thread.IgniteThread;
import org.apache.ignite.thread.OomExceptionHandler;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.class */
/**
 * Processor that creates {@link DataStreamerImpl} instances for the local node, periodically
 * flushes them from a background thread and, on non-client nodes, serves
 * {@link DataStreamerRequest} messages received on {@link GridTopic#TOPIC_DATASTREAM}.
 *
 * @param <K> Cache key type.
 * @param <V> Cache value type.
 */
public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
    /**
     * I/O policy used when neither the current thread nor a resolver supplies one.
     * NOTE(review): presumably the id of the dedicated data streamer pool - confirm against the
     * I/O policy constants of the communication manager.
     */
    private static final byte DFLT_IO_PLC = 9;

    /** Streamers created by this processor whose internal future has not completed yet. */
    private final Collection<DataStreamerImpl<?, ?>> ldrs = new GridConcurrentHashSet<>();

    /** Guards request processing and streamer creation against node stop. */
    private final GridSpinBusyLock busyLock = new GridSpinBusyLock();

    /** Background thread that drains {@link #flushQ}; created in {@link #start()}. */
    private Thread flusher;

    /** Queue of streamers waiting for their next flush attempt. */
    private final DelayQueue<DataStreamerImpl<K, V>> flushQ = new DelayQueue<>();

    /** Marshaller taken from the node configuration. */
    private final Marshaller marsh;

    /** Pre-marshalled generic error sent when the real error cannot be marshalled. */
    private byte[] marshErrBytes;

    /**
     * Creates the processor and, unless the node is a client, registers the listener for
     * incoming data streamer requests.
     *
     * @param ctx Kernal context.
     */
    public DataStreamProcessor(GridKernalContext ctx) {
        super(ctx);

        if (!ctx.clientNode()) {
            ctx.io().addMessageListener(GridTopic.TOPIC_DATASTREAM, new GridMessageListener() {
                @Override
                public void onMessage(UUID nodeId, Object msg, byte plc) {
                    assert msg instanceof DataStreamerRequest;

                    DataStreamProcessor.this.processRequest(nodeId, (DataStreamerRequest) msg);
                }
            });
        }

        this.marsh = ctx.config().getMarshaller();
    }

    /**
     * Pre-marshals the fallback error and starts the flusher thread.
     *
     * @throws IgniteCheckedException If the fallback error cannot be marshalled.
     */
    @Override
    public void start() throws IgniteCheckedException {
        marshErrBytes = U.marshal(marsh,
            new IgniteCheckedException("Failed to marshal response error, see node log for details."));

        flusher = new IgniteThread(new GridWorker(ctx.igniteInstanceName(), "grid-data-loader-flusher", log) {
            @Override
            protected void body() throws InterruptedException {
                while (!isCancelled()) {
                    // Blocks until the delay of some queued streamer expires.
                    DataStreamerImpl<K, V> ldr = DataStreamProcessor.this.flushQ.take();

                    // The busy lock is blocked on kernal stop; leave the loop in that case.
                    if (!DataStreamProcessor.this.busyLock.enterBusy()) {
                        return;
                    }

                    try {
                        // A closed streamer is dropped; an open one is flushed and re-queued.
                        if (!ldr.isClosed()) {
                            ldr.tryFlush();

                            DataStreamProcessor.this.flushQ.offer(ldr);
                        }
                    } finally {
                        DataStreamProcessor.this.busyLock.leaveBusy();
                    }
                }
            }
        });

        flusher.setUncaughtExceptionHandler(new OomExceptionHandler(ctx));
        flusher.start();

        if (log.isDebugEnabled()) {
            log.debug("Started data streamer processor.");
        }
    }

    /**
     * Stops request processing: removes the message listener (non-client nodes only), blocks the
     * busy lock, stops the flusher thread and closes every streamer that is still registered.
     *
     * @param cancel Passed through to {@code DataStreamerImpl.closeEx(boolean)}.
     */
    @Override
    public void onKernalStop(boolean cancel) {
        if (!ctx.clientNode()) {
            ctx.io().removeMessageListener(GridTopic.TOPIC_DATASTREAM);
        }

        busyLock.block();

        U.interrupt(flusher);
        U.join(flusher, log);

        for (DataStreamerImpl<?, ?> ldr : ldrs) {
            if (log.isDebugEnabled()) {
                log.debug("Closing active data streamer on grid stop [ldr=" + ldr + ", cancel=" + cancel + "]");
            }

            try {
                ldr.closeEx(cancel);
            } catch (IgniteInterruptedCheckedException e) {
                U.warn(log, "Interrupted while waiting for completion of the data streamer: " + ldr, e);
            } catch (IgniteCheckedException e) {
                U.error(log, "Failed to close data streamer: " + ldr, e);
            }
        }

        if (log.isDebugEnabled()) {
            log.debug("Stopped data streamer processor.");
        }
    }

    /**
     * Forwards the disconnect notification to every registered streamer.
     *
     * @param reconnectFut Reconnect future handed to each streamer.
     * @throws IgniteCheckedException Declared by the overridden method.
     */
    @Override
    public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
        for (DataStreamerImpl<?, ?> ldr : ldrs) {
            ldr.onDisconnected(reconnectFut);
        }
    }

    /**
     * Creates a new data streamer and tracks it until its internal future completes.
     *
     * @param cacheName Cache name passed to the streamer ({@code null} is accepted here).
     * @return Newly created streamer.
     * @throws IllegalStateException If the busy lock cannot be entered because the grid is stopping.
     */
    public DataStreamerImpl<K, V> dataStreamer(@Nullable String cacheName) {
        if (!busyLock.enterBusy()) {
            throw new IllegalStateException("Failed to create data streamer (grid is stopping).");
        }

        try {
            final DataStreamerImpl<K, V> ldr = new DataStreamerImpl<>(ctx, cacheName, flushQ);

            ldrs.add(ldr);

            // Unregister the streamer as soon as its internal future completes.
            ldr.internalFuture().listen(new CI1<IgniteInternalFuture<?>>() {
                @Override
                public void apply(IgniteInternalFuture<?> fut) {
                    boolean removed = DataStreamProcessor.this.ldrs.remove(ldr);

                    assert removed : "Loader has not been added to set: " + ldr;

                    if (DataStreamProcessor.this.log.isDebugEnabled()) {
                        DataStreamProcessor.this.log.debug("Loader has been completed: " + ldr);
                    }
                }
            });

            return ldr;
        } finally {
            busyLock.leaveBusy();
        }
    }

    /**
     * Handles one data streamer request: waits for the requested affinity topology version if the
     * local one is older, unmarshals the response topic and the receiver, then runs the update.
     *
     * @param nodeId ID of the node that sent the request.
     * @param req Request to process.
     */
    private void processRequest(final UUID nodeId, final DataStreamerRequest req) {
        if (!busyLock.enterBusy()) {
            if (log.isDebugEnabled()) {
                log.debug("Ignoring data load request (node is stopping): " + req);
            }

            return;
        }

        try {
            if (log.isDebugEnabled()) {
                log.debug("Processing data load request: " + req);
            }

            AffinityTopologyVersion locAffVer = ctx.cache().context().exchange().readyAffinityVersion();
            AffinityTopologyVersion rmtAffVer = req.topologyVersion();

            if (locAffVer.compareTo(rmtAffVer) < 0) {
                if (log.isDebugEnabled()) {
                    log.debug("Received request has higher affinity topology version [request=" + req
                        + ", locTopVer=" + locAffVer + ", rmtTopVer=" + rmtAffVer + "]");
                }

                IgniteInternalFuture<AffinityTopologyVersion> readyFut =
                    ctx.cache().context().exchange().affinityReadyFuture(rmtAffVer);

                if (readyFut != null && !readyFut.isDone()) {
                    // Capture the policy of the current thread so the retry runs with the same one.
                    final byte plc = threadIoPolicy();

                    readyFut.listen(new CI1<IgniteInternalFuture<?>>() {
                        @Override
                        public void apply(IgniteInternalFuture<?> fut) {
                            DataStreamProcessor.this.ctx.closure().runLocalSafe(new GridPlainRunnable() {
                                @Override
                                public void run() {
                                    DataStreamProcessor.this.processRequest(nodeId, req);
                                }
                            }, plc);
                        }
                    });

                    return;
                }
            }

            Object topic;

            try {
                topic = U.unmarshal(marsh, req.responseTopicBytes(), U.resolveClassLoader(null, ctx.config()));
            } catch (IgniteCheckedException e) {
                // Without the response topic there is nowhere to report the failure to.
                U.error(log, "Failed to unmarshal topic from request: " + req, e);

                return;
            }

            ClassLoader clsLdr;

            if (req.forceLocalDeployment()) {
                clsLdr = U.gridClassLoader();
            } else {
                GridDeployment dep = ctx.deploy().getGlobalDeployment(
                    req.deploymentMode(),
                    req.sampleClassName(),
                    req.sampleClassName(),
                    req.userVersion(),
                    nodeId,
                    req.classLoaderId(),
                    req.participants(),
                    null);

                if (dep == null) {
                    sendResponse(nodeId, topic, req.requestId(),
                        new IgniteCheckedException("Failed to get deployment for request [sndId=" + nodeId
                            + ", req=" + req + "]"),
                        false);

                    return;
                }

                clsLdr = dep.classLoader();
            }

            StreamReceiver<K, V> updater;

            try {
                updater = U.unmarshal(marsh, req.updaterBytes(), U.resolveClassLoader(clsLdr, ctx.config()));

                if (updater != null) {
                    ctx.resource().injectGeneric(updater);
                }
            } catch (IgniteCheckedException e) {
                U.error(log, "Failed to unmarshal message [nodeId=" + nodeId + ", req=" + req + "]", e);

                sendResponse(nodeId, topic, req.requestId(), e, false);

                return;
            }

            localUpdate(nodeId, req, updater, topic);
        } finally {
            busyLock.leaveBusy();
        }
    }

    /**
     * Applies the entries of a request to the local cache and sends the outcome to the requester.
     * <p>
     * When the receiver is a {@link DataStreamerImpl.IsolatedUpdater}, the cache topology read lock
     * is held while the topology version is checked: a newer local version results in a
     * {@link ClusterTopologyCheckedException} response, and an unfinished topology future postpones
     * the update until that future completes. Any failure is reported to the requesting node;
     * {@link Error}s are rethrown after the response has been sent.
     *
     * @param nodeId ID of the node that sent the request.
     * @param req Request being processed.
     * @param updater Receiver unmarshalled from the request.
     * @param topic Topic the response is sent to.
     */
    private void localUpdate(final UUID nodeId, final DataStreamerRequest req,
        final StreamReceiver<K, V> updater, final Object topic) {
        final boolean isolated = updater instanceof DataStreamerImpl.IsolatedUpdater;

        try {
            GridCacheAdapter<K, V> cache = ctx.cache().internalCache(req.cacheName());

            if (cache == null) {
                throw new IgniteCheckedException("Cache not created or already destroyed: " + req.cacheName());
            }

            GridCacheContext<K, V> cctx = cache.context();

            DataStreamerUpdateJob job = null;
            GridFutureAdapter<?> waitFut = null;
            GridDhtTopologyFuture topWaitFut = null;

            if (isolated) {
                cctx.topology().readLock();
            }

            try {
                ClusterTopologyCheckedException remapErr = null;
                AffinityTopologyVersion streamerFutTopVer = null;

                if (isolated) {
                    GridDhtTopologyFuture topFut = cctx.topologyVersionFuture();

                    AffinityTopologyVersion topVer = topFut.isDone()
                        ? topFut.topologyVersion()
                        : topFut.initialVersion();

                    if (topVer.compareTo(req.topologyVersion()) > 0) {
                        remapErr = new ClusterTopologyCheckedException(
                            "DataStreamer will retry data transfer at stable topology [reqTop="
                                + req.topologyVersion() + ", topVer=" + topFut.initialVersion()
                                + ", node=remote]");
                    } else if (topFut.isDone()) {
                        streamerFutTopVer = topFut.topologyVersion();
                    } else {
                        topWaitFut = topFut;
                    }
                }

                if (remapErr != null) {
                    sendResponse(nodeId, topic, req.requestId(), remapErr, req.forceLocalDeployment());

                    return;
                }

                if (topWaitFut == null) {
                    job = new DataStreamerUpdateJob(
                        ctx,
                        log,
                        req.cacheName(),
                        req.entries(),
                        req.ignoreDeploymentOwnership(),
                        req.skipStore(),
                        req.keepBinary(),
                        updater);

                    waitFut = isolated ? cctx.mvcc().addDataStreamerFuture(streamerFutTopVer) : null;
                }
            } finally {
                // Released on every path, including failures of the calls made under the lock.
                if (isolated) {
                    cctx.topology().readUnlock();
                }
            }

            if (topWaitFut != null) {
                // The listener is registered only after the topology read lock has been released.
                topWaitFut.listen(new IgniteInClosure<IgniteInternalFuture<AffinityTopologyVersion>>() {
                    @Override
                    public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
                        DataStreamProcessor.this.localUpdate(nodeId, req, updater, topic);
                    }
                });

                return;
            }

            try {
                job.call();

                sendResponse(nodeId, topic, req.requestId(), null, req.forceLocalDeployment());
            } finally {
                if (waitFut != null) {
                    waitFut.onDone();
                }
            }
        } catch (Throwable e) {
            // Tell the requester about the failure instead of leaving the request unanswered.
            sendResponse(nodeId, topic, req.requestId(), e, req.forceLocalDeployment());

            if (e instanceof Error) {
                throw (Error) e;
            }
        }
    }

    /**
     * Sends a response for the given request to the given node and topic.
     *
     * @param nodeId Destination node ID.
     * @param resTopic Topic to send the response to.
     * @param reqId ID of the request being answered.
     * @param err Error to report, or {@code null} if the request succeeded.
     * @param forceLocDep Flag passed to the {@link DataStreamerResponse} constructor.
     */
    private void sendResponse(UUID nodeId, Object resTopic, long reqId, @Nullable Throwable err,
        boolean forceLocDep) {
        byte[] errBytes;

        try {
            errBytes = err != null ? U.marshal(marsh, err) : null;
        } catch (Exception e) {
            U.error(log, "Failed to marshal error [err=" + err + ", marshErr=" + e + "]", e);

            // Fall back to the generic error that was marshalled in start().
            errBytes = marshErrBytes;
        }

        DataStreamerResponse res = new DataStreamerResponse(reqId, errBytes, forceLocDep);

        try {
            ctx.io().sendToCustomTopic(nodeId, resTopic, res, threadIoPolicy());
        } catch (IgniteCheckedException e) {
            if (ctx.discovery().alive(nodeId)) {
                U.error(log, "Failed to respond to node [nodeId=" + nodeId + ", res=" + res + "]", e);
            } else if (log.isDebugEnabled()) {
                log.debug("Node has left the grid: " + nodeId);
            }
        }
    }

    /**
     * @return Policy reported by {@link GridIoManager#currentPolicy()}, or the default policy when
     *      that call returns {@code null}.
     */
    private static byte threadIoPolicy() {
        Byte plc = GridIoManager.currentPolicy();

        return plc != null ? plc : DFLT_IO_PLC;
    }

    /**
     * Resolves the I/O policy to use for the given node.
     *
     * @param rslvr Optional closure mapping a node to a policy.
     * @param node Node to resolve the policy for; must not be {@code null}.
     * @return Policy returned by the closure, or the default policy when the closure is
     *      {@code null} or returns {@code null}.
     */
    public static byte ioPolicy(@Nullable IgniteClosure<ClusterNode, Byte> rslvr, ClusterNode node) {
        assert node != null;

        Byte plc = null;

        if (rslvr != null) {
            plc = rslvr.apply(node);
        }

        return plc != null ? plc : DFLT_IO_PLC;
    }

    /** Prints the number of currently registered streamers. */
    @Override
    public void printMemoryStats() {
        X.println(">>>");
        X.println(">>> Data streamer processor memory stats [igniteInstanceName=" + ctx.igniteInstanceName() + "]");
        X.println(">>>   ldrsSize: " + ldrs.size());
    }
}
