package org.apache.omid.tso;

import io.netty.channel.Channel;
import java.io.IOException;
import org.apache.omid.committable.CommitTable;
import org.apache.omid.metrics.MetricsRegistry;
import org.apache.omid.metrics.NullMetricsProvider;
import org.apache.omid.tso.PersistenceProcessorImpl;
import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/omid/tso/TestPersistenceProcessorHandler.class */
public class TestPersistenceProcessorHandler {
    private static final Logger LOG = LoggerFactory.getLogger(TestPersistenceProcessorHandler.class);
    private static final int BATCH_ID = 0;
    private static final int BATCH_SIZE = 6;
    private static final long BATCH_SEQUENCE = 0;
    private static final long FIRST_ST = 0;
    private static final long FIRST_CT = 1;
    private static final long SECOND_ST = 2;
    private static final long SECOND_CT = 3;
    private static final long THIRD_ST = 4;
    private static final long THIRD_CT = 5;
    private static final long FOURTH_ST = 6;
    private static final long FOURTH_CT = 7;
    private static final long FIFTH_ST = 8;
    private static final long FIFTH_CT = 9;
    private static final long SIXTH_ST = 10;

    @Mock
    private CommitTable.Writer mockWriter;

    @Mock
    private CommitTable.Client mockClient;

    @Mock
    private LeaseManager leaseManager;

    @Mock
    private ReplyProcessor replyProcessor;

    @Mock
    private RetryProcessor retryProcessor;

    @Mock
    private Panicker panicker;
    private CommitTable commitTable;
    private MetricsRegistry metrics;
    private PersistenceProcessorHandler persistenceHandler;

    @BeforeMethod(alwaysRun = true, timeOut = 30000)
    public void initMocksAndComponents() throws Exception {
        MockitoAnnotations.initMocks(this);
        this.metrics = new NullMetricsProvider();
        this.commitTable = new CommitTable() { // from class: org.apache.omid.tso.TestPersistenceProcessorHandler.1
            public CommitTable.Writer getWriter() {
                return TestPersistenceProcessorHandler.this.mockWriter;
            }

            public CommitTable.Client getClient() {
                return TestPersistenceProcessorHandler.this.mockClient;
            }
        };
        ((LeaseManager) Mockito.doReturn(true).when(this.leaseManager)).stillInLeasePeriod();
        this.persistenceHandler = (PersistenceProcessorHandler) Mockito.spy(new PersistenceProcessorHandler(this.metrics, "localhost:1234", this.leaseManager, this.commitTable, this.replyProcessor, this.retryProcessor, this.panicker));
    }

    @AfterMethod
    void afterMethod() {
        Mockito.reset(new CommitTable.Writer[]{this.mockWriter});
    }

    @Test(timeOut = 1000)
    public void testPersistentProcessorHandlerIdsAreCreatedConsecutive() throws Exception {
        TSOServerConfig tSOServerConfig = new TSOServerConfig();
        tSOServerConfig.setNumConcurrentCTWriters(32);
        PersistenceProcessorHandler[] persistenceProcessorHandlerArr = new PersistenceProcessorHandler[tSOServerConfig.getNumConcurrentCTWriters()];
        for (int i = BATCH_ID; i < tSOServerConfig.getNumConcurrentCTWriters(); i++) {
            persistenceProcessorHandlerArr[i] = new PersistenceProcessorHandler(this.metrics, "localhost:1234", (LeaseManagement) Mockito.mock(LeaseManager.class), this.commitTable, (ReplyProcessor) Mockito.mock(ReplyProcessor.class), this.retryProcessor, this.panicker);
        }
        for (int i2 = BATCH_ID; i2 < tSOServerConfig.getNumConcurrentCTWriters(); i2++) {
            if (i2 + 1 < tSOServerConfig.getNumConcurrentCTWriters()) {
                Assert.assertEquals(persistenceProcessorHandlerArr[i2].getId(), String.valueOf(Integer.valueOf(persistenceProcessorHandlerArr[i2 + 1].getId()).intValue() - 1));
            } else {
                Assert.assertEquals(persistenceProcessorHandlerArr[i2].getId(), String.valueOf(PersistenceProcessorHandler.consecutiveSequenceCreator.get() - 1));
            }
        }
    }

