/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.sql;

import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.RollbackExchangeException;
import org.apache.camel.component.sql.DefaultSqlEndpoint;
import org.apache.camel.component.sql.SqlNamedProcessingStrategy;
import org.apache.camel.component.sql.SqlOutputType;
import org.apache.camel.component.sql.SqlPrepareStatementStrategy;
import org.apache.camel.component.sql.SqlProcessingStrategy;
import org.apache.camel.impl.ScheduledBatchPollingConsumer;
import org.apache.camel.util.CastUtils;
import org.apache.camel.util.ObjectHelper;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.PreparedStatementCallback;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.jdbc.core.namedparam.SqlParameterSource;
import org.springframework.jdbc.support.JdbcUtils;

/**
 * Scheduled batch polling consumer that executes a SQL query on every poll and
 * hands the resulting rows to the configured {@link Processor} as exchanges.
 *
 * <p>After each exchange has been processed an optional follow-up statement
 * ({@code onConsume} or {@code onConsumeFailed}) is executed, and once the
 * batch loop has ended an optional {@code onConsumeBatchComplete} statement is
 * executed.
 *
 * <p>Exactly one of the two templates is set, depending on which constructor
 * was used; the other one is {@code null}.
 */
public class SqlConsumer extends ScheduledBatchPollingConsumer {
    private final String query;
    private final JdbcTemplate jdbcTemplate;
    private final NamedParameterJdbcTemplate namedJdbcTemplate;
    private final SqlParameterSource parameterSource;
    private final SqlPrepareStatementStrategy sqlPrepareStatementStrategy;
    private final SqlProcessingStrategy sqlProcessingStrategy;
    private String onConsume;
    private String onConsumeFailed;
    private String onConsumeBatchComplete;
    private boolean useIterator = true;
    private boolean routeEmptyResultSet;
    private int expectedUpdateCount = -1;
    private boolean breakBatchOnConsumeFail;

    /**
     * Creates a consumer that runs its statements through a plain {@link JdbcTemplate}.
     *
     * @param endpoint                    the owning endpoint
     * @param processor                   the processor each polled exchange is sent to
     * @param jdbcTemplate                the template used to execute the query
     * @param query                       the SQL query to poll with
     * @param sqlPrepareStatementStrategy strategy used to prepare the query text before execution
     * @param sqlProcessingStrategy       strategy used to run the onConsume statements
     */
    public SqlConsumer(DefaultSqlEndpoint endpoint, Processor processor, JdbcTemplate jdbcTemplate, String query, SqlPrepareStatementStrategy sqlPrepareStatementStrategy, SqlProcessingStrategy sqlProcessingStrategy) {
        super((Endpoint) endpoint, processor);
        this.jdbcTemplate = jdbcTemplate;
        this.namedJdbcTemplate = null;
        this.query = query;
        this.parameterSource = null;
        this.sqlPrepareStatementStrategy = sqlPrepareStatementStrategy;
        this.sqlProcessingStrategy = sqlProcessingStrategy;
    }

    /**
     * Creates a consumer that runs its statements through a
     * {@link NamedParameterJdbcTemplate} with the given parameter source.
     *
     * @param endpoint                    the owning endpoint
     * @param processor                   the processor each polled exchange is sent to
     * @param namedJdbcTemplate           the named-parameter template used to execute the query
     * @param query                       the SQL query to poll with
     * @param parameterSource             the parameter source passed along with the query
     * @param sqlPrepareStatementStrategy strategy used to prepare the query text before execution
     * @param sqlProcessingStrategy       strategy used to run the onConsume statements
     */
    public SqlConsumer(DefaultSqlEndpoint endpoint, Processor processor, NamedParameterJdbcTemplate namedJdbcTemplate, String query, SqlParameterSource parameterSource, SqlPrepareStatementStrategy sqlPrepareStatementStrategy, SqlProcessingStrategy sqlProcessingStrategy) {
        super((Endpoint) endpoint, processor);
        this.jdbcTemplate = null;
        this.namedJdbcTemplate = namedJdbcTemplate;
        this.query = query;
        this.parameterSource = parameterSource;
        this.sqlPrepareStatementStrategy = sqlPrepareStatementStrategy;
        this.sqlProcessingStrategy = sqlProcessingStrategy;
    }

