package com.hazelcast.jet.impl.connector;

import com.hazelcast.client.impl.clientside.HazelcastClientProxy;
import com.hazelcast.client.impl.spi.ClientPartitionService;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.internal.nio.IOUtil;
import com.hazelcast.internal.partition.InternalPartitionService;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.internal.serialization.SerializationService;
import com.hazelcast.jet.core.Inbox;
import com.hazelcast.jet.core.JetDataSerializerHook;
import com.hazelcast.jet.core.Outbox;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.impl.util.ImdgUtil;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.map.EntryProcessor;
import com.hazelcast.map.IMap;
import com.hazelcast.map.impl.proxy.MapProxyImpl;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
import com.hazelcast.partition.PartitioningStrategy;
import com.hazelcast.query.impl.QueryableEntry;
import com.hazelcast.spi.impl.NodeEngineImpl;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.ToIntFunction;
import javax.annotation.Nonnull;

/* loaded from: input_file:com/hazelcast/jet/impl/connector/AbstractUpdateMapP.class */
public abstract class AbstractUpdateMapP<T, K, V> extends AsyncHazelcastWriterP {
    /**
     * Threshold on {@link #pendingItemCount}: {@code processInternal} only drains more
     * items from the inbox while the pending count is below this value (1024).
     */
    private static final int PENDING_ITEM_COUNT_LIMIT = 1024;
    /**
     * Function supplied at construction. It is not invoked anywhere in this class;
     * presumably subclasses use it to derive the map key from an item - confirm there.
     */
    protected final FunctionEx<? super T, ? extends K> keyFn;
    /** Name of the map, passed to {@code instance().getMap(...)} in {@code init}. */
    protected final String mapName;
    /** The map looked up in {@code init}; target of {@code submitToKeys} in {@code submitPending}. */
    protected IMap<K, V> map;
    /** Created in {@code init} from the instance and {@link #map}; supplies the partition count. */
    protected SerializationContext<K> serializationContext;
    /**
     * One buffer per partition, allocated in {@code init}. {@code submitPending} submits
     * every non-empty buffer and then replaces it with a fresh empty {@code HashMap}.
     */
    protected Map<Data, Object>[] partitionBuffers;
    /**
     * Per-partition counter. {@code submitPending} subtracts the partition's value from
     * {@link #pendingItemCount} and resets it to 0. It is never incremented in this class;
     * presumably {@code addToBuffer} implementations do that - confirm against subclasses.
     */
    protected int[] pendingInPartition;
    /** Incremented by {@code inbox.size()} in {@code processInternal}; decreased in {@code submitPending}. */
    protected int pendingItemCount;
    /** Index of the partition {@code submitPending} examines next; advanced with {@code incrCircular}. */
    protected int currentPartitionId;
    /**
     * Decompiler-exposed assertion flag: set in the static initializer to the negation of
     * {@code desiredAssertionStatus()} and checked at the end of {@code submitPending}.
     */
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:com/hazelcast/jet/impl/connector/AbstractUpdateMapP$ApplyValuesEntryProcessor.class */
    /**
     * Entry processor that replaces the value of each processed entry with the value
     * stored under that entry's serialized key in {@code keysToUpdate}.
     *
     * <p>The processor is {@link IdentifiedDataSerializable}: it is written as an entry
     * count followed by key/value pairs, and both keys and values travel as {@link Data}.
     *
     * @param <K> map key type
     * @param <V> map value type
     */
    public static class ApplyValuesEntryProcessor<K, V> implements EntryProcessor<K, V, Void>, IdentifiedDataSerializable {
        /** Serialized key mapped to the new value to store for it. */
        private Map<Data, Object> keysToUpdate;

        /** No-arg constructor; {@code keysToUpdate} stays unset until {@link #readData} fills it. */
        public ApplyValuesEntryProcessor() {
        }