    @Test(timeOut = 10000)
    public void testProcessingOfEmptyBatchPersistEvent() throws Exception {
        Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
        PersistenceProcessorImpl.PersistBatchEvent persistBatchEvent = new PersistenceProcessorImpl.PersistBatchEvent();
        PersistenceProcessorImpl.PersistBatchEvent.makePersistBatch(persistBatchEvent, 0L, batch);
        this.persistenceHandler.onEvent(persistBatchEvent);
        ((PersistenceProcessorHandler) Mockito.verify(this.persistenceHandler, Mockito.times(1))).flush(Matchers.eq(BATCH_ID));
        ((PersistenceProcessorHandler) Mockito.verify(this.persistenceHandler, Mockito.times(1))).filterAndDissambiguateClientRetries((Batch) Matchers.eq(batch));
        ((RetryProcessor) Mockito.verify(this.retryProcessor, Mockito.never())).disambiguateRetryRequestHeuristically(Matchers.anyLong(), (Channel) Matchers.any(Channel.class), (MonitoringContext) Matchers.any(MonitoringContextImpl.class));
        ((ReplyProcessor) Mockito.verify(this.replyProcessor, Mockito.times(1))).manageResponsesBatch(Matchers.eq(0L), (Batch) Matchers.eq(batch));
        Assert.assertTrue(batch.isEmpty());
    }

    @Test(timeOut = 10000)
    public void testProcessingOfBatchPersistEventWithASingleTimestampEvent() throws Exception {
        Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
        batch.addTimestamp(0L, (Channel) null, (MonitoringContext) Mockito.mock(MonitoringContextImpl.class));
        PersistenceProcessorImpl.PersistBatchEvent persistBatchEvent = new PersistenceProcessorImpl.PersistBatchEvent();
        PersistenceProcessorImpl.PersistBatchEvent.makePersistBatch(persistBatchEvent, 0L, batch);
        this.persistenceHandler.onEvent(persistBatchEvent);
        ((PersistenceProcessorHandler) Mockito.verify(this.persistenceHandler, Mockito.times(1))).flush(Matchers.eq(BATCH_ID));
        ((PersistenceProcessorHandler) Mockito.verify(this.persistenceHandler, Mockito.times(1))).filterAndDissambiguateClientRetries((Batch) Matchers.eq(batch));
        ((RetryProcessor) Mockito.verify(this.retryProcessor, Mockito.never())).disambiguateRetryRequestHeuristically(Matchers.anyLong(), (Channel) Matchers.any(Channel.class), (MonitoringContext) Matchers.any(MonitoringContextImpl.class));
        ((ReplyProcessor) Mockito.verify(this.replyProcessor, Mockito.times(1))).manageResponsesBatch(Matchers.eq(0L), (Batch) Matchers.eq(batch));
        Assert.assertEquals(batch.getNumEvents(), 1);
        Assert.assertEquals(batch.get(BATCH_ID).getStartTimestamp(), 0L);
    }

    @Test(timeOut = 10000)
    public void testProcessingOfBatchPersistEventWithASingleCommitEvent() throws Exception {
        Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
        batch.addCommit(0L, FIRST_CT, (Channel) null, (MonitoringContext) Mockito.mock(MonitoringContextImpl.class), Optional.absent());
        PersistenceProcessorImpl.PersistBatchEvent persistBatchEvent = new PersistenceProcessorImpl.PersistBatchEvent();
        PersistenceProcessorImpl.PersistBatchEvent.makePersistBatch(persistBatchEvent, 0L, batch);
        this.persistenceHandler.onEvent(persistBatchEvent);
        ((PersistenceProcessorHandler) Mockito.verify(this.persistenceHandler, Mockito.times(1))).flush(Matchers.eq(1));
        ((PersistenceProcessorHandler) Mockito.verify(this.persistenceHandler, Mockito.times(1))).filterAndDissambiguateClientRetries(batch);
        ((RetryProcessor) Mockito.verify(this.retryProcessor, Mockito.never())).disambiguateRetryRequestHeuristically(Matchers.anyLong(), (Channel) Matchers.any(Channel.class), (MonitoringContext) Matchers.any(MonitoringContextImpl.class));
        ((ReplyProcessor) Mockito.verify(this.replyProcessor, Mockito.times(1))).manageResponsesBatch(Matchers.eq(0L), (Batch) Matchers.eq(batch));
        Assert.assertEquals(batch.getNumEvents(), 1);
        Assert.assertEquals(batch.get(BATCH_ID).getStartTimestamp(), 0L);
        Assert.assertEquals(batch.get(BATCH_ID).getCommitTimestamp(), FIRST_CT);
    }