    /**
     * Returns the owning endpoint, narrowed to {@link DefaultSqlEndpoint}.
     */
    public DefaultSqlEndpoint getEndpoint() {
        return (DefaultSqlEndpoint) super.getEndpoint();
    }

    /**
     * Executes the query, converts the result into queued exchanges according to
     * the endpoint's output type and processes them via {@link #processBatch(Queue)}.
     *
     * @return the value returned by {@link #processBatch(Queue)} for this poll
     * @throws Exception if executing the query or processing the batch fails
     */
    protected int poll() throws Exception {
        // reset the per-poll state before starting a new batch
        shutdownRunningTask = null;
        pendingExchanges = 0;

        final String preparedQuery = sqlPrepareStatementStrategy.prepareQuery(query, getEndpoint().isAllowNamedParameters());

        PreparedStatementCallback<Integer> callback = new PreparedStatementCallback<Integer>() {

            @Override
            public Integer doInPreparedStatement(PreparedStatement preparedStatement) throws SQLException, DataAccessException {
                Queue<DataHolder> answer = new LinkedList<DataHolder>();

                SqlConsumer.this.log.debug("Executing query: {}", preparedQuery);
                ResultSet rs = preparedStatement.executeQuery();
                try {
                    // resolved inside the try so the result set is closed even if this lookup fails
                    SqlOutputType outputType = SqlConsumer.this.getEndpoint().getOutputType();
                    SqlConsumer.this.log.trace("Got result list from query: {}, outputType={}", rs, outputType);
                    if (outputType == SqlOutputType.SelectList) {
                        Object rows = SqlConsumer.this.getEndpoint().queryForList(rs, true);
                        SqlConsumer.this.addListToQueue(rows, answer);
                    } else if (outputType == SqlOutputType.SelectOne) {
                        Object row = SqlConsumer.this.getEndpoint().queryForObject(rs);
                        if (row != null) {
                            SqlConsumer.this.addListToQueue(row, answer);
                        }
                    } else {
                        throw new IllegalArgumentException("Invalid outputType=" + outputType);
                    }
                } finally {
                    JdbcUtils.closeResultSet(rs);
                }

                // the batch is processed inside the callback, after the result set has been closed
                try {
                    return SqlConsumer.this.processBatch(CastUtils.cast(answer));
                } catch (Exception e) {
                    throw ObjectHelper.wrapRuntimeCamelException(e);
                }
            }
        };

        Integer messagePolled;
        if (namedJdbcTemplate != null) {
            messagePolled = namedJdbcTemplate.execute(preparedQuery, parameterSource, callback);
        } else {
            messagePolled = jdbcTemplate.execute(preparedQuery, callback);
        }
        return messagePolled;
    }

    /**
     * Queues the query result. A {@link List} is either split into one entry per
     * element (when {@code useIterator} is set) or queued as a single entry; an
     * empty list is only queued as a whole when {@code routeEmptyResultSet} is set.
     * Any other value is queued as a single entry.
     */
    private void addListToQueue(Object data, Queue<DataHolder> answer) {
        if (!(data instanceof List)) {
            addItemToQueue(data, answer);
            return;
        }

        List<?> rows = (List<?>) data;
        if (useIterator) {
            for (Object row : rows) {
                addItemToQueue(row, answer);
            }
        } else if (!rows.isEmpty() || routeEmptyResultSet) {
            addItemToQueue(rows, answer);
        }
    }

    /**
     * Wraps the item in a new exchange and appends both to the queue.
     */
    private void addItemToQueue(Object item, Queue<DataHolder> answer) {
        answer.add(new DataHolder(createExchange(item), item));
    }

    /**
     * Creates an {@code InOnly} exchange carrying the given data. The data is
     * stored in the header named by the endpoint's output header when one is
     * configured, otherwise it becomes the message body.
     *
     * @param data the row (or list of rows) to route
     * @return the new exchange
     */
    protected Exchange createExchange(Object data) {
        Exchange exchange = getEndpoint().createExchange(ExchangePattern.InOnly);
        Message in = exchange.getIn();
        String outputHeader = getEndpoint().getOutputHeader();
        if (outputHeader != null) {
            in.setHeader(outputHeader, data);
        } else {
            in.setBody(data);
        }
        return exchange;
    }

