package org.apache.hive.streaming;

import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.metastore.LockComponentBuilder;
import org.apache.hadoop.hive.metastore.LockRequestBuilder;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.DataOperationType;
import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse;
import org.apache.hadoop.hive.metastore.api.LockRequest;
import org.apache.hadoop.hive.metastore.api.LockState;
import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
import org.apache.hadoop.hive.ql.lockmgr.LockException;
import org.apache.hive.streaming.HiveStreamingConnection;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hive/streaming/TransactionBatch.class */
/**
 * A batch of metastore transactions opened together on behalf of one
 * {@link HiveStreamingConnection}.
 *
 * <p>The constructor opens {@code numTxns} transactions, allocates a write id for each one and
 * initialises the connection's record writer with the resulting write-id range. Transactions are
 * then consumed one at a time through {@link #beginNextTransaction()},
 * {@link #commitWithPartitions(Set)} and {@link #abort()}.
 *
 * <p>A single-threaded scheduled executor periodically heartbeats the range
 * {@code [minTxnId, maxTxnId]}. {@code minTxnId} is advanced (or set to {@code -1} once nothing
 * is left to heartbeat) under {@link #transactionLock}, the same lock the heartbeat task holds
 * while it talks to the metastore.
 */
public class TransactionBatch extends AbstractStreamingTransaction {
    private static final Logger LOG = LoggerFactory.getLogger(TransactionBatch.class.getName());

    /** Heartbeat interval in milliseconds used when the configured one cannot be determined. */
    private static final int DEFAULT_HEARTBEAT_INTERVAL = 60000;

    /** Partitions recorded for the current transaction; {@code null} until a transaction is begun. */
    protected Set<String> createdPartitions = null;

    private String username;
    private HiveStreamingConnection conn;
    private ScheduledExecutorService scheduledExecutorService;
    /** Partition name placed in lock requests; stays {@code null} unless the table is statically partitioned. */
    private String partNameForLock = null;
    /** First transaction id that still needs heartbeating; {@code -1} when none does. */
    private final AtomicLong minTxnId;
    /** Last transaction id of the batch. */
    private final long maxTxnId;
    private String agentInfo;
    private int numTxns;
    /** Per-transaction state, indexed like {@code txnToWriteIds}. */
    private HiveStreamingConnection.TxnState[] txnStatus;
    private long lastTxnUsed;
    private LockRequest lockRequest = null;
    /** Serialises metastore commit/abort calls and {@code minTxnId} updates against the heartbeat task. */
    private final ReentrantLock transactionLock = new ReentrantLock();

    /**
     * Periodic task that heartbeats the still-open transaction range of a batch.
     */
    public static class HeartbeatRunnable implements Runnable {
        private final HiveStreamingConnection conn;
        private final AtomicLong minTxnId;
        private final long maxTxnId;
        private final ReentrantLock transactionLock;
        private final AtomicBoolean isTxnClosed;

        HeartbeatRunnable(HiveStreamingConnection conn, AtomicLong minTxnId, long maxTxnId,
                ReentrantLock transactionLock, AtomicBoolean isTxnClosed) {
            this.conn = conn;
            this.minTxnId = minTxnId;
            this.maxTxnId = maxTxnId;
            this.transactionLock = transactionLock;
            this.isTxnClosed = isTxnClosed;
        }

        /**
         * Sends one heartbeat for {@code [minTxnId, maxTxnId]} while holding the transaction lock.
         * Nothing is sent when {@code minTxnId} is not positive. If the response lists aborted or
         * unknown transactions, the shared closed flag is set. A {@link TException} is logged and
         * otherwise ignored.
         */
        @Override // java.lang.Runnable
        public void run() {
            this.transactionLock.lock();
            try {
                long firstTxnId = this.minTxnId.get();
                if (firstTxnId > 0) {
                    HeartbeatTxnRangeResponse response =
                            this.conn.getHeatbeatMSC().heartbeatTxnRange(firstTxnId, this.maxTxnId);
                    if (response.getAborted().isEmpty() && response.getNosuch().isEmpty()) {
                        LOG.info("Heartbeat sent for range: [{}-{}]", firstTxnId, this.maxTxnId);
                    } else {
                        LOG.error("Heartbeat failure: {}", response.toString());
                        this.isTxnClosed.set(true);
                    }
                }
            } catch (TException e) {
                LOG.warn("Failure to heartbeat for transaction range: [" + this.minTxnId.get() + "-" + this.maxTxnId + "]", e);
            } finally {
                this.transactionLock.unlock();
            }
        }
    }

