/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.reactive.streams;

import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.component.reactive.streams.ReactiveStreamsEndpoint;
import org.apache.camel.component.reactive.streams.ReactiveStreamsHelper;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
import org.apache.camel.support.DefaultAsyncProducer;
import org.apache.camel.util.ObjectHelper;

public class ReactiveStreamsProducer
extends DefaultAsyncProducer {
    private final ReactiveStreamsEndpoint endpoint;
    private final String name;
    private final CamelReactiveStreamsService service;

    public ReactiveStreamsProducer(ReactiveStreamsEndpoint endpoint, String name, CamelReactiveStreamsService service) {
        super(endpoint);
        this.endpoint = endpoint;
        this.name = ObjectHelper.notNull(name, "name");
        this.service = ObjectHelper.notNull(service, "service");
    }

    @Override
    public boolean process(Exchange exchange, AsyncCallback callback) {
        ReactiveStreamsHelper.attachCallback(exchange, (data, error) -> {
            if (error != null) {
                data.setException(error);
            }
            callback.done(false);
        });
        this.service.sendCamelExchange(this.name, exchange);
        return false;
    }

    @Override
    protected void doStart() throws Exception {
        super.doStart();
        this.service.attachCamelProducer(this.endpoint.getStream(), this);
    }

    @Override
    protected void doStop() throws Exception {
        super.doStop();
        this.service.detachCamelProducer(this.endpoint.getStream());
    }

    @Override
    public ReactiveStreamsEndpoint getEndpoint() {
        return this.endpoint;
    }
}

