/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.grpc.client;

import io.grpc.stub.StreamObserver;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.component.grpc.GrpcConfiguration;
import org.apache.camel.component.grpc.GrpcUtils;
import org.apache.camel.component.grpc.client.GrpcExchangeForwarder;

class GrpcStreamingExchangeForwarder
implements GrpcExchangeForwarder {
    private final GrpcConfiguration configuration;
    private final Object grpcStub;
    private volatile StreamObserver<Object> currentStream;
    private volatile StreamObserver<Object> currentResponseObserver;

    public GrpcStreamingExchangeForwarder(GrpcConfiguration configuration, Object grpcStub) {
        this.configuration = configuration;
        this.grpcStub = grpcStub;
    }

    @Override
    public boolean forward(Exchange exchange, StreamObserver<Object> responseObserver, AsyncCallback callback) {
        Message message = exchange.getIn();
        this.checkAndRecreateStreamObserver(responseObserver).onNext(message.getBody());
        callback.done(true);
        return true;
    }

    @Override
    public void forward(Exchange exchange) {
        throw new UnsupportedOperationException("Synchronous call is not supported in streaming mode");
    }

    @Override
    public void shutdown() {
        this.checkAndRecreateStreamObserver(this.currentResponseObserver).onCompleted();
        this.doCloseStream();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private StreamObserver<Object> checkAndRecreateStreamObserver(StreamObserver<Object> responseObserver) {
        StreamObserver<Object> curResponseObserver;
        StreamObserver<Object> curStream = this.currentStream;
        if (curStream == null) {
            GrpcStreamingExchangeForwarder grpcStreamingExchangeForwarder = this;
            synchronized (grpcStreamingExchangeForwarder) {
                if (this.currentStream == null) {
                    this.currentResponseObserver = responseObserver;
                    this.currentStream = this.doCreateStream(responseObserver);
                }
                curStream = this.currentStream;
            }
        }
        if ((curResponseObserver = this.currentResponseObserver) != null && !curResponseObserver.equals(responseObserver)) {
            throw new IllegalArgumentException("This forwarder must always use the same response observer");
        }
        return curStream;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void doCloseStream() {
        GrpcStreamingExchangeForwarder grpcStreamingExchangeForwarder = this;
        synchronized (grpcStreamingExchangeForwarder) {
            this.currentStream = null;
            this.currentResponseObserver = null;
        }
    }

    private StreamObserver<Object> doCreateStream(final StreamObserver<Object> streamObserver) {
        return GrpcUtils.invokeAsyncMethodStreaming(this.grpcStub, this.configuration.getMethod(), new StreamObserver<Object>(){

            @Override
            public void onNext(Object o) {
                streamObserver.onNext(o);
            }

            @Override
            public void onError(Throwable throwable) {
                GrpcStreamingExchangeForwarder.this.doCloseStream();
                streamObserver.onError(throwable);
            }

            @Override
            public void onCompleted() {
                GrpcStreamingExchangeForwarder.this.doCloseStream();
                streamObserver.onCompleted();
            }
        });
    }
}