    /**
     * Opens a new batch of transactions for the given connection.
     *
     * <p>The heartbeat task is scheduled only after {@code minTxnId} and {@code maxTxnId} have been
     * assigned: it captures both values when it is constructed, so scheduling it earlier would hand
     * it a {@code null} counter and a zero upper bound.
     *
     * @param hiveStreamingConnection connection supplying the table, user, record writer, agent
     *        info and batch size
     * @throws StreamingException if a metastore call fails; {@code markDead(false)} is invoked
     *         before the exception propagates
     */
    public TransactionBatch(HiveStreamingConnection hiveStreamingConnection) throws StreamingException {
        boolean success = false;
        try {
            if (hiveStreamingConnection.isPartitionedTable() && !hiveStreamingConnection.isDynamicPartitioning()) {
                this.partNameForLock = Warehouse.makePartName(
                        hiveStreamingConnection.getTable().getPartitionKeys(),
                        hiveStreamingConnection.getStaticPartitionValues());
            }
            this.conn = hiveStreamingConnection;
            this.username = hiveStreamingConnection.getUsername();
            this.recordWriter = hiveStreamingConnection.getRecordWriter();
            this.agentInfo = hiveStreamingConnection.getAgentInfo();
            this.numTxns = hiveStreamingConnection.getTransactionBatchSize();

            List<Long> txnIds = openTxnImpl(this.username, this.numTxns);
            this.txnToWriteIds = allocateWriteIdsImpl(txnIds);
            assert this.txnToWriteIds.size() == this.numTxns;

            this.txnStatus = new HiveStreamingConnection.TxnState[this.numTxns];
            for (int i = 0; i < this.txnStatus.length; i++) {
                assert this.txnToWriteIds.get(i).getTxnId() == txnIds.get(i).longValue();
                this.txnStatus[i] = HiveStreamingConnection.TxnState.OPEN;
            }
            this.state = HiveStreamingConnection.TxnState.INACTIVE;

            // Hand the record writer the first and last write id of the batch.
            this.recordWriter.init(hiveStreamingConnection,
                    this.txnToWriteIds.get(0).getWriteId(),
                    this.txnToWriteIds.get(this.numTxns - 1).getWriteId(),
                    hiveStreamingConnection.getStatementId().intValue());

            this.minTxnId = new AtomicLong(txnIds.get(0).longValue());
            this.maxTxnId = txnIds.get(txnIds.size() - 1).longValue();

            // Must come after the two assignments above; see the constructor Javadoc.
            setupHeartBeatThread();
            success = true;
        } catch (TException e) {
            throw new StreamingException(hiveStreamingConnection.toString(), e);
        } finally {
            markDead(success);
        }
    }