    @Test(timeOut = 10000)
    public void testProcessingOfBatchPersistEventWithASingleAbortEventNoRetry() throws Exception {
        Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
        batch.addAbort(0L, (Channel) null, (MonitoringContext) Mockito.mock(MonitoringContextImpl.class));
        PersistenceProcessorImpl.PersistBatchEvent persistBatchEvent = new PersistenceProcessorImpl.PersistBatchEvent();
        PersistenceProcessorImpl.PersistBatchEvent.makePersistBatch(persistBatchEvent, 0L, batch);
        this.persistenceHandler.onEvent(persistBatchEvent);
        ((PersistenceProcessorHandler) Mockito.verify(this.persistenceHandler, Mockito.times(1))).flush(Matchers.eq(BATCH_ID));
        ((PersistenceProcessorHandler) Mockito.verify(this.persistenceHandler, Mockito.times(1))).filterAndDissambiguateClientRetries(batch);
        ((RetryProcessor) Mockito.verify(this.retryProcessor, Mockito.never())).disambiguateRetryRequestHeuristically(Matchers.anyLong(), (Channel) Matchers.any(Channel.class), (MonitoringContext) Matchers.any(MonitoringContextImpl.class));
        ((ReplyProcessor) Mockito.verify(this.replyProcessor, Mockito.times(1))).manageResponsesBatch(Matchers.eq(0L), (Batch) Matchers.eq(batch));
        Assert.assertEquals(batch.getNumEvents(), 1);
        Assert.assertEquals(batch.get(BATCH_ID).getStartTimestamp(), 0L);
    }

    @Test(timeOut = 10000)
    public void testProcessingOfBatchPersistEventWithASingleCommitRetryEvent() throws Exception {
        Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
        batch.addCommitRetry(0L, (Channel) null, (MonitoringContext) Mockito.mock(MonitoringContextImpl.class));
        PersistenceProcessorImpl.PersistBatchEvent persistBatchEvent = new PersistenceProcessorImpl.PersistBatchEvent();
        PersistenceProcessorImpl.PersistBatchEvent.makePersistBatch(persistBatchEvent, 0L, batch);
        this.persistenceHandler.onEvent(persistBatchEvent);
        ((PersistenceProcessorHandler) Mockito.verify(this.persistenceHandler, Mockito.times(1))).flush(Matchers.eq(BATCH_ID));
        ((PersistenceProcessorHandler) Mockito.verify(this.persistenceHandler, Mockito.times(1))).filterAndDissambiguateClientRetries(batch);
        ((RetryProcessor) Mockito.verify(this.retryProcessor, Mockito.times(1))).disambiguateRetryRequestHeuristically(Matchers.eq(0L), (Channel) Matchers.any(Channel.class), (MonitoringContext) Matchers.any(MonitoringContextImpl.class));
        ((ReplyProcessor) Mockito.verify(this.replyProcessor, Mockito.times(1))).manageResponsesBatch(Matchers.eq(0L), (Batch) Matchers.eq(batch));
        Assert.assertEquals(batch.getNumEvents(), BATCH_ID);
    }