        /**
         * @param map serialized key to new value; the reference is stored as-is, not copied
         */
        public ApplyValuesEntryProcessor(Map<Data, Object> map) {
            this.keysToUpdate = map;
        }

        /**
         * Looks up the new value by the entry's key data and sets it on the entry.
         * When {@code keysToUpdate} has no mapping for the key, {@code null} is passed
         * to {@code setValue}.
         *
         * @param entry the entry being processed; must be a {@link QueryableEntry}
         * @return always {@code null}
         * @throws ClassCastException if {@code entry} is not a {@link QueryableEntry}
         */
        @Override // com.hazelcast.map.EntryProcessor
        public Void process(Map.Entry<K, V> entry) {
            QueryableEntry<K, V> queryableEntry = (QueryableEntry<K, V>) entry;
            // The buffered value is handed over untouched; the cast is erased at runtime,
            // so no conversion or type check takes place here.
            @SuppressWarnings("unchecked")
            V newValue = (V) keysToUpdate.get(queryableEntry.getKeyData());
            queryableEntry.setValue(newValue);
            return null;
        }

        /**
         * Writes the entry count followed by each key and value.
         *
         * @throws ClassCastException if a value in {@code keysToUpdate} is not a {@link Data}
         */
        @Override // com.hazelcast.nio.serialization.DataSerializable
        public void writeData(ObjectDataOutput objectDataOutput) throws IOException {
            objectDataOutput.writeInt(keysToUpdate.size());
            for (Map.Entry<Data, Object> update : keysToUpdate.entrySet()) {
                IOUtil.writeData(objectDataOutput, update.getKey());
                IOUtil.writeData(objectDataOutput, (Data) update.getValue());
            }
        }

        /**
         * Reads the entry count and then that many key/value pairs, in the order
         * {@link #writeData} produced them.
         */
        @Override // com.hazelcast.nio.serialization.DataSerializable
        public void readData(ObjectDataInput objectDataInput) throws IOException {
            int size = objectDataInput.readInt();
            keysToUpdate = new LinkedHashMap<>(size);
            for (int read = 0; read < size; read++) {
                // Key first, then value: mirrors the write order.
                Data key = IOUtil.readData(objectDataInput);
                Data value = IOUtil.readData(objectDataInput);
                keysToUpdate.put(key, value);
            }
        }

        @Override // com.hazelcast.nio.serialization.IdentifiedDataSerializable
        public int getFactoryId() {
            return JetDataSerializerHook.FACTORY_ID;
        }

        // NOTE(review): 4 is presumably this class's id within JetDataSerializerHook's
        // factory - confirm against the hook before changing it.
        @Override // com.hazelcast.nio.serialization.IdentifiedDataSerializable
        public int getClassId() {
            return 4;
        }
    }

    /* loaded from: input_file:com/hazelcast/jet/impl/connector/AbstractUpdateMapP$SerializationContext.class */
    /**
     * Bundles what is needed to serialize keys and values and to map serialized keys
     * to partition ids, for either a member instance or a client instance.
     *
     * @param <K> map key type
     */
    public static class SerializationContext<K> {
        private final int partitionCount;
        private final ToIntFunction<Data> partitionIdFn;
        private final SerializationService serializationService;
        private final PartitioningStrategy<K> partitioningStrategy;

        /**
         * Takes the partition service and serialization service from the node engine
         * when {@code hazelcastInstance} is a member, otherwise from the client proxy.
         * The map's partitioning strategy is only read on the member path; on the
         * client path it is left {@code null}.
         */
        SerializationContext(HazelcastInstance hazelcastInstance, IMap<K, ?> iMap) {
            if (!ImdgUtil.isMemberInstance(hazelcastInstance)) {
                HazelcastClientProxy clientProxy = (HazelcastClientProxy) hazelcastInstance;
                ClientPartitionService clientPartitions = clientProxy.client.getClientPartitionService();
                this.partitionCount = clientPartitions.getPartitionCount();
                this.partitionIdFn = clientPartitions::getPartitionId;
                this.serializationService = clientProxy.getSerializationService();
                this.partitioningStrategy = null;
            } else {
                NodeEngineImpl engine = Util.getNodeEngine(hazelcastInstance);
                InternalPartitionService memberPartitions = engine.getPartitionService();
                this.partitionCount = memberPartitions.getPartitionCount();
                this.partitionIdFn = memberPartitions::getPartitionId;
                this.serializationService = engine.getSerializationService();
                this.partitioningStrategy = ((MapProxyImpl) iMap).getPartitionStrategy();
            }
        }

