/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.grpc.server;

import io.grpc.stub.StreamObserver;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import org.apache.camel.component.grpc.GrpcConsumer;
import org.apache.camel.component.grpc.GrpcEndpoint;
import org.apache.camel.component.grpc.server.GrpcRequestAbstractStreamObserver;

/**
 * Request stream observer that buffers every inbound request and, once the
 * client completes the stream, hands the whole buffered list to the consumer
 * as the body of a single exchange.
 *
 * <p>The exchange is created once in the constructor and reused for the
 * lifetime of this observer.
 */
public class GrpcRequestAggregationStreamObserver
extends GrpcRequestAbstractStreamObserver {
    /** Requests received so far, in arrival order. */
    private final List<Object> requestList = new LinkedList<>();

    /**
     * Creates the observer and the exchange that will carry the aggregated requests.
     *
     * @param endpoint         endpoint used to create the exchange
     * @param consumer         consumer that processes the aggregated exchange
     * @param responseObserver observer the response(s) are written to
     * @param headers          headers copied onto the inbound message when the stream completes
     */
    public GrpcRequestAggregationStreamObserver(GrpcEndpoint endpoint, GrpcConsumer consumer, StreamObserver<Object> responseObserver, Map<String, Object> headers) {
        super(endpoint, consumer, responseObserver, headers);
        this.exchange = endpoint.createExchange();
    }

    /**
     * Buffers a single request; nothing is processed until {@link #onCompleted()}.
     *
     * @param request the request received from the client stream
     */
    @Override
    public void onNext(Object request) {
        this.requestList.add(request);
    }

    /**
     * Records the error reported on the request stream on the exchange.
     *
     * @param t2 the error reported on the request stream
     */
    @Override
    public void onError(Throwable t2) {
        this.exchange.setException(t2);
    }

    /**
     * Sends all buffered requests through the consumer as one exchange, waits for
     * processing to finish and then writes the outcome to the response observer.
     *
     * <p>If the exchange carries an exception after processing, it is reported via
     * {@code onError}. Otherwise a {@link List} response body is emitted element by
     * element, any other body is emitted as a single response, and the response
     * stream is completed.
     */
    @Override
    public void onCompleted() {
        CountDownLatch latch = new CountDownLatch(1);
        this.exchange.getIn().setBody(this.requestList);
        this.exchange.getIn().setHeaders(this.headers);
        // The consumer signals completion through the callback; the latch turns
        // that into a blocking wait on the calling thread.
        this.consumer.process(this.exchange, doneSync -> latch.countDown());
        try {
            latch.await();
            // A failed exchange must not be reported to the client as a normal
            // response followed by a successful completion.
            Exception failure = this.exchange.getException();
            if (failure != null) {
                this.responseObserver.onError(failure);
                return;
            }
            Object responseBody = this.exchange.getMessage().getBody();
            if (responseBody instanceof List) {
                List<?> responseList = (List<?>) responseBody;
                responseList.forEach(this.responseObserver::onNext);
            } else {
                this.responseObserver.onNext(responseBody);
            }
            this.responseObserver.onCompleted();
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            this.responseObserver.onError(e);
        }
    }
}