    @Test(timeOut = 10000)
    public void testProcessingOfBatchPersistEventWith2EventsCommitAndCommitRetry() throws Exception {
        Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
        batch.addCommit(0L, FIRST_CT, (Channel) null, (MonitoringContext) Mockito.mock(MonitoringContextImpl.class), Optional.absent());
        batch.addCommitRetry(SECOND_ST, (Channel) null, (MonitoringContext) Mockito.mock(MonitoringContextImpl.class));
        PersistenceProcessorImpl.PersistBatchEvent persistBatchEvent = new PersistenceProcessorImpl.PersistBatchEvent();
        PersistenceProcessorImpl.PersistBatchEvent.makePersistBatch(persistBatchEvent, 0L, batch);
        Assert.assertEquals(batch.getNumEvents(), 2);
        this.persistenceHandler.onEvent(persistBatchEvent);
        ((PersistenceProcessorHandler) Mockito.verify(this.persistenceHandler, Mockito.times(1))).flush(Matchers.eq(1));
        ((PersistenceProcessorHandler) Mockito.verify(this.persistenceHandler, Mockito.times(1))).filterAndDissambiguateClientRetries((Batch) Matchers.eq(batch));
        ((RetryProcessor) Mockito.verify(this.retryProcessor, Mockito.times(1))).disambiguateRetryRequestHeuristically(Matchers.eq(SECOND_ST), (Channel) Matchers.any(Channel.class), (MonitoringContext) Matchers.any(MonitoringContextImpl.class));
        ((ReplyProcessor) Mockito.verify(this.replyProcessor, Mockito.times(1))).manageResponsesBatch(Matchers.eq(0L), (Batch) Matchers.eq(batch));
        Assert.assertEquals(batch.getNumEvents(), 1);
        Assert.assertEquals(batch.get(BATCH_ID).getStartTimestamp(), 0L);
        Assert.assertEquals(batch.get(BATCH_ID).getCommitTimestamp(), FIRST_CT);
    }

    @Test(timeOut = 10000)
    public void testProcessingOfBatchPersistEventWith2EventsCommitRetryAndCommit() throws Exception {
        Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
        batch.addCommitRetry(0L, (Channel) null, (MonitoringContext) Mockito.mock(MonitoringContextImpl.class));
        batch.addCommit(SECOND_ST, SECOND_CT, (Channel) null, (MonitoringContext) Mockito.mock(MonitoringContextImpl.class), Optional.absent());
        PersistenceProcessorImpl.PersistBatchEvent persistBatchEvent = new PersistenceProcessorImpl.PersistBatchEvent();
        PersistenceProcessorImpl.PersistBatchEvent.makePersistBatch(persistBatchEvent, 0L, batch);
        Assert.assertEquals(batch.getNumEvents(), 2);
        this.persistenceHandler.onEvent(persistBatchEvent);
        ((PersistenceProcessorHandler) Mockito.verify(this.persistenceHandler, Mockito.times(1))).flush(Matchers.eq(1));
        ((PersistenceProcessorHandler) Mockito.verify(this.persistenceHandler, Mockito.times(1))).filterAndDissambiguateClientRetries((Batch) Matchers.eq(batch));
        ((RetryProcessor) Mockito.verify(this.retryProcessor, Mockito.times(1))).disambiguateRetryRequestHeuristically(Matchers.eq(0L), (Channel) Matchers.any(Channel.class), (MonitoringContext) Matchers.any(MonitoringContextImpl.class));
        ((ReplyProcessor) Mockito.verify(this.replyProcessor, Mockito.times(1))).manageResponsesBatch(Matchers.eq(0L), (Batch) Matchers.eq(batch));
        Assert.assertEquals(batch.getNumEvents(), 1);
        Assert.assertEquals(batch.get(BATCH_ID).getStartTimestamp(), SECOND_ST);
        Assert.assertEquals(batch.get(BATCH_ID).getCommitTimestamp(), SECOND_CT);
    }