        /** @return the partition count captured at construction */
        public int partitionCount() {
            return partitionCount;
        }

        /** @return the partition id the partition service reports for {@code data} */
        public int partitionId(Data data) {
            return partitionIdFn.applyAsInt(data);
        }

        /**
         * Serializes a key, using the partitioning strategy when one was captured.
         */
        public Data toKeyData(K k) {
            if (partitioningStrategy == null) {
                return serializationService.toData(k);
            }
            return serializationService.toData(k, partitioningStrategy);
        }

        /** Serializes an arbitrary object without a partitioning strategy. */
        public Data toData(Object obj) {
            return serializationService.toData(obj);
        }
    }

    /**
     * Stores the map name and key function; the map itself is resolved later in {@code init}.
     *
     * @param hazelcastInstance forwarded to the {@code AsyncHazelcastWriterP} constructor
     * @param i                 forwarded unchanged to the {@code AsyncHazelcastWriterP} constructor
     * @param str               name of the map to update; must not be {@code null}
     * @param functionEx        key function stored in {@code keyFn}; must not be {@code null}
     * @throws NullPointerException if {@code str} or {@code functionEx} is {@code null}
     */
    public AbstractUpdateMapP(@Nonnull HazelcastInstance hazelcastInstance, int i, @Nonnull String str, @Nonnull FunctionEx<? super T, ? extends K> functionEx) {
        super(hazelcastInstance, i);
        this.mapName = Objects.requireNonNull(str, "mapName");
        // Both arguments are declared @Nonnull; fail fast here instead of when the
        // function is first used.
        this.keyFn = Objects.requireNonNull(functionEx, "keyFn");
    }

    /**
     * Resolves the target map, builds the serialization context and allocates one
     * empty buffer and one pending counter per partition.
     */
    @Override // com.hazelcast.jet.core.Processor
    public void init(@Nonnull Outbox outbox, @Nonnull Processor.Context context) {
        map = instance().getMap(mapName);
        serializationContext = new SerializationContext<>(instance(), map);

        final int partitions = serializationContext.partitionCount();
        // Generic arrays cannot be created directly, hence the raw array creation.
        @SuppressWarnings("unchecked")
        Map<Data, Object>[] buffers = new Map[partitions];
        partitionBuffers = buffers;
        pendingInPartition = new int[partitions];
        for (int partitionId = 0; partitionId < partitions; partitionId++) {
            buffers[partitionId] = new HashMap<>();
        }
    }

    /**
     * Buffers the inbox contents and then tries to submit the buffered items.
     *
     * <p>The inbox is drained only while fewer than {@code PENDING_ITEM_COUNT_LIMIT}
     * items are pending; otherwise it is left untouched and only the submission is
     * attempted. When drained, the whole inbox is taken at once, so the pending count
     * can exceed the limit afterwards.
     *
     * @param inbox source of items; each drained item is passed to {@code addToBuffer}
     */
    @Override // com.hazelcast.jet.impl.connector.AsyncHazelcastWriterP
    protected final void processInternal(Inbox inbox) {
        if (pendingItemCount < PENDING_ITEM_COUNT_LIMIT) {
            // Count first: drain() empties the inbox, after which size() would be 0.
            pendingItemCount += inbox.size();
            inbox.drain(this::addToBuffer);
        }
        submitPending();
    }

