/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.pgevent;

import com.impossibl.postgres.api.jdbc.PGConnection;
import com.impossibl.postgres.api.jdbc.PGNotificationListener;
import java.sql.PreparedStatement;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.component.pgevent.PgEventEndpoint;
import org.apache.camel.support.DefaultConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Consumer that subscribes to a PostgreSQL notification channel and forwards
 * every received notification to the configured {@link Processor}.
 *
 * <p>On start it obtains a connection from the endpoint, issues {@code LISTEN}
 * for the endpoint's channel and registers itself as the notification listener.
 * On stop it reverses those steps and closes the connection.
 */
public class PgEventConsumer extends DefaultConsumer implements PGNotificationListener {
    private static final Logger LOG = LoggerFactory.getLogger(PgEventConsumer.class);

    private final PgEventEndpoint endpoint;
    /** Connection used for LISTEN/UNLISTEN; {@code null} while the consumer is not started. */
    private PGConnection dbConnection;

    /**
     * Creates a consumer bound to the given endpoint.
     *
     * @param endpoint  endpoint supplying the connection and the channel name
     * @param processor processor that receives one exchange per notification
     */
    public PgEventConsumer(PgEventEndpoint endpoint, Processor processor) {
        super(endpoint, processor);
        this.endpoint = endpoint;
    }

    /**
     * Opens the database connection, starts listening on the endpoint's channel
     * and registers this consumer as the notification listener.
     *
     * @throws Exception if the connection cannot be opened or the subscription
     *                   fails; in the latter case the connection is closed
     *                   before the exception is rethrown
     */
    @Override
    protected void doStart() throws Exception {
        super.doStart();
        PGConnection connection = this.endpoint.initJdbc();
        try {
            String channel = this.endpoint.getChannel();
            // NOTE(review): the channel name is interpolated directly into the SQL text;
            // presumably it comes from trusted endpoint configuration - confirm.
            String sql = String.format("LISTEN %s", channel);
            try (PreparedStatement statement = connection.prepareStatement(sql)) {
                statement.execute();
            }
            connection.addNotificationListener(channel, channel, this);
        } catch (Exception e) {
            // Do not leak the freshly opened connection when the subscription fails.
            try {
                connection.close();
            } catch (Exception closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        // Publish the connection only once it is fully set up.
        this.dbConnection = connection;
    }

    /**
     * Converts an incoming notification into an exchange and hands it to the processor.
     *
     * <p>The payload becomes the message body and the channel name is stored in the
     * {@code channel} header. Any processing failure is passed to the exception handler.
     *
     * @param processId process id reported with the notification
     * @param channel   name of the channel the notification arrived on
     * @param payload   notification payload
     */
    @Override
    public void notification(int processId, String channel, String payload) {
        LOG.debug("Notification processId: {}, channel: {}, payload: {}", processId, channel, payload);

        Exchange exchange = this.createExchange(false);
        try {
            Message msg = exchange.getIn();
            msg.setHeader("channel", channel);
            msg.setBody(payload);
            try {
                this.getProcessor().process(exchange);
            } catch (Exception e) {
                exchange.setException(e);
            }
            if (exchange.getException() != null) {
                String cause = "Unable to process incoming notification from PostgreSQL: processId='" + processId + "', channel='" + channel + "', payload='" + payload + "'";
                this.getExceptionHandler().handleException(cause, exchange.getException());
            }
        } finally {
            // Always hand the exchange back, even if the exception handler itself throws.
            this.releaseExchange(exchange, false);
        }
    }

    /**
     * Unsubscribes from the channel, closes the database connection and then
     * delegates to the parent class so it can run its own shutdown logic.
     *
     * @throws Exception if unsubscribing, closing the connection or the parent
     *                   shutdown fails
     */
    @Override
    protected void doStop() throws Exception {
        try {
            this.closeConnection();
        } finally {
            // Counterpart of super.doStart(); runs even when the database cleanup fails.
            super.doStop();
        }
    }

    /** Removes the listener, issues UNLISTEN and closes the connection, if one is open. */
    private void closeConnection() throws Exception {
        PGConnection connection = this.dbConnection;
        if (connection == null) {
            return;
        }
        // Clear the field first so a repeated stop does not touch a closed connection.
        this.dbConnection = null;
        try {
            String channel = this.endpoint.getChannel();
            connection.removeNotificationListener(channel);
            String sql = String.format("UNLISTEN %s", channel);
            try (PreparedStatement statement = connection.prepareStatement(sql)) {
                statement.execute();
            }
        } finally {
            // Close the connection even when UNLISTEN fails.
            connection.close();
        }
    }
}