    @Test(timeOut = 10000)
    public void testProcessingOfBatchPersistEventWith2CommitRetryEvents() throws Exception {
        Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
        batch.addCommitRetry(0L, (Channel) null, (MonitoringContext) Mockito.mock(MonitoringContextImpl.class));
        batch.addCommitRetry(SECOND_ST, (Channel) null, (MonitoringContext) Mockito.mock(MonitoringContextImpl.class));
        PersistenceProcessorImpl.PersistBatchEvent persistBatchEvent = new PersistenceProcessorImpl.PersistBatchEvent();
        PersistenceProcessorImpl.PersistBatchEvent.makePersistBatch(persistBatchEvent, 0L, batch);
        Assert.assertEquals(batch.getNumEvents(), 2);
        this.persistenceHandler.onEvent(persistBatchEvent);
        ((PersistenceProcessorHandler) Mockito.verify(this.persistenceHandler, Mockito.times(1))).flush(Matchers.eq(BATCH_ID));
        ((PersistenceProcessorHandler) Mockito.verify(this.persistenceHandler, Mockito.times(1))).filterAndDissambiguateClientRetries((Batch) Matchers.eq(batch));
        ((RetryProcessor) Mockito.verify(this.retryProcessor, Mockito.times(1))).disambiguateRetryRequestHeuristically(Matchers.eq(0L), (Channel) Matchers.any(Channel.class), (MonitoringContext) Matchers.any(MonitoringContextImpl.class));
        ((RetryProcessor) Mockito.verify(this.retryProcessor, Mockito.times(1))).disambiguateRetryRequestHeuristically(Matchers.eq(SECOND_ST), (Channel) Matchers.any(Channel.class), (MonitoringContext) Matchers.any(MonitoringContextImpl.class));
        ((ReplyProcessor) Mockito.verify(this.replyProcessor, Mockito.times(1))).manageResponsesBatch(Matchers.eq(0L), (Batch) Matchers.eq(batch));
        Assert.assertEquals(batch.getNumEvents(), BATCH_ID);
    }

    @Test(timeOut = 10000)
    public void testProcessingOfBatchPersistEventWith2AbortEvents() throws Exception {
        Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
        batch.addAbort(0L, (Channel) null, (MonitoringContext) Mockito.mock(MonitoringContextImpl.class));
        batch.addAbort(SECOND_ST, (Channel) null, (MonitoringContext) Mockito.mock(MonitoringContextImpl.class));
        PersistenceProcessorImpl.PersistBatchEvent persistBatchEvent = new PersistenceProcessorImpl.PersistBatchEvent();
        PersistenceProcessorImpl.PersistBatchEvent.makePersistBatch(persistBatchEvent, 0L, batch);
        Assert.assertEquals(batch.getNumEvents(), 2);
        this.persistenceHandler.onEvent(persistBatchEvent);
        ((PersistenceProcessorHandler) Mockito.verify(this.persistenceHandler, Mockito.times(1))).flush(Matchers.eq(BATCH_ID));
        ((PersistenceProcessorHandler) Mockito.verify(this.persistenceHandler, Mockito.times(1))).filterAndDissambiguateClientRetries((Batch) Matchers.eq(batch));
        ((RetryProcessor) Mockito.verify(this.retryProcessor, Mockito.never())).disambiguateRetryRequestHeuristically(Matchers.anyLong(), (Channel) Matchers.any(Channel.class), (MonitoringContext) Matchers.any(MonitoringContextImpl.class));
        ((ReplyProcessor) Mockito.verify(this.replyProcessor, Mockito.times(1))).manageResponsesBatch(Matchers.eq(0L), (Batch) Matchers.eq(batch));
        Assert.assertEquals(batch.getNumEvents(), 2);
        Assert.assertEquals(batch.get(BATCH_ID).getStartTimestamp(), 0L);
        Assert.assertEquals(batch.get(1).getStartTimestamp(), SECOND_ST);
    }