    /**
     * Creates the daemon heartbeat executor and schedules a {@link HeartbeatRunnable} on it.
     * The initial delay is a random fraction (up to 75%) of the heartbeat interval.
     */
    private void setupHeartBeatThread() {
        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactoryBuilder()
                        .setDaemon(true)
                        .setNameFormat("HiveStreamingConnection-Heartbeat-Thread")
                        .build());
        long heartBeatInterval;
        try {
            heartBeatInterval = DbTxnManager.getHeartbeatInterval(this.conn.getConf());
        } catch (LockException ignored) {
            // Fall back to the built-in default when the configured interval is unavailable.
            heartBeatInterval = DEFAULT_HEARTBEAT_INTERVAL;
        }
        long initialDelay = (long) (heartBeatInterval * 0.75d * Math.random());
        LOG.info("Starting heartbeat thread with interval: {} ms initialDelay: {} ms for agentInfo: {}",
                heartBeatInterval, initialDelay, this.conn.getAgentInfo());
        Runnable heartbeat = new HeartbeatRunnable(
                this.conn, this.minTxnId, this.maxTxnId, this.transactionLock, this.isTxnClosed);
        this.scheduledExecutorService.scheduleWithFixedDelay(
                heartbeat, initialDelay, heartBeatInterval, TimeUnit.MILLISECONDS);
    }

    /** Opens {@code numTxns} transactions for {@code user} and returns their ids. */
    private List<Long> openTxnImpl(String user, int numTxns) throws TException {
        return this.conn.getMSC().openTxns(user, numTxns).getTxn_ids();
    }

    /** Allocates write ids on the connection's table for the given transaction ids. */
    private List<TxnToWriteId> allocateWriteIdsImpl(List<Long> txnIds) throws TException {
        return this.conn.getMSC().allocateTableWriteIdsBatch(
                txnIds, this.conn.getDatabase(), this.conn.getTable().getTableName());
    }

    /**
     * Describes the first and last txn/write id pair, the connection, the per-transaction
     * states and the last transaction used. Returns {@code "{}"} when no write ids are present.
     */
    @Override
    public String toString() {
        if (this.txnToWriteIds == null || this.txnToWriteIds.isEmpty()) {
            return "{}";
        }
        StringBuilder status = new StringBuilder(" TxnStatus[");
        // txnStatus is assigned after txnToWriteIds, so it may still be null on a failed construction.
        if (this.txnStatus != null) {
            for (HiveStreamingConnection.TxnState txnState : this.txnStatus) {
                status.append(txnState == null ? "N" : txnState);
            }
        }
        status.append("] LastUsed ").append(JavaUtils.txnIdToString(this.lastTxnUsed));

        TxnToWriteId first = this.txnToWriteIds.get(0);
        TxnToWriteId last = this.txnToWriteIds.get(this.txnToWriteIds.size() - 1);
        return "TxnId/WriteIds=[" + first.getTxnId() + "/" + first.getWriteId()
                + "..." + last.getTxnId() + "/" + last.getWriteId()
                + "] on connection = " + this.conn + "; " + status;
    }

    /**
     * Activates the next transaction of the batch and acquires its lock.
     *
     * @throws StreamingException if the batch is closed, exhausted, or the lock cannot be acquired
     */
    @Override // org.apache.hive.streaming.StreamingTransaction
    public void beginNextTransaction() throws StreamingException {
        checkIsClosed();
        beginNextTransactionImpl();
    }

    /**
     * Advances to the next transaction via the inherited helper, builds a lock request for it,
     * resets {@link #createdPartitions} and requests the lock from the metastore.
     */
    private void beginNextTransactionImpl() throws StreamingException {
        beginNextTransactionImpl("No more transactions available in next batch for connection: " + this.conn + " user: " + this.username);
        this.lastTxnUsed = getCurrentTxnId();
        this.lockRequest = createLockRequest(
                this.conn, this.partNameForLock, this.username, getCurrentTxnId(), this.agentInfo);
        this.createdPartitions = Sets.newHashSet();
        try {
            if (this.conn.getMSC().lock(this.lockRequest).getState() != LockState.ACQUIRED) {
                throw new TransactionError("Unable to acquire lock on " + this.conn);
            }
        } catch (TException e) {
            throw new TransactionError("Unable to acquire lock on " + this.conn, e);
        }
    }

    /**
     * Commits the current transaction.
     *
     * @param set additional partition names to register with the commit when the connection uses
     *        dynamic partitioning; may be {@code null}
     * @throws StreamingException if the batch is closed or the commit fails; on failure
     *         {@code markDead(false)} is invoked before the exception propagates
     */
    @Override // org.apache.hive.streaming.StreamingTransaction
    public void commitWithPartitions(Set<String> set) throws StreamingException {
        checkIsClosed();
        boolean success = false;
        try {
            commitImpl(set);
            success = true;
        } finally {
            markDead(success);
        }
    }

    /**
     * Flushes the writer, registers dynamic partitions if applicable, closes the writer when this
     * is the last transaction of the batch, and commits the current transaction in the metastore.
     */
    private void commitImpl(Set<String> partitions) throws StreamingException {
        try {
            this.recordWriter.flush();
            TxnToWriteId txnToWriteId = this.txnToWriteIds.get(this.currentTxnIndex);

            if (this.conn.isDynamicPartitioning()) {
                List<String> partNames = new ArrayList<>(this.recordWriter.getPartitions());
                // Only the writer's own partitions are recorded as created by this batch.
                this.createdPartitions.addAll(partNames);
                if (partitions != null) {
                    partNames.addAll(partitions);
                }
                if (!partNames.isEmpty()) {
                    this.conn.getMSC().addDynamicPartitions(
                            txnToWriteId.getTxnId(), txnToWriteId.getWriteId(),
                            this.conn.getDatabase(), this.conn.getTable().getTableName(),
                            partNames, DataOperationType.INSERT);
                }
            }

            // Last transaction of the batch: close the writer and emit write notifications first.
            if (this.currentTxnIndex + 1 >= this.txnToWriteIds.size()) {
                this.recordWriter.close();
                this.conn.addWriteNotificationEvents();
            }

            this.transactionLock.lock();
            try {
                this.conn.getMSC().commitTxn(txnToWriteId.getTxnId());
                // Heartbeats now start from the next transaction, or stop if none is left.
                if (this.currentTxnIndex + 1 < this.txnToWriteIds.size()) {
                    this.minTxnId.set(this.txnToWriteIds.get(this.currentTxnIndex + 1).getTxnId());
                } else {
                    this.minTxnId.set(-1L);
                }
            } finally {
                this.transactionLock.unlock();
            }
            this.state = HiveStreamingConnection.TxnState.COMMITTED;
            this.txnStatus[this.currentTxnIndex] = HiveStreamingConnection.TxnState.COMMITTED;
        } catch (NoSuchTxnException e) {
            throw new TransactionError("Invalid transaction id : " + getCurrentTxnId(), e);
        } catch (TxnAbortedException e) {
            throw new TransactionError("Aborted transaction cannot be committed", e);
        } catch (TException e) {
            throw new TransactionError("Unable to commitTransaction transaction" + getCurrentTxnId(), e);
        }
    }

    /**
     * Aborts the current transaction. Does nothing when the closed flag is already set.
     *
     * @throws StreamingException if the rollback fails
     */
    @Override // org.apache.hive.streaming.StreamingTransaction
    public void abort() throws StreamingException {
        if (this.isTxnClosed.get()) {
            return;
        }
        abort(false);
    }

    private void abort(boolean abortAllRemaining) throws StreamingException {
        abortImpl(abortAllRemaining);
    }

    /**
     * Rolls back either the current transaction or, when {@code abortAllRemaining} is set, every
     * transaction of the batch from the current position onwards.
     *
     * <p>Returns immediately when {@code minTxnId} was never assigned, i.e. when the constructor
     * failed before reaching that assignment.
     */
    private void abortImpl(boolean abortAllRemaining) throws StreamingException {
        if (this.minTxnId == null) {
            return;
        }
        this.transactionLock.lock();
        try {
            if (abortAllRemaining) {
                this.minTxnId.set(-1L);
                // Skip the current transaction if it has already been finished.
                boolean currentFinished = this.state == HiveStreamingConnection.TxnState.ABORTED
                        || this.state == HiveStreamingConnection.TxnState.COMMITTED;
                this.currentTxnIndex = Math.max(this.currentTxnIndex + (currentFinished ? 1 : 0), 0);
                while (this.currentTxnIndex < this.txnToWriteIds.size()) {
                    this.conn.getMSC().rollbackTxn(this.txnToWriteIds.get(this.currentTxnIndex).getTxnId());
                    this.txnStatus[this.currentTxnIndex] = HiveStreamingConnection.TxnState.ABORTED;
                    this.currentTxnIndex++;
                }
                // Step back so the index refers to the last transaction again.
                this.currentTxnIndex--;
            } else {
                if (this.currentTxnIndex + 1 < this.txnToWriteIds.size()) {
                    this.minTxnId.set(this.txnToWriteIds.get(this.currentTxnIndex + 1).getTxnId());
                } else {
                    this.minTxnId.set(-1L);
                }
                long currentTxnId = getCurrentTxnId();
                if (currentTxnId > 0) {
                    this.conn.getMSC().rollbackTxn(currentTxnId);
                    this.txnStatus[this.currentTxnIndex] = HiveStreamingConnection.TxnState.ABORTED;
                }
            }
            this.state = HiveStreamingConnection.TxnState.ABORTED;
        } catch (NoSuchTxnException e) {
            throw new TransactionError("Unable to abort invalid transaction id : " + getCurrentTxnId(), e);
        } catch (TException e) {
            throw new TransactionError("Unable to abort transaction id : " + getCurrentTxnId(), e);
        } finally {
            this.transactionLock.unlock();
        }
    }

    /**
     * Aborts all remaining transactions, then closes the record writer and stops the heartbeat
     * executor. Does nothing when the batch is already closed.
     *
     * <p>The two phases are reported separately: an abort failure surfaces as
     * {@code "Unable to abort"}, a close failure as {@code "Unable to close"}. The heartbeat
     * executor is stopped in both failure cases, because the closed flag set here prevents a
     * later {@code close()} call from doing it.
     *
     * @throws StreamingException if aborting or closing fails
     */
    @Override // org.apache.hive.streaming.StreamingTransaction
    public void close() throws StreamingException {
        if (isClosed()) {
            return;
        }
        this.isTxnClosed.set(true);
        try {
            abort(true);
        } catch (Exception e) {
            LOG.error("Fatal error on " + toString() + "; cause " + e.getMessage(), e);
            shutdownHeartbeat();
            throw new StreamingException("Unable to abort", e);
        }
        try {
            closeImpl();
        } catch (Exception e) {
            LOG.error("Fatal error on " + toString() + "; cause " + e.getMessage(), e);
            throw new StreamingException("Unable to close", e);
        }
    }

    /** Marks the batch inactive, closes the record writer and stops the heartbeat executor. */
    private void closeImpl() throws StreamingException {
        this.state = HiveStreamingConnection.TxnState.INACTIVE;
        try {
            // The writer is still unset if the constructor failed before obtaining it.
            if (this.recordWriter != null) {
                this.recordWriter.close();
            }
        } finally {
            shutdownHeartbeat();
        }
    }

    /** Stops the heartbeat executor if one was created. */
    private void shutdownHeartbeat() {
        if (this.scheduledExecutorService != null) {
            this.scheduledExecutorService.shutdownNow();
        }
    }

    /**
     * Builds a shared INSERT lock request on the connection's table for one transaction.
     *
     * @param connection connection supplying database, table and partitioning mode
     * @param partName partition name to lock; ignored when {@code null} or empty
     * @param user user on whose behalf the lock is requested
     * @param txnId transaction the lock belongs to
     * @param agentInfo agent info passed to the lock request builder
     */
    private static LockRequest createLockRequest(HiveStreamingConnection connection, String partName,
            String user, long txnId, String agentInfo) {
        LockRequestBuilder requestBuilder = new LockRequestBuilder(agentInfo);
        requestBuilder.setUser(user);
        requestBuilder.setTransactionId(txnId);
        LockComponentBuilder componentBuilder = new LockComponentBuilder()
                .setDbName(connection.getDatabase())
                .setTableName(connection.getTable().getTableName())
                .setShared()
                .setOperationType(DataOperationType.INSERT);
        if (connection.isDynamicPartitioning()) {
            componentBuilder.setIsDynamicPartitionWrite(true);
        }
        if (partName != null && !partName.isEmpty()) {
            componentBuilder.setPartitionName(partName);
        }
        requestBuilder.addLockComponent(componentBuilder.build());
        return requestBuilder.build();
    }

    /**
     * Returns the partitions recorded for the current transaction, or {@code null} if no
     * transaction has been begun yet.
     */
    @Override // org.apache.hive.streaming.StreamingTransaction
    public Set<String> getPartitions() {
        return this.createdPartitions;
    }

    // The overrides below only delegate to the inherited implementations.

    @Override // org.apache.hive.streaming.AbstractStreamingTransaction, org.apache.hive.streaming.StreamingTransaction
    public void commit() throws StreamingException {
        super.commit();
    }

    // Raw return type retained from the original declaration.
    @Override // org.apache.hive.streaming.AbstractStreamingTransaction, org.apache.hive.streaming.StreamingTransaction
    public List getTxnToWriteIds() {
        return super.getTxnToWriteIds();
    }

    @Override // org.apache.hive.streaming.AbstractStreamingTransaction, org.apache.hive.streaming.StreamingTransaction
    public long getCurrentTxnId() {
        return super.getCurrentTxnId();
    }

    @Override // org.apache.hive.streaming.AbstractStreamingTransaction, org.apache.hive.streaming.StreamingTransaction
    public HiveStreamingConnection.TxnState getCurrentTransactionState() {
        return super.getCurrentTransactionState();
    }

    @Override // org.apache.hive.streaming.AbstractStreamingTransaction, org.apache.hive.streaming.StreamingTransaction
    public boolean isClosed() {
        return super.isClosed();
    }

    @Override // org.apache.hive.streaming.AbstractStreamingTransaction, org.apache.hive.streaming.StreamingTransaction
    public int remainingTransactions() {
        return super.remainingTransactions();
    }

    @Override // org.apache.hive.streaming.AbstractStreamingTransaction, org.apache.hive.streaming.StreamingTransaction
    public long getCurrentWriteId() {
        return super.getCurrentWriteId();
    }

    @Override // org.apache.hive.streaming.AbstractStreamingTransaction, org.apache.hive.streaming.StreamingTransaction
    public void write(InputStream inputStream) throws StreamingException {
        super.write(inputStream);
    }

    @Override // org.apache.hive.streaming.AbstractStreamingTransaction, org.apache.hive.streaming.StreamingTransaction
    public void write(byte[] bArr) throws StreamingException {
        super.write(bArr);
    }
}
