/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.processor.aggregate.cassandra;

import com.datastax.oss.driver.api.core.ConsistencyLevel;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
import com.datastax.oss.driver.api.querybuilder.delete.Delete;
import com.datastax.oss.driver.api.querybuilder.insert.Insert;
import com.datastax.oss.driver.api.querybuilder.select.Select;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.cassandra.CassandraAggregationException;
import org.apache.camel.processor.aggregate.cassandra.CassandraCamelCodec;
import org.apache.camel.spi.Configurer;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.RecoverableAggregationRepository;
import org.apache.camel.support.service.ServiceSupport;
import org.apache.camel.utils.cassandra.CassandraSessionHolder;
import org.apache.camel.utils.cassandra.CassandraUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Aggregation repository that stores in-flight aggregated exchanges in a Cassandra table.
 *
 * <p>Each row is addressed by the configured primary key columns: the optional
 * {@link #setPrefixPKValues(Object...) prefix values} followed by the aggregation key, which is
 * always bound to the last primary key column. Besides the key, a row holds the exchange id and
 * the marshalled exchange.
 *
 * <p>All CQL statements are prepared once in {@link #doStart()}; the table/column/consistency
 * settings must therefore be configured before the service is started.
 */
@Metadata(label="bean", description="Aggregation repository that uses Cassandra table to store exchanges. Advice: use LeveledCompaction for this table and tune read/write consistency levels.", annotations={"interfaceName=org.apache.camel.spi.AggregationRepository"})
@Configurer(metadataOnly=true)
public class CassandraAggregationRepository
extends ServiceSupport
implements RecoverableAggregationRepository {
    private static final Logger LOGGER = LoggerFactory.getLogger(CassandraAggregationRepository.class);

    /** Codec used to marshal exchanges to, and unmarshal them from, the exchange column. */
    private final CassandraCamelCodec exchangeCodec = new CassandraCamelCodec();

    @Metadata(description="Cassandra session", required=true)
    private CassandraSessionHolder sessionHolder;
    @Metadata(description="The table name for storing the data", defaultValue="CAMEL_AGGREGATION")
    private String table = "CAMEL_AGGREGATION";
    @Metadata(description="Column name for Exchange ID", defaultValue="EXCHANGE_ID")
    private String exchangeIdColumn = "EXCHANGE_ID";
    @Metadata(description="Column name for Exchange", defaultValue="EXCHANGE")
    private String exchangeColumn = "EXCHANGE";
    @Metadata(description="Values used as primary key prefix. Multiple values can be separated by comma.", displayName="Prefix Primary Key Values", javaType="java.lang.String")
    private Object[] prefixPKValues = new Object[0];
    @Metadata(description="Primary key columns. Multiple values can be separated by comma.", displayName="Primary Key Columns", javaType="java.lang.String", defaultValue="KEY")
    private String[] pkColumns = new String[]{"KEY"};
    @Metadata(description="Time to live in seconds used for inserts", displayName="Time to Live")
    private Integer ttl;
    @Metadata(description="Write consistency level", enums="ANY,ONE,TWO,THREE,QUORUM,ALL,LOCAL_ONE,LOCAL_QUORUM,EACH_QUORUM,SERIAL,LOCAL_SERIAL")
    private ConsistencyLevel writeConsistencyLevel;
    @Metadata(description="Read consistency level", enums="ANY,ONE,TWO,THREE,QUORUM,ALL,LOCAL_ONE,LOCAL_QUORUM,EACH_QUORUM,SERIAL,LOCAL_SERIAL")
    private ConsistencyLevel readConsistencyLevel;

    // Statements prepared in doStart(); they are null until the service has been started.
    private PreparedStatement insertStatement;
    private PreparedStatement selectStatement;
    private PreparedStatement deleteStatement;
    private PreparedStatement selectKeyIdStatement;
    private PreparedStatement deleteIfIdStatement;

    @Metadata(description="Sets the interval between recovery scans", defaultValue="5000")
    private long recoveryInterval = 5000L;
    @Metadata(description="Whether or not recovery is enabled", defaultValue="true")
    private boolean useRecovery = true;
    @Metadata(description="Sets an optional dead letter channel which exhausted recovered Exchange should be send to.")
    private String deadLetterUri;
    @Metadata(description="Sets an optional limit of the number of redelivery attempt of recovered Exchange should be attempted, before its exhausted. When this limit is hit, then the Exchange is moved to the dead letter channel.")
    private int maximumRedeliveries;
    @Metadata(label="advanced", description="Whether headers on the Exchange that are Java objects and Serializable should be included and saved to the repository")
    private boolean allowSerializedHeaders;

    /** Filter handed to the codec when unmarshalling a stored exchange. */
    private String deserializationFilter = "java.**;org.apache.camel.**;!*";

    /**
     * Creates a repository without a session; one must be supplied through
     * {@link #setSession(CqlSession)} before the service is started.
     */
    public CassandraAggregationRepository() {
    }

    /**
     * Creates a repository bound to the given Cassandra session.
     *
     * @param session the session used to prepare and execute all statements
     */
    public CassandraAggregationRepository(CqlSession session) {
        this.sessionHolder = new CassandraSessionHolder(session);
    }

    /**
     * Builds the full primary key for an aggregation key: the prefix values followed by the key.
     *
     * @param key the aggregation key
     * @return the values to bind to the primary key columns
     */
    protected Object[] getPKValues(String key) {
        return CassandraUtils.append(this.prefixPKValues, key);
    }

    /** Returns the last primary key column, which is the one holding the aggregation key. */
    private String getKeyColumn() {
        return this.pkColumns[this.pkColumns.length - 1];
    }

    /** Returns the primary key columns followed by the exchange id and exchange columns. */
    private String[] getAllColumns() {
        return CassandraUtils.append(this.pkColumns, this.exchangeIdColumn, this.exchangeColumn);
    }

    /**
     * Starts the session holder and prepares every statement used by this repository.
     *
     * @throws IllegalStateException if no Cassandra session has been configured
     */
    @Override
    protected void doStart() throws Exception {
        // Fail with an explicit message rather than a bare NullPointerException when the
        // no-arg constructor was used and setSession(...) was never called.
        if (this.sessionHolder == null) {
            throw new IllegalStateException("Cassandra session must be configured before starting the aggregation repository");
        }
        this.sessionHolder.start();
        this.initInsertStatement();
        this.initSelectStatement();
        this.initDeleteStatement();
        this.initSelectKeyIdStatement();
        this.initDeleteIfIdStatement();
    }

    /** Stops the session holder, if one was configured. */
    @Override
    protected void doStop() throws Exception {
        if (this.sessionHolder != null) {
            this.sessionHolder.stop();
        }
    }

    /** Prepares the insert covering all columns, using the configured TTL and write consistency. */
    private void initInsertStatement() {
        // NOTE(review): the boolean argument is presumably an "if not exists" flag - confirm in CassandraUtils.
        Insert insert = CassandraUtils.generateInsert(this.table, this.getAllColumns(), false, this.ttl);
        SimpleStatement statement = CassandraUtils.applyConsistencyLevel(insert.build(), this.writeConsistencyLevel);
        LOGGER.debug("Generated Insert {}", statement);
        this.insertStatement = this.getSession().prepare(statement);
    }

    /**
     * Marshals the exchange and writes it under the given aggregation key.
     *
     * @param camelContext the Camel context (not used by this implementation)
     * @param key the aggregation key
     * @param exchange the exchange to store
     * @return the exchange that was passed in
     * @throws CassandraAggregationException if the exchange cannot be marshalled
     */
    @Override
    public Exchange add(CamelContext camelContext, String key, Exchange exchange) {
        Object[] idValues = this.getPKValues(key);
        // Cast to Object so the array is logged as one argument instead of being spread as varargs.
        LOGGER.debug("Inserting key {} exchange {}", (Object) idValues, (Object) exchange);
        try {
            ByteBuffer marshalledExchange = this.exchangeCodec.marshallExchange(exchange, this.allowSerializedHeaders);
            Object[] cqlParams = CassandraUtils.concat(idValues, new Object[]{exchange.getExchangeId(), marshalledExchange});
            this.getSession().execute(this.insertStatement.bind(cqlParams));
            return exchange;
        }
        catch (IOException e) {
            throw new CassandraAggregationException("Failed to write exchange", exchange, e);
        }
    }

    /** Prepares the select that reads all columns of the row matching the full primary key. */
    protected void initSelectStatement() {
        Select select = CassandraUtils.generateSelect(this.table, this.getAllColumns(), this.pkColumns);
        SimpleStatement statement = CassandraUtils.applyConsistencyLevel(select.build(), this.readConsistencyLevel);
        LOGGER.debug("Generated Select {}", statement);
        this.selectStatement = this.getSession().prepare(statement);
    }

    /**
     * Loads and unmarshals the exchange stored under the given aggregation key.
     *
     * @param camelContext the Camel context handed to the codec when unmarshalling
     * @param key the aggregation key
     * @return the stored exchange, or {@code null} when no row exists for the key
     * @throws CassandraAggregationException if the stored exchange cannot be unmarshalled
     */
    @Override
    public Exchange get(CamelContext camelContext, String key) {
        Object[] pkValues = this.getPKValues(key);
        // Cast to Object: passing the array directly would log only its first element.
        LOGGER.debug("Selecting key {}", (Object) pkValues);
        Row row = this.getSession().execute(this.selectStatement.bind(pkValues)).one();
        Exchange exchange = null;
        if (row != null) {
            try {
                exchange = this.exchangeCodec.unmarshallExchange(camelContext, row.getByteBuffer(this.exchangeColumn), this.deserializationFilter);
            }
            catch (IOException e) {
                // exchange is still null here: unmarshalling failed before it could be assigned.
                throw new CassandraAggregationException("Failed to read exchange", exchange, e);
            }
            catch (ClassNotFoundException e) {
                throw new CassandraAggregationException("Failed to read exchange", exchange, e);
            }
        }
        return exchange;
    }

    /**
     * Prepares the conditional delete that removes a row by primary key only if its exchange id
     * column equals the bound value.
     */
    private void initDeleteIfIdStatement() {
        // NOTE(review): the boolean argument is presumably an "if exists" flag - confirm in CassandraUtils.
        Delete delete = CassandraUtils.generateDelete(this.table, this.pkColumns, false);
        Delete deleteIf = delete.ifColumn(this.exchangeIdColumn).isEqualTo(QueryBuilder.bindMarker());
        SimpleStatement statement = CassandraUtils.applyConsistencyLevel(deleteIf.build(), this.writeConsistencyLevel);
        LOGGER.debug("Generated Delete If Id {}", statement);
        this.deleteIfIdStatement = this.getSession().prepare(statement);
    }

    /**
     * Deletes every row whose exchange id matches the confirmed exchange.
     *
     * <p>The rows are found by listing all key/id pairs for the configured prefix; each match is
     * removed with the conditional delete, bound with the row's primary key and the exchange id.
     *
     * @param camelContext the Camel context (not used by this implementation)
     * @param exchangeId the id of the exchange that completed
     */
    @Override
    public void confirm(CamelContext camelContext, String exchangeId) {
        String keyColumn = this.getKeyColumn();
        LOGGER.debug("Selecting Ids");
        for (Row row : this.selectKeyIds()) {
            String rowExchangeId = row.getString(this.exchangeIdColumn);
            // A null id column can never match; skip it instead of failing with an NPE.
            if (rowExchangeId == null || !rowExchangeId.equals(exchangeId)) {
                continue;
            }
            String key = row.getString(keyColumn);
            Object[] cqlParams = CassandraUtils.append(this.getPKValues(key), exchangeId);
            LOGGER.debug("Deleting If Id {}", (Object) cqlParams);
            this.getSession().execute(this.deleteIfIdStatement.bind(cqlParams));
        }
    }

    /** Prepares the unconditional delete by full primary key. */
    private void initDeleteStatement() {
        // NOTE(review): the boolean argument is presumably an "if exists" flag - confirm in CassandraUtils.
        Delete delete = CassandraUtils.generateDelete(this.table, this.pkColumns, false);
        SimpleStatement statement = CassandraUtils.applyConsistencyLevel(delete.build(), this.writeConsistencyLevel);
        LOGGER.debug("Generated Delete {}", statement);
        this.deleteStatement = this.getSession().prepare(statement);
    }

    /**
     * Deletes the row stored under the given aggregation key.
     *
     * @param camelContext the Camel context (not used by this implementation)
     * @param key the aggregation key
     * @param exchange the exchange being removed (not used by this implementation)
     */
    @Override
    public void remove(CamelContext camelContext, String key, Exchange exchange) {
        Object[] idValues = this.getPKValues(key);
        LOGGER.debug("Deleting key {}", (Object) idValues);
        this.getSession().execute(this.deleteStatement.bind(idValues));
    }

    /** Prepares the select that lists the key column and exchange id column. */
    private void initSelectKeyIdStatement() {
        // NOTE(review): the last argument is presumably the number of leading primary key columns
        // to filter on (all but the key column, i.e. the prefix) - confirm in CassandraUtils.
        Select select = CassandraUtils.generateSelect(this.table, new String[]{this.getKeyColumn(), this.exchangeIdColumn}, this.pkColumns, this.pkColumns.length - 1);
        SimpleStatement statement = CassandraUtils.applyConsistencyLevel(select.build(), this.readConsistencyLevel);
        LOGGER.debug("Generated Select keys {}", statement);
        this.selectKeyIdStatement = this.getSession().prepare(statement);
    }

    /**
     * Executes the key/id select bound with the prefix primary key values.
     *
     * @return all rows returned by the query, each exposing the key column and exchange id column
     */
    protected List<Row> selectKeyIds() {
        Object[] prefixValues = this.getPrefixPKValues();
        // Cast to Object: an empty prefix array passed as varargs would log a literal "{}".
        LOGGER.debug("Selecting keys {}", (Object) prefixValues);
        return this.getSession().execute(this.selectKeyIdStatement.bind(prefixValues)).all();
    }

    /**
     * Lists the aggregation keys currently stored for the configured prefix.
     *
     * @return the distinct values of the key column
     */
    @Override
    public Set<String> getKeys() {
        List<Row> rows = this.selectKeyIds();
        Set<String> keys = new HashSet<>(rows.size());
        String keyColumnName = this.getKeyColumn();
        for (Row row : rows) {
            keys.add(row.getString(keyColumnName));
        }
        return keys;
    }

    /**
     * Lists the exchange ids currently stored for the configured prefix.
     *
     * @param camelContext the Camel context (not used by this implementation)
     * @return the distinct values of the exchange id column
     */
    @Override
    public Set<String> scan(CamelContext camelContext) {
        List<Row> rows = this.selectKeyIds();
        Set<String> exchangeIds = new HashSet<>(rows.size());
        for (Row row : rows) {
            exchangeIds.add(row.getString(this.exchangeIdColumn));
        }
        return exchangeIds;
    }

    /**
     * Finds the stored exchange with the given exchange id and loads it.
     *
     * @param camelContext the Camel context handed to {@link #get(CamelContext, String)}
     * @param exchangeId the id of the exchange to recover
     * @return the exchange stored under the first matching row's key, or {@code null} when no
     *         row carries that exchange id
     */
    @Override
    public Exchange recover(CamelContext camelContext, String exchangeId) {
        String keyColumnName = this.getKeyColumn();
        for (Row row : this.selectKeyIds()) {
            String rowExchangeId = row.getString(this.exchangeIdColumn);
            // Null-safe comparison: a row without an exchange id simply does not match.
            if (rowExchangeId != null && rowExchangeId.equals(exchangeId)) {
                String matchedKey = row.getString(keyColumnName);
                return matchedKey == null ? null : this.get(camelContext, matchedKey);
            }
        }
        return null;
    }

    /** Returns the Cassandra session held by the session holder. */
    public CqlSession getSession() {
        return this.sessionHolder.getSession();
    }

    /** Replaces the session holder with one wrapping the given session. */
    public void setSession(CqlSession session) {
        this.sessionHolder = new CassandraSessionHolder(session);
    }

    public String getTable() {
        return this.table;
    }

    public void setTable(String table) {
        this.table = table;
    }

    /** Returns the prefix primary key values; the internal array itself is returned, not a copy. */
    public Object[] getPrefixPKValues() {
        return this.prefixPKValues;
    }

    public void setPrefixPKValues(Object ... prefixPKValues) {
        this.prefixPKValues = prefixPKValues;
    }

    /** Returns the primary key columns; the internal array itself is returned, not a copy. */
    public String[] getPKColumns() {
        return this.pkColumns;
    }

    public void setPKColumns(String ... pkColumns) {
        this.pkColumns = pkColumns;
    }

    public String getExchangeIdColumn() {
        return this.exchangeIdColumn;
    }

    public void setExchangeIdColumn(String exchangeIdColumn) {
        this.exchangeIdColumn = exchangeIdColumn;
    }

    public ConsistencyLevel getWriteConsistencyLevel() {
        return this.writeConsistencyLevel;
    }

    public void setWriteConsistencyLevel(ConsistencyLevel writeConsistencyLevel) {
        this.writeConsistencyLevel = writeConsistencyLevel;
    }

    public ConsistencyLevel getReadConsistencyLevel() {
        return this.readConsistencyLevel;
    }

    public void setReadConsistencyLevel(ConsistencyLevel readConsistencyLevel) {
        this.readConsistencyLevel = readConsistencyLevel;
    }

    public String getExchangeColumn() {
        return this.exchangeColumn;
    }

    public void setExchangeColumn(String exchangeColumnName) {
        this.exchangeColumn = exchangeColumnName;
    }

    public Integer getTtl() {
        return this.ttl;
    }

    public void setTtl(Integer ttl) {
        this.ttl = ttl;
    }

    @Override
    public long getRecoveryInterval() {
        return this.recoveryInterval;
    }

    /** Sets the recovery interval, converting the given duration to milliseconds. */
    @Override
    public void setRecoveryInterval(long interval, TimeUnit timeUnit) {
        this.recoveryInterval = timeUnit.toMillis(interval);
    }

    @Override
    public void setRecoveryInterval(long recoveryInterval) {
        this.recoveryInterval = recoveryInterval;
    }

    @Override
    public boolean isUseRecovery() {
        return this.useRecovery;
    }

    @Override
    public void setUseRecovery(boolean useRecovery) {
        this.useRecovery = useRecovery;
    }

    @Override
    public String getDeadLetterUri() {
        return this.deadLetterUri;
    }

    @Override
    public void setDeadLetterUri(String deadLetterUri) {
        this.deadLetterUri = deadLetterUri;
    }

    @Override
    public int getMaximumRedeliveries() {
        return this.maximumRedeliveries;
    }

    @Override
    public void setMaximumRedeliveries(int maximumRedeliveries) {
        this.maximumRedeliveries = maximumRedeliveries;
    }

    public boolean isAllowSerializedHeaders() {
        return this.allowSerializedHeaders;
    }

    public void setAllowSerializedHeaders(boolean allowSerializedHeaders) {
        this.allowSerializedHeaders = allowSerializedHeaders;
    }

    public String getDeserializationFilter() {
        return this.deserializationFilter;
    }

    public void setDeserializationFilter(String deserializationFilter) {
        this.deserializationFilter = deserializationFilter;
    }
}