    /**
     * Called by {@code processInternal} once for every item drained from the inbox.
     *
     * <p>NOTE(review): {@code submitPending} reads {@code partitionBuffers} and
     * {@code pendingInPartition}, which this class never fills itself - presumably
     * implementations must add the item to its partition's buffer and increment that
     * partition's pending counter. Confirm against the subclasses.
     *
     * @param t the drained item
     */
    protected abstract void addToBuffer(T t);

    /**
     * Attempts to submit everything that is still buffered by delegating to
     * {@code submitPending}.
     *
     * @return the result of {@code submitPending}: {@code true} when no pending items
     *         remain, {@code false} when a permit could not be acquired for a
     *         non-empty partition buffer
     */
    @Override // com.hazelcast.jet.impl.connector.AsyncHazelcastWriterP
    protected final boolean flushInternal() {
        return submitPending();
    }

    /**
     * Submits every non-empty partition buffer to the map, one {@code submitToKeys}
     * call per partition.
     *
     * <p>Partitions are visited at most once per call, starting at
     * {@code currentPartitionId} and wrapping around. If a permit cannot be acquired
     * the method returns early and leaves {@code currentPartitionId} on the partition
     * that could not be submitted, so the next call resumes there.
     *
     * @return {@code true} if nothing was pending or every buffer was submitted;
     *         {@code false} if {@code tryAcquirePermit()} refused a permit
     * @throws AssertionError if assertions are enabled and items are still counted as
     *         pending after all partitions were visited
     */
    private boolean submitPending() {
        if (pendingItemCount == 0) {
            return true;
        }
        final int partitionCount = partitionBuffers.length;
        for (int visited = 0; visited < partitionCount; visited++) {
            Map<Data, Object> buffer = partitionBuffers[currentPartitionId];
            if (!buffer.isEmpty()) {
                if (!tryAcquirePermit()) {
                    // Cursor deliberately not advanced: retry this partition next time.
                    return false;
                }
                setCallback(this.map.submitToKeys(buffer.keySet(), entryProcessor(buffer)));
                pendingItemCount -= pendingInPartition[currentPartitionId];
                pendingInPartition[currentPartitionId] = 0;
                // The submitted buffer now belongs to the in-flight entry processor;
                // start a fresh one instead of clearing it.
                partitionBuffers[currentPartitionId] = new HashMap<>();
            }
            // incrCircular wraps to 0 at partitionCount, so the cursor always stays a
            // valid index and needs no separate reset after the loop.
            currentPartitionId = incrCircular(currentPartitionId, partitionCount);
        }
        // Decompiled form of an assert statement on pendingItemCount == 0.
        if ($assertionsDisabled || pendingItemCount == 0) {
            return true;
        }
        throw new AssertionError("pending item count should be 0, but was " + pendingItemCount);
    }

    /**
     * Creates the entry processor for one partition's buffered updates.
     *
     * <p>{@code submitPending} calls this once per non-empty partition buffer and
     * passes the result to {@code IMap.submitToKeys} together with the buffer's key set.
     *
     * @param map the partition buffer being submitted (serialized key to buffered object)
     * @return the entry processor to run against the buffer's keys
     */
    protected abstract EntryProcessor<K, V, Void> entryProcessor(Map<Data, Object> map);

    /**
     * Returns {@code i + 1}, or 0 when {@code i + 1} is exactly equal to {@code i2}.
     *
     * @param i  the current value
     * @param i2 the value at which the successor wraps to 0
     * @return the successor of {@code i}, wrapped to 0 at {@code i2}
     */
    private static int incrCircular(int i, int i2) {
        final int successor = i + 1;
        return successor == i2 ? 0 : successor;
    }

    // Decompiler rendering of the class's assertion flag: $assertionsDisabled ends up
    // true when assertions are not desired for this class, and guards the
    // pending-count check at the end of submitPending().
    static {
        $assertionsDisabled = !AbstractUpdateMapP.class.desiredAssertionStatus();
    }
}