    /**
     * Processes the queued entries one by one while {@code isBatchAllowed()}
     * returns true, then runs the {@code onConsumeBatchComplete} statement if
     * one is configured.
     *
     * <p>For each entry the batch properties are set on the exchange, the
     * exchange is sent to the processor, and then {@code onConsumeFailed} or
     * {@code onConsume} is executed depending on whether the exchange failed.
     * On a transacted endpoint a failed exchange aborts the batch by throwing.
     *
     * @param exchanges queue whose elements are the holders built by {@link #poll()}
     * @return the number of entries that were in the queue when the method was called
     * @throws Exception on a failed exchange of a transacted endpoint, or when a
     *                   follow-up statement fails and {@code breakBatchOnConsumeFail} is set
     */
    public int processBatch(Queue<Object> exchanges) throws Exception {
        int total = exchanges.size();

        if (maxMessagesPerPoll > 0 && total == maxMessagesPerPoll) {
            log.debug("Limiting to maximum messages to poll {} as there was more messages in this poll.", maxMessagesPerPoll);
        }

        for (int index = 0; index < total && isBatchAllowed(); index++) {
            DataHolder holder = ObjectHelper.cast(DataHolder.class, exchanges.poll());
            Exchange exchange = holder.exchange;
            Object data = holder.data;

            exchange.setProperty("CamelBatchIndex", index);
            exchange.setProperty("CamelBatchSize", total);
            // must be a Boolean: the decompiled "? 1 : 0" form would have stored an Integer
            exchange.setProperty("CamelBatchComplete", index == total - 1);

            pendingExchanges = total - index - 1;

            try {
                getProcessor().process(exchange);
            } catch (Exception e) {
                exchange.setException(e);
            }

            if (getEndpoint().isTransacted() && exchange.isFailed()) {
                // rethrow so the caller sees the failure; prefer the original cause when present
                Exception cause = exchange.getException();
                if (cause != null) {
                    throw cause;
                }
                throw new RollbackExchangeException("Rollback transaction due error processing exchange", exchange);
            }

            String sql = exchange.isFailed() ? onConsumeFailed : onConsume;
            try {
                if (data != null && sql != null) {
                    int updateCount = commitExchange(exchange, data, sql);
                    // a negative expected count disables the check
                    if (expectedUpdateCount > -1 && updateCount != expectedUpdateCount) {
                        String msg = "Expected update count " + expectedUpdateCount + " but was " + updateCount + " executing query: " + sql;
                        throw new SQLException(msg);
                    }
                }
            } catch (Exception e) {
                if (breakBatchOnConsumeFail) {
                    throw e;
                }
                handleException("Error executing onConsume/onConsumeFailed query " + sql, e);
            }
        }

        try {
            if (onConsumeBatchComplete != null) {
                int updateCount = commitBatchComplete();
                log.debug("onConsumeBatchComplete update count {}", updateCount);
            }
        } catch (Exception e) {
            if (breakBatchOnConsumeFail) {
                throw e;
            }
            handleException("Error executing onConsumeBatchComplete query " + onConsumeBatchComplete, e);
        }

        return total;
    }

    /**
     * Runs the per-exchange follow-up statement through the processing strategy
     * and returns the update count it reports.
     */
    private int commitExchange(Exchange exchange, Object data, String sql) throws Exception {
        if (namedJdbcTemplate != null && sqlProcessingStrategy instanceof SqlNamedProcessingStrategy) {
            SqlNamedProcessingStrategy namedStrategy = (SqlNamedProcessingStrategy) sqlProcessingStrategy;
            return namedStrategy.commit(getEndpoint(), exchange, data, namedJdbcTemplate, parameterSource, sql);
        }
        // NOTE(review): jdbcTemplate is null here when the named-template constructor was
        // used with a strategy that is not a SqlNamedProcessingStrategy - confirm callers never do that.
        return sqlProcessingStrategy.commit(getEndpoint(), exchange, data, jdbcTemplate, sql);
    }

