package org.apache.flink.runtime.iterative.concurrent;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/* loaded from: input_file:org/apache/flink/runtime/iterative/concurrent/Broker.class */
/**
 * Hands single objects from one party to another, addressed by a string key.
 *
 * <p>Every key is backed by a {@link BlockingQueue} of capacity one (a "slot"). A producer fills
 * the slot with {@link #handIn(String, Object)}; consumers block in {@link #get(String)} or
 * {@link #getAndRemove(String)} until the slot has been filled.
 *
 * @param <V> type of the objects exchanged through this broker
 */
public class Broker<V> {
    /** One single-element queue per key; queues are created lazily on first access. */
    private final ConcurrentMap<String, BlockingQueue<V>> mediations =
            new ConcurrentHashMap<String, BlockingQueue<V>>();

    /**
     * Places an element into the slot of the given key.
     *
     * @param str key identifying the slot
     * @param v element to hand in
     * @throws RuntimeException if the slot already holds an element
     */
    public void handIn(String str, V v) {
        // offer() does not block: it returns false right away when the capacity-one queue is full.
        if (!retrieveSharedQueue(str).offer(v)) {
            throw new RuntimeException("Could not register the given element, broker slot is already occupied.");
        }
    }

    /**
     * Blocks until an element is available for the given key, takes it out of the slot and
     * removes the key's mapping from this broker.
     *
     * @param str key identifying the slot
     * @return the element that was handed in for the key
     * @throws RuntimeException wrapping the {@link InterruptedException} if the calling thread is
     *     interrupted while waiting; the thread's interrupt flag is restored before throwing
     */
    public V getAndRemove(String str) {
        try {
            V element = retrieveSharedQueue(str).take();
            this.mediations.remove(str);
            return element;
        } catch (InterruptedException e) {
            // take() cleared the interrupt status; set it again so callers can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Removes the mapping (and with it the slot) of the given key, if present.
     *
     * @param str key identifying the slot
     */
    public void remove(String str) {
        this.mediations.remove(str);
    }

    /**
     * Blocks until an element is available for the given key and returns it while leaving it in
     * the slot, so that it can be retrieved again.
     *
     * @param str key identifying the slot
     * @return the element that was handed in for the key
     * @throws RuntimeException if the element could not be put back because the slot was filled
     *     in the meantime, or wrapping the {@link InterruptedException} if the calling thread is
     *     interrupted while waiting; in the latter case the interrupt flag is restored first
     */
    public V get(String str) {
        try {
            BlockingQueue<V> queue = retrieveSharedQueue(str);
            V element = queue.take();
            // Put the element straight back; a failing offer means somebody else filled the
            // slot between take() and offer().
            if (queue.offer(element)) {
                return element;
            }
            throw new RuntimeException("Error: Concurrent modification of the broker slot for key '" + str + "'.");
        } catch (InterruptedException e) {
            // take() cleared the interrupt status; set it again so callers can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns the queue registered for the key, creating and registering a new capacity-one
     * queue if none exists yet.
     */
    private BlockingQueue<V> retrieveSharedQueue(String key) {
        BlockingQueue<V> existing = this.mediations.get(key);
        if (existing != null) {
            return existing;
        }
        BlockingQueue<V> created = new ArrayBlockingQueue<V>(1);
        // putIfAbsent returns the previously registered queue if another caller registered one
        // first; in that case use theirs so that all callers share a single queue per key.
        BlockingQueue<V> raced = this.mediations.putIfAbsent(key, created);
        return raced != null ? raced : created;
    }
}