    @Test(timeOut = 10000)
    public void testProcessingOfBatchPersistEventWithMultipleRetryAndNonRetryEvents() throws Exception {
        Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
        batch.addTimestamp(0L, (Channel) null, (MonitoringContext) Mockito.mock(MonitoringContextImpl.class));
        batch.addCommitRetry(SECOND_ST, (Channel) null, (MonitoringContext) Mockito.mock(MonitoringContextImpl.class));
        batch.addCommit(THIRD_ST, THIRD_CT, (Channel) null, (MonitoringContext) Mockito.mock(MonitoringContextImpl.class), Optional.absent());
        batch.addAbort(FOURTH_ST, (Channel) null, (MonitoringContext) Mockito.mock(MonitoringContextImpl.class));
        batch.addCommit(FIFTH_ST, FIFTH_CT, (Channel) null, (MonitoringContext) Mockito.mock(MonitoringContextImpl.class), Optional.absent());
        batch.addCommitRetry(SIXTH_ST, (Channel) null, (MonitoringContext) Mockito.mock(MonitoringContextImpl.class));
        PersistenceProcessorImpl.PersistBatchEvent persistBatchEvent = new PersistenceProcessorImpl.PersistBatchEvent();
        PersistenceProcessorImpl.PersistBatchEvent.makePersistBatch(persistBatchEvent, 0L, batch);
        Assert.assertEquals(batch.getNumEvents(), BATCH_SIZE);
        this.persistenceHandler.onEvent(persistBatchEvent);
        ((PersistenceProcessorHandler) Mockito.verify(this.persistenceHandler, Mockito.times(1))).flush(2);
        ((PersistenceProcessorHandler) Mockito.verify(this.persistenceHandler, Mockito.times(1))).filterAndDissambiguateClientRetries((Batch) Matchers.eq(batch));
        ((RetryProcessor) Mockito.verify(this.retryProcessor, Mockito.times(1))).disambiguateRetryRequestHeuristically(Matchers.eq(SECOND_ST), (Channel) Matchers.any(Channel.class), (MonitoringContext) Matchers.any(MonitoringContextImpl.class));
        ((ReplyProcessor) Mockito.verify(this.replyProcessor, Mockito.times(1))).manageResponsesBatch(Matchers.eq(0L), (Batch) Matchers.eq(batch));
        Assert.assertEquals(batch.getNumEvents(), 4);
        Assert.assertEquals(batch.get(BATCH_ID).getStartTimestamp(), 0L);
        Assert.assertEquals(batch.get(1).getStartTimestamp(), FIFTH_ST);
        Assert.assertEquals(batch.get(1).getCommitTimestamp(), FIFTH_CT);
        Assert.assertEquals(batch.get(2).getStartTimestamp(), THIRD_ST);
        Assert.assertEquals(batch.get(2).getCommitTimestamp(), THIRD_CT);
        Assert.assertEquals(batch.get(3).getStartTimestamp(), FOURTH_ST);
    }

    @Test(timeOut = 10000)
    public void testPanicPersistingEvents() throws Exception {
        Panicker panicker = (Panicker) Mockito.spy(new RuntimeExceptionPanicker());
        this.persistenceHandler = (PersistenceProcessorHandler) Mockito.spy(new PersistenceProcessorHandler(this.metrics, "localhost:1234", this.leaseManager, this.commitTable, this.replyProcessor, this.retryProcessor, panicker));
        Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
        batch.addCommit(0L, FIRST_CT, (Channel) null, (MonitoringContext) Mockito.mock(MonitoringContextImpl.class), Optional.absent());
        PersistenceProcessorImpl.PersistBatchEvent persistBatchEvent = new PersistenceProcessorImpl.PersistBatchEvent();
        PersistenceProcessorImpl.PersistBatchEvent.makePersistBatch(persistBatchEvent, 0L, batch);
        ((CommitTable.Writer) Mockito.doThrow(IOException.class).when(this.mockWriter)).flush();
        try {
            this.persistenceHandler.onEvent(persistBatchEvent);
            Assert.fail();
        } catch (RuntimeException e) {
        }
        ((PersistenceProcessorHandler) Mockito.verify(this.persistenceHandler, Mockito.times(1))).flush(1);
        ((Panicker) Mockito.verify(panicker, Mockito.times(1))).panic((String) Matchers.eq("Error persisting commit batch"), (Throwable) Matchers.any(IOException.class));
        ((PersistenceProcessorHandler) Mockito.verify(this.persistenceHandler, Mockito.never())).filterAndDissambiguateClientRetries((Batch) Matchers.any(Batch.class));
        ((ReplyProcessor) Mockito.verify(this.replyProcessor, Mockito.never())).manageResponsesBatch(Matchers.anyLong(), (Batch) Matchers.any(Batch.class));
    }