    /**
     * Runs the {@code onConsumeBatchComplete} statement through the processing
     * strategy and returns the update count it reports.
     */
    private int commitBatchComplete() throws Exception {
        if (namedJdbcTemplate != null && sqlProcessingStrategy instanceof SqlNamedProcessingStrategy) {
            SqlNamedProcessingStrategy namedStrategy = (SqlNamedProcessingStrategy) sqlProcessingStrategy;
            return namedStrategy.commitBatchComplete(getEndpoint(), namedJdbcTemplate, parameterSource, onConsumeBatchComplete);
        }
        return sqlProcessingStrategy.commitBatchComplete(getEndpoint(), jdbcTemplate, onConsumeBatchComplete);
    }

    /**
     * Returns the statement executed after an exchange was processed without failure.
     */
    public String getOnConsume() {
        return onConsume;
    }

    /**
     * Sets the statement executed after an exchange was processed without failure.
     */
    public void setOnConsume(String onConsume) {
        this.onConsume = onConsume;
    }

    /**
     * Returns the statement executed after an exchange failed processing.
     */
    public String getOnConsumeFailed() {
        return onConsumeFailed;
    }

    /**
     * Sets the statement executed after an exchange failed processing.
     */
    public void setOnConsumeFailed(String onConsumeFailed) {
        this.onConsumeFailed = onConsumeFailed;
    }

    /**
     * Returns the statement executed once after the batch loop has ended.
     */
    public String getOnConsumeBatchComplete() {
        return onConsumeBatchComplete;
    }

    /**
     * Sets the statement executed once after the batch loop has ended.
     */
    public void setOnConsumeBatchComplete(String onConsumeBatchComplete) {
        this.onConsumeBatchComplete = onConsumeBatchComplete;
    }

    /**
     * Returns whether a list result is split into one exchange per element.
     */
    public boolean isUseIterator() {
        return useIterator;
    }

    /**
     * Sets whether a list result is split into one exchange per element
     * ({@code true}, the default) or routed as a single exchange.
     */
    public void setUseIterator(boolean useIterator) {
        this.useIterator = useIterator;
    }

    /**
     * Returns whether an empty list result is still routed when the result is not split.
     */
    public boolean isRouteEmptyResultSet() {
        return routeEmptyResultSet;
    }

    /**
     * Sets whether an empty list result is still routed as a single exchange.
     * Only consulted when {@code useIterator} is {@code false}.
     */
    public void setRouteEmptyResultSet(boolean routeEmptyResultSet) {
        this.routeEmptyResultSet = routeEmptyResultSet;
    }

    /**
     * Returns the update count expected from the onConsume statements.
     */
    public int getExpectedUpdateCount() {
        return expectedUpdateCount;
    }

    /**
     * Sets the update count expected from the onConsume/onConsumeFailed
     * statements. A negative value (the default is {@code -1}) disables the check.
     */
    public void setExpectedUpdateCount(int expectedUpdateCount) {
        this.expectedUpdateCount = expectedUpdateCount;
    }

    /**
     * Returns whether a failing follow-up statement aborts the batch.
     */
    public boolean isBreakBatchOnConsumeFail() {
        return breakBatchOnConsumeFail;
    }

    /**
     * Sets whether a failing follow-up statement is rethrown ({@code true}) or
     * passed to the exception handler so the batch continues ({@code false}).
     */
    public void setBreakBatchOnConsumeFail(boolean breakBatchOnConsumeFail) {
        this.breakBatchOnConsumeFail = breakBatchOnConsumeFail;
    }

    /**
     * Sets the maximum messages per poll and mirrors the value as the max-rows
     * setting of the plain {@link JdbcTemplate}, when one is in use.
     */
    public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
        super.setMaxMessagesPerPoll(maxMessagesPerPoll);
        // NOTE(review): the limit is not applied when the named-parameter template is
        // used (jdbcTemplate is null in that case) - confirm whether that is intended.
        if (jdbcTemplate != null) {
            jdbcTemplate.setMaxRows(maxMessagesPerPoll);
        }
    }

    /**
     * Pairs an exchange with the data it was created from, so the data is still
     * available for the follow-up statement after the exchange has been processed.
     */
    private static final class DataHolder {
        private final Exchange exchange;
        private final Object data;

        private DataHolder(Exchange exchange, Object data) {
            this.exchange = exchange;
            this.data = data;
        }
    }
}