    @Test(timeOut = 10000)
    public void testPanicBecauseMasterLosesMastership() throws Exception {
        ((LeaseManager) Mockito.doReturn(false).when(this.leaseManager)).stillInLeasePeriod();
        Panicker panicker = (Panicker) Mockito.spy(new RuntimeExceptionPanicker());
        this.persistenceHandler = (PersistenceProcessorHandler) Mockito.spy(new PersistenceProcessorHandler(this.metrics, "localhost:1234", this.leaseManager, this.commitTable, this.replyProcessor, this.retryProcessor, panicker));
        Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
        batch.addCommit(0L, FIRST_CT, (Channel) null, (MonitoringContext) Mockito.mock(MonitoringContextImpl.class), Optional.absent());
        PersistenceProcessorImpl.PersistBatchEvent persistBatchEvent = new PersistenceProcessorImpl.PersistBatchEvent();
        PersistenceProcessorImpl.PersistBatchEvent.makePersistBatch(persistBatchEvent, 0L, batch);
        try {
            this.persistenceHandler.onEvent(persistBatchEvent);
            Assert.fail();
        } catch (RuntimeException e) {
        }
        ((PersistenceProcessorHandler) Mockito.verify(this.persistenceHandler, Mockito.times(1))).flush(Matchers.eq(1));
        ((CommitTable.Writer) Mockito.verify(this.mockWriter, Mockito.never())).flush();
        ((Panicker) Mockito.verify(panicker, Mockito.times(1))).panic((String) Matchers.eq("Replica localhost:1234 lost mastership whilst flushing data. Committing suicide"), (Throwable) Matchers.any(IOException.class));
        ((PersistenceProcessorHandler) Mockito.verify(this.persistenceHandler, Mockito.never())).filterAndDissambiguateClientRetries((Batch) Matchers.any(Batch.class));
        ((ReplyProcessor) Mockito.verify(this.replyProcessor, Mockito.never())).manageResponsesBatch(Matchers.anyLong(), (Batch) Matchers.any(Batch.class));
        ((LeaseManager) Mockito.doReturn(true).doReturn(false).when(this.leaseManager)).stillInLeasePeriod();
        Panicker panicker2 = (Panicker) Mockito.spy(new RuntimeExceptionPanicker());
        this.persistenceHandler = (PersistenceProcessorHandler) Mockito.spy(new PersistenceProcessorHandler(this.metrics, "localhost:1234", this.leaseManager, this.commitTable, this.replyProcessor, this.retryProcessor, panicker2));
        Batch batch2 = new Batch(BATCH_ID, BATCH_SIZE);
        batch2.addCommit(0L, FIRST_CT, (Channel) null, (MonitoringContext) Mockito.mock(MonitoringContextImpl.class), Optional.absent());
        PersistenceProcessorImpl.PersistBatchEvent persistBatchEvent2 = new PersistenceProcessorImpl.PersistBatchEvent();
        PersistenceProcessorImpl.PersistBatchEvent.makePersistBatch(persistBatchEvent2, 0L, batch2);
        try {
            this.persistenceHandler.onEvent(persistBatchEvent2);
            Assert.fail();
        } catch (RuntimeException e2) {
        }
        ((PersistenceProcessorHandler) Mockito.verify(this.persistenceHandler, Mockito.times(1))).flush(Matchers.eq(1));
        ((CommitTable.Writer) Mockito.verify(this.mockWriter, Mockito.times(1))).flush();
        ((Panicker) Mockito.verify(panicker2, Mockito.times(1))).panic((String) Matchers.eq("Replica localhost:1234 lost mastership whilst flushing data. Committing suicide"), (Throwable) Matchers.any(IOException.class));
        ((PersistenceProcessorHandler) Mockito.verify(this.persistenceHandler, Mockito.never())).filterAndDissambiguateClientRetries((Batch) Matchers.any(Batch.class));
        ((ReplyProcessor) Mockito.verify(this.replyProcessor, Mockito.never())).manageResponsesBatch(Matchers.anyLong(), (Batch) Matchers.any(Batch.class));
    }
}
