/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.qjournal.server;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
import com.google.protobuf.MessageOrBuilder;
import com.google.protobuf.TextFormat;
import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.Range;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.qjournal.protocol.JournalNotFormattedException;
import org.apache.hadoop.hdfs.qjournal.protocol.JournalOutOfSyncException;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
import org.apache.hadoop.hdfs.qjournal.server.JNStorage;
import org.apache.hadoop.hdfs.qjournal.server.JournalFaultInjector;
import org.apache.hadoop.hdfs.qjournal.server.JournalMetrics;
import org.apache.hadoop.hdfs.qjournal.server.JournaledEditsCache;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.hdfs.util.AtomicFileOutputStream;
import org.apache.hadoop.hdfs.util.BestEffortLongFile;
import org.apache.hadoop.hdfs.util.PersistentLongFile;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StopWatch;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Journal
implements Closeable {
    static final Logger LOG = LoggerFactory.getLogger(Journal.class);
    private EditLogOutputStream curSegment;
    private long curSegmentTxId = -12345L;
    private int curSegmentLayoutVersion = 0;
    private long nextTxId = -12345L;
    private long highestWrittenTxId = 0L;
    private final String journalId;
    private final JNStorage storage;
    private PersistentLongFile lastPromisedEpoch;
    private long currentEpochIpcSerial = -1L;
    private PersistentLongFile lastWriterEpoch;
    private BestEffortLongFile committedTxnId;
    public static final String LAST_PROMISED_FILENAME = "last-promised-epoch";
    public static final String LAST_WRITER_EPOCH = "last-writer-epoch";
    private static final String COMMITTED_TXID_FILENAME = "committed-txid";
    private final FileJournalManager fjm;
    private JournaledEditsCache cache;
    private final JournalMetrics metrics;
    private long lastJournalTimestamp = 0L;
    private Configuration conf = null;
    private boolean triedJournalSyncerStartedwithnsId = false;
    private static final int WARN_SYNC_MILLIS_THRESHOLD = 1000;

    Journal(Configuration conf, File logDir, String journalId, HdfsServerConstants.StartupOption startOpt, StorageErrorReporter errorReporter) throws IOException {
        this.conf = conf;
        this.storage = new JNStorage(conf, logDir, startOpt, errorReporter);
        this.journalId = journalId;
        this.refreshCachedData();
        this.fjm = this.storage.getJournalManager();
        this.cache = this.createCache();
        this.metrics = JournalMetrics.create(this);
        FileJournalManager.EditLogFile latest = this.scanStorageForLatestEdits();
        if (latest != null) {
            this.updateHighestWrittenTxId(latest.getLastTxId());
        }
    }

    private JournaledEditsCache createCache() {
        if (this.conf.getBoolean("dfs.ha.tail-edits.in-progress", false)) {
            return new JournaledEditsCache(this.conf);
        }
        return null;
    }

    public void setTriedJournalSyncerStartedwithnsId(boolean started) {
        this.triedJournalSyncerStartedwithnsId = started;
    }

    public boolean getTriedJournalSyncerStartedwithnsId() {
        return this.triedJournalSyncerStartedwithnsId;
    }

    private synchronized void refreshCachedData() {
        IOUtils.closeStream((Closeable)this.committedTxnId);
        File currentDir = this.storage.getSingularStorageDir().getCurrentDir();
        this.lastPromisedEpoch = new PersistentLongFile(new File(currentDir, LAST_PROMISED_FILENAME), 0L);
        this.lastWriterEpoch = new PersistentLongFile(new File(currentDir, LAST_WRITER_EPOCH), 0L);
        this.committedTxnId = new BestEffortLongFile(new File(currentDir, COMMITTED_TXID_FILENAME), -12345L);
    }

    private synchronized FileJournalManager.EditLogFile scanStorageForLatestEdits() throws IOException {
        if (!this.fjm.getStorageDirectory().getCurrentDir().exists()) {
            return null;
        }
        LOG.info("Scanning storage " + this.fjm);
        List<FileJournalManager.EditLogFile> files = this.fjm.getLogFiles(0L);
        while (!files.isEmpty()) {
            FileJournalManager.EditLogFile latestLog = files.remove(files.size() - 1);
            latestLog.scanLog(Long.MAX_VALUE, false);
            LOG.info("Latest log is " + latestLog + " ; journal id: " + this.journalId);
            if (latestLog.getLastTxId() == -12345L) {
                LOG.warn("Latest log " + latestLog + " has no transactions. moving it aside and looking for previous log ; journal id: " + this.journalId);
                latestLog.moveAsideEmptyFile();
                continue;
            }
            return latestLog;
        }
        LOG.info("No files in " + this.fjm);
        return null;
    }

    void format(NamespaceInfo nsInfo, boolean force) throws IOException {
        Preconditions.checkState((nsInfo.getNamespaceID() != 0 ? 1 : 0) != 0, (String)"can't format with uninitialized namespace info: %s", (Object)nsInfo);
        LOG.info("Formatting journal id : " + this.journalId + " with namespace info: " + nsInfo + " and force: " + force);
        this.storage.format(nsInfo, force);
        this.cache = this.createCache();
        this.refreshCachedData();
    }

    @Override
    public void close() throws IOException {
        this.storage.close();
        IOUtils.closeStream((Closeable)this.committedTxnId);
        IOUtils.closeStream((Closeable)this.curSegment);
    }

    JNStorage getStorage() {
        return this.storage;
    }

    String getJournalId() {
        return this.journalId;
    }

    synchronized long getLastPromisedEpoch() throws IOException {
        this.checkFormatted();
        return this.lastPromisedEpoch.get();
    }

    public synchronized long getLastWriterEpoch() throws IOException {
        this.checkFormatted();
        return this.lastWriterEpoch.get();
    }

    synchronized long getCommittedTxnId() throws IOException {
        return this.committedTxnId.get();
    }

    synchronized long getLastJournalTimestamp() {
        return this.lastJournalTimestamp;
    }

    synchronized long getCurrentLagTxns() throws IOException {
        long committed = this.committedTxnId.get();
        if (committed == 0L) {
            return 0L;
        }
        return Math.max(committed - this.highestWrittenTxId, 0L);
    }

    synchronized long getHighestWrittenTxId() {
        return this.highestWrittenTxId;
    }

    private void updateHighestWrittenTxId(long val) {
        this.highestWrittenTxId = val;
        this.fjm.setLastReadableTxId(val);
    }

    JournalMetrics getMetrics() {
        return this.metrics;
    }

    synchronized QJournalProtocolProtos.NewEpochResponseProto newEpoch(NamespaceInfo nsInfo, long epoch) throws IOException {
        this.checkFormatted();
        this.storage.checkConsistentNamespace(nsInfo);
        if (epoch <= this.getLastPromisedEpoch()) {
            throw new IOException("Proposed epoch " + epoch + " <= last promise " + this.getLastPromisedEpoch() + " ; journal id: " + this.journalId);
        }
        this.updateLastPromisedEpoch(epoch);
        this.abortCurSegment();
        QJournalProtocolProtos.NewEpochResponseProto.Builder builder = QJournalProtocolProtos.NewEpochResponseProto.newBuilder();
        FileJournalManager.EditLogFile latestFile = this.scanStorageForLatestEdits();
        if (latestFile != null) {
            builder.setLastSegmentTxId(latestFile.getFirstTxId());
        }
        return builder.build();
    }

    private void updateLastPromisedEpoch(long newEpoch) throws IOException {
        LOG.info("Updating lastPromisedEpoch from " + this.lastPromisedEpoch.get() + " to " + newEpoch + " for client " + Server.getRemoteIp() + " ; journal id: " + this.journalId);
        this.lastPromisedEpoch.set(newEpoch);
        this.currentEpochIpcSerial = -1L;
    }

    private void abortCurSegment() throws IOException {
        if (this.curSegment == null) {
            return;
        }
        this.curSegment.abort();
        this.curSegment = null;
        this.curSegmentTxId = -12345L;
        this.curSegmentLayoutVersion = 0;
    }

    synchronized void journal(RequestInfo reqInfo, long segmentTxId, long firstTxnId, int numTxns, byte[] records) throws IOException {
        this.checkFormatted();
        this.checkWriteRequest(reqInfo);
        if (numTxns == 0) {
            return;
        }
        this.checkSync(this.curSegment != null, "Can't write, no segment open ; journal id: " + this.journalId, new Object[0]);
        if (this.curSegmentTxId != segmentTxId) {
            JournalOutOfSyncException e = new JournalOutOfSyncException("Writer out of sync: it thinks it is writing segment " + segmentTxId + " but current segment is " + this.curSegmentTxId + " ; journal id: " + this.journalId);
            this.abortCurSegment();
            throw e;
        }
        this.checkSync(this.nextTxId == firstTxnId, "Can't write txid " + firstTxnId + " expecting nextTxId=" + this.nextTxId + " ; journal id: " + this.journalId, new Object[0]);
        long lastTxnId = firstTxnId + (long)numTxns - 1L;
        if (LOG.isTraceEnabled()) {
            LOG.trace("Writing txid " + firstTxnId + "-" + lastTxnId + " ; journal id: " + this.journalId);
        }
        if (this.cache != null) {
            this.cache.storeEdits(records, firstTxnId, lastTxnId, this.curSegmentLayoutVersion);
        }
        boolean isLagging = lastTxnId <= this.committedTxnId.get();
        boolean shouldFsync = !isLagging;
        this.curSegment.writeRaw(records, 0, records.length);
        this.curSegment.setReadyToFlush();
        StopWatch sw = new StopWatch();
        sw.start();
        this.curSegment.flush(shouldFsync);
        sw.stop();
        long nanoSeconds = sw.now();
        this.metrics.addSync(TimeUnit.MICROSECONDS.convert(nanoSeconds, TimeUnit.NANOSECONDS));
        long milliSeconds = TimeUnit.MILLISECONDS.convert(nanoSeconds, TimeUnit.NANOSECONDS);
        if (milliSeconds > 1000L) {
            LOG.warn("Sync of transaction range " + firstTxnId + "-" + lastTxnId + " took " + milliSeconds + "ms ; journal id: " + this.journalId);
        }
        if (isLagging) {
            this.metrics.batchesWrittenWhileLagging.incr(1L);
        }
        this.metrics.batchesWritten.incr(1L);
        this.metrics.bytesWritten.incr((long)records.length);
        this.metrics.txnsWritten.incr((long)numTxns);
        this.updateHighestWrittenTxId(lastTxnId);
        this.nextTxId = lastTxnId + 1L;
        this.lastJournalTimestamp = Time.now();
    }

    public void heartbeat(RequestInfo reqInfo) throws IOException {
        this.checkRequest(reqInfo);
    }

    private synchronized void checkRequest(RequestInfo reqInfo) throws IOException {
        if (reqInfo.getEpoch() < this.lastPromisedEpoch.get()) {
            throw new IOException("IPC's epoch " + reqInfo.getEpoch() + " is less than the last promised epoch " + this.lastPromisedEpoch.get() + " ; journal id: " + this.journalId);
        }
        if (reqInfo.getEpoch() > this.lastPromisedEpoch.get()) {
            this.updateLastPromisedEpoch(reqInfo.getEpoch());
        }
        this.checkSync(reqInfo.getIpcSerialNumber() > this.currentEpochIpcSerial, "IPC serial %s from client %s was not higher than prior highest IPC serial %s ; journal id: %s", reqInfo.getIpcSerialNumber(), Server.getRemoteIp(), this.currentEpochIpcSerial, this.journalId);
        this.currentEpochIpcSerial = reqInfo.getIpcSerialNumber();
        if (reqInfo.hasCommittedTxId()) {
            Preconditions.checkArgument((reqInfo.getCommittedTxId() >= this.committedTxnId.get() ? 1 : 0) != 0, (Object)("Client trying to move committed txid backward from " + this.committedTxnId.get() + " to " + reqInfo.getCommittedTxId() + " ; journal id: " + this.journalId));
            this.committedTxnId.set(reqInfo.getCommittedTxId());
        }
    }

    private synchronized void checkWriteRequest(RequestInfo reqInfo) throws IOException {
        this.checkRequest(reqInfo);
        if (reqInfo.getEpoch() != this.lastWriterEpoch.get()) {
            throw new IOException("IPC's epoch " + reqInfo.getEpoch() + " is not the current writer epoch  " + this.lastWriterEpoch.get() + " ; journal id: " + this.journalId);
        }
    }

    public synchronized boolean isFormatted() {
        return this.storage.isFormatted();
    }

    private void checkFormatted() throws JournalNotFormattedException {
        if (!this.isFormatted()) {
            throw new JournalNotFormattedException("Journal " + this.storage.getSingularStorageDir() + " not formatted ; journal id: " + this.journalId);
        }
    }

    private void checkSync(boolean expression, String msg, Object ... formatArgs) throws JournalOutOfSyncException {
        if (!expression) {
            throw new JournalOutOfSyncException(String.format(msg, formatArgs));
        }
    }

    private void alwaysAssert(boolean expression, String msg, Object ... formatArgs) {
        if (!expression) {
            throw new AssertionError((Object)String.format(msg, formatArgs));
        }
    }

    public synchronized void startLogSegment(RequestInfo reqInfo, long txid, int layoutVersion) throws IOException {
        long curLastWriterEpoch;
        FileJournalManager.EditLogFile existing;
        assert (this.fjm != null);
        this.checkFormatted();
        this.checkRequest(reqInfo);
        if (this.curSegment != null) {
            LOG.warn("Client is requesting a new log segment " + txid + " though we are already writing " + this.curSegment + ". Aborting the current segment in order to begin the new one. ; journal id: " + this.journalId);
            this.abortCurSegment();
        }
        if ((existing = this.fjm.getLogFile(txid)) != null) {
            if (!existing.isInProgress()) {
                throw new IllegalStateException("Already have a finalized segment " + existing + " beginning at " + txid + " ; journal id: " + this.journalId);
            }
            existing.scanLog(Long.MAX_VALUE, false);
            if (existing.getLastTxId() != existing.getFirstTxId()) {
                throw new IllegalStateException("The log file " + existing + " seems to contain valid transactions ; journal id: " + this.journalId);
            }
        }
        if ((curLastWriterEpoch = this.lastWriterEpoch.get()) != reqInfo.getEpoch()) {
            LOG.info("Updating lastWriterEpoch from " + curLastWriterEpoch + " to " + reqInfo.getEpoch() + " for client " + Server.getRemoteIp() + " ; journal id: " + this.journalId);
            this.lastWriterEpoch.set(reqInfo.getEpoch());
        }
        this.purgePaxosDecision(txid);
        this.curSegment = this.fjm.startLogSegment(txid, layoutVersion);
        this.curSegmentTxId = txid;
        this.curSegmentLayoutVersion = layoutVersion;
        this.nextTxId = txid;
    }

    public synchronized void finalizeLogSegment(RequestInfo reqInfo, long startTxId, long endTxId) throws IOException {
        FileJournalManager.EditLogFile elf;
        this.checkFormatted();
        this.checkRequest(reqInfo);
        boolean needsValidation = true;
        if (startTxId == this.curSegmentTxId) {
            if (this.curSegment != null) {
                this.curSegment.close();
                this.curSegment = null;
                this.curSegmentTxId = -12345L;
                this.curSegmentLayoutVersion = 0;
            }
            this.checkSync(this.nextTxId == endTxId + 1L, "Trying to finalize in-progress log segment %s to end at txid %s but only written up to txid %s ; journal id: %s", startTxId, endTxId, this.nextTxId - 1L, this.journalId);
            needsValidation = false;
        }
        if ((elf = this.fjm.getLogFile(startTxId)) == null) {
            throw new JournalOutOfSyncException("No log file to finalize at transaction ID " + startTxId + " ; journal id: " + this.journalId);
        }
        if (elf.isInProgress()) {
            if (needsValidation) {
                LOG.info("Validating log segment " + elf.getFile() + " about to be finalized ; journal id: " + this.journalId);
                elf.scanLog(Long.MAX_VALUE, false);
                this.checkSync(elf.getLastTxId() == endTxId, "Trying to finalize in-progress log segment %s to end at txid %s but log %s on disk only contains up to txid %s ; journal id: %s", startTxId, endTxId, elf.getFile(), elf.getLastTxId(), this.journalId);
            }
            this.fjm.finalizeLogSegment(startTxId, endTxId);
        } else {
            Preconditions.checkArgument((endTxId == elf.getLastTxId() ? 1 : 0) != 0, (Object)("Trying to re-finalize already finalized log " + elf + " with different endTxId " + endTxId + " ; journal id: " + this.journalId));
        }
        this.purgePaxosDecision(elf.getFirstTxId());
    }

    public synchronized void purgeLogsOlderThan(RequestInfo reqInfo, long minTxIdToKeep) throws IOException {
        this.checkFormatted();
        this.checkRequest(reqInfo);
        this.storage.purgeDataOlderThan(minTxIdToKeep);
    }

    private void purgePaxosDecision(long segmentTxId) throws IOException {
        File paxosFile = this.storage.getPaxosFile(segmentTxId);
        if (paxosFile.exists() && !paxosFile.delete()) {
            throw new IOException("Unable to delete paxos file " + paxosFile + " ; journal id: " + this.journalId);
        }
    }

    public RemoteEditLogManifest getEditLogManifest(long sinceTxId, boolean inProgressOk) throws IOException {
        this.checkFormatted();
        List<RemoteEditLog> logs = this.fjm.getRemoteEditLogs(sinceTxId, inProgressOk);
        if (inProgressOk) {
            RemoteEditLog log = null;
            Iterator<RemoteEditLog> iter = logs.iterator();
            while (iter.hasNext()) {
                log = iter.next();
                if (!log.isInProgress()) continue;
                iter.remove();
                break;
            }
            if (log != null && log.isInProgress()) {
                logs.add(new RemoteEditLog(log.getStartTxId(), this.getHighestWrittenTxId(), true));
            }
        }
        return new RemoteEditLogManifest(logs, this.getCommittedTxnId());
    }

    public QJournalProtocolProtos.GetJournaledEditsResponseProto getJournaledEdits(long sinceTxId, int maxTxns) throws IOException {
        if (this.cache == null) {
            throw new IOException("The journal edits cache is not enabled, which is a requirement to fetch journaled edits via RPC. Please enable it via dfs.ha.tail-edits.in-progress");
        }
        if (sinceTxId > this.getHighestWrittenTxId()) {
            this.metrics.rpcEmptyResponses.incr();
            return QJournalProtocolProtos.GetJournaledEditsResponseProto.newBuilder().setTxnCount(0).build();
        }
        try {
            ArrayList<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
            int txnCount = this.cache.retrieveEdits(sinceTxId, maxTxns, buffers);
            int totalSize = 0;
            for (ByteBuffer buf : buffers) {
                totalSize += buf.remaining();
            }
            this.metrics.txnsServedViaRpc.incr((long)txnCount);
            this.metrics.bytesServedViaRpc.incr((long)totalSize);
            ByteString.Output output = ByteString.newOutput((int)totalSize);
            for (ByteBuffer buf : buffers) {
                output.write(buf.array(), buf.position(), buf.remaining());
            }
            return QJournalProtocolProtos.GetJournaledEditsResponseProto.newBuilder().setTxnCount(txnCount).setEditLog(output.toByteString()).build();
        }
        catch (JournaledEditsCache.CacheMissException cme) {
            this.metrics.rpcRequestCacheMissAmount.add(cme.getCacheMissAmount());
            throw cme;
        }
    }

    @VisibleForTesting
    QJournalProtocolProtos.SegmentStateProto getSegmentInfo(long segmentTxId) throws IOException {
        FileJournalManager.EditLogFile elf = this.fjm.getLogFile(segmentTxId);
        if (elf == null) {
            return null;
        }
        if (elf.isInProgress()) {
            elf.scanLog(Long.MAX_VALUE, false);
        }
        if (elf.getLastTxId() == -12345L) {
            LOG.info("Edit log file " + elf + " appears to be empty. Moving it aside... ; journal id: " + this.journalId);
            elf.moveAsideEmptyFile();
            return null;
        }
        QJournalProtocolProtos.SegmentStateProto ret = QJournalProtocolProtos.SegmentStateProto.newBuilder().setStartTxId(segmentTxId).setEndTxId(elf.getLastTxId()).setIsInProgress(elf.isInProgress()).build();
        LOG.info("getSegmentInfo(" + segmentTxId + "): " + elf + " -> " + TextFormat.shortDebugString((MessageOrBuilder)ret) + " ; journal id: " + this.journalId);
        return ret;
    }

    public synchronized QJournalProtocolProtos.PrepareRecoveryResponseProto prepareRecovery(RequestInfo reqInfo, long segmentTxId) throws IOException {
        boolean hasFinalizedSegment;
        this.checkFormatted();
        this.checkRequest(reqInfo);
        this.abortCurSegment();
        QJournalProtocolProtos.PrepareRecoveryResponseProto.Builder builder = QJournalProtocolProtos.PrepareRecoveryResponseProto.newBuilder();
        QJournalProtocolProtos.PersistedRecoveryPaxosData previouslyAccepted = this.getPersistedPaxosData(segmentTxId);
        this.completeHalfDoneAcceptRecovery(previouslyAccepted);
        QJournalProtocolProtos.SegmentStateProto segInfo = this.getSegmentInfo(segmentTxId);
        boolean bl = hasFinalizedSegment = segInfo != null && !segInfo.getIsInProgress();
        if (previouslyAccepted != null && !hasFinalizedSegment) {
            QJournalProtocolProtos.SegmentStateProto acceptedState = previouslyAccepted.getSegmentState();
            assert (acceptedState.getEndTxId() == segInfo.getEndTxId()) : "prev accepted: " + TextFormat.shortDebugString((MessageOrBuilder)previouslyAccepted) + "\non disk:       " + TextFormat.shortDebugString((MessageOrBuilder)segInfo);
            builder.setAcceptedInEpoch(previouslyAccepted.getAcceptedInEpoch()).setSegmentState(previouslyAccepted.getSegmentState());
        } else if (segInfo != null) {
            builder.setSegmentState(segInfo);
        }
        builder.setLastWriterEpoch(this.lastWriterEpoch.get());
        if (this.committedTxnId.get() != -12345L) {
            builder.setLastCommittedTxId(this.committedTxnId.get());
        }
        QJournalProtocolProtos.PrepareRecoveryResponseProto resp = builder.build();
        LOG.info("Prepared recovery for segment " + segmentTxId + ": " + TextFormat.shortDebugString((MessageOrBuilder)resp) + " ; journal id: " + this.journalId);
        return resp;
    }

    public synchronized void acceptRecovery(RequestInfo reqInfo, QJournalProtocolProtos.SegmentStateProto segment, URL fromUrl) throws IOException {
        this.checkFormatted();
        this.checkRequest(reqInfo);
        this.abortCurSegment();
        long segmentTxId = segment.getStartTxId();
        Preconditions.checkArgument((segment.getEndTxId() > 0L && segment.getEndTxId() >= segmentTxId ? 1 : 0) != 0, (String)"bad recovery state for segment %s: %s ; journal id: %s", (Object)segmentTxId, (Object)TextFormat.shortDebugString((MessageOrBuilder)segment), (Object)this.journalId);
        QJournalProtocolProtos.PersistedRecoveryPaxosData oldData = this.getPersistedPaxosData(segmentTxId);
        QJournalProtocolProtos.PersistedRecoveryPaxosData newData = QJournalProtocolProtos.PersistedRecoveryPaxosData.newBuilder().setAcceptedInEpoch(reqInfo.getEpoch()).setSegmentState(segment).build();
        if (oldData != null) {
            this.alwaysAssert(oldData.getAcceptedInEpoch() <= reqInfo.getEpoch(), "Bad paxos transition, out-of-order epochs.\nOld: %s\nNew: %s\nJournalId: %s\n", oldData, newData, this.journalId);
        }
        File syncedFile = null;
        QJournalProtocolProtos.SegmentStateProto currentSegment = this.getSegmentInfo(segmentTxId);
        if (currentSegment == null || currentSegment.getEndTxId() != segment.getEndTxId()) {
            if (currentSegment == null) {
                LOG.info("Synchronizing log " + TextFormat.shortDebugString((MessageOrBuilder)segment) + ": no current segment in place ; journal id: " + this.journalId);
                this.updateHighestWrittenTxId(Math.max(segment.getEndTxId(), this.highestWrittenTxId));
            } else {
                LOG.info("Synchronizing log " + TextFormat.shortDebugString((MessageOrBuilder)segment) + ": old segment " + TextFormat.shortDebugString((MessageOrBuilder)currentSegment) + " is not the right length ; journal id: " + this.journalId);
                if (this.txnRange(currentSegment).contains((Object)this.committedTxnId.get()) && !this.txnRange(segment).contains((Object)this.committedTxnId.get())) {
                    throw new AssertionError((Object)("Cannot replace segment " + TextFormat.shortDebugString((MessageOrBuilder)currentSegment) + " with new segment " + TextFormat.shortDebugString((MessageOrBuilder)segment) + ": would discard already-committed txn " + this.committedTxnId.get() + " ; journal id: " + this.journalId));
                }
                this.alwaysAssert(currentSegment.getIsInProgress(), "Should never be asked to synchronize a different log on top of an already-finalized segment ; journal id: " + this.journalId, new Object[0]);
                if (this.txnRange(currentSegment).contains((Object)this.highestWrittenTxId)) {
                    this.updateHighestWrittenTxId(segment.getEndTxId());
                }
            }
            syncedFile = this.syncLog(reqInfo, segment, fromUrl);
        } else {
            LOG.info("Skipping download of log " + TextFormat.shortDebugString((MessageOrBuilder)segment) + ": already have up-to-date logs ; journal id: " + this.journalId);
        }
        JournalFaultInjector.get().beforePersistPaxosData();
        this.persistPaxosData(segmentTxId, newData);
        JournalFaultInjector.get().afterPersistPaxosData();
        if (syncedFile != null) {
            FileUtil.replaceFile((File)syncedFile, (File)this.storage.getInProgressEditLog(segmentTxId));
        }
        LOG.info("Accepted recovery for segment " + segmentTxId + ": " + TextFormat.shortDebugString((MessageOrBuilder)newData) + " ; journal id: " + this.journalId);
    }

    private Range<Long> txnRange(QJournalProtocolProtos.SegmentStateProto seg) {
        Preconditions.checkArgument((boolean)seg.hasEndTxId(), (String)"invalid segment: %s ; journal id: %s", (Object)seg, (Object)this.journalId);
        return Range.between((Comparable)Long.valueOf(seg.getStartTxId()), (Comparable)Long.valueOf(seg.getEndTxId()));
    }

    private File syncLog(RequestInfo reqInfo, QJournalProtocolProtos.SegmentStateProto segment, final URL url) throws IOException {
        File tmpFile = this.storage.getSyncLogTemporaryFile(segment.getStartTxId(), reqInfo.getEpoch());
        ImmutableList localPaths = ImmutableList.of((Object)tmpFile);
        LOG.info("Synchronizing log " + TextFormat.shortDebugString((MessageOrBuilder)segment) + " from " + url);
        SecurityUtil.doAsLoginUser((PrivilegedExceptionAction)new PrivilegedExceptionAction<Void>((List)localPaths, tmpFile){
            final /* synthetic */ List val$localPaths;
            final /* synthetic */ File val$tmpFile;
            {
                this.val$localPaths = list;
                this.val$tmpFile = file;
            }

            @Override
            public Void run() throws IOException {
                if (UserGroupInformation.isSecurityEnabled()) {
                    UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab();
                }
                boolean success = false;
                try {
                    TransferFsImage.doGetUrl(url, this.val$localPaths, Journal.this.storage, true);
                    assert (this.val$tmpFile.exists());
                    success = true;
                }
                finally {
                    if (!success && !this.val$tmpFile.delete()) {
                        LOG.warn("Failed to delete temporary file " + this.val$tmpFile);
                    }
                }
                return null;
            }
        });
        return tmpFile;
    }

    private void completeHalfDoneAcceptRecovery(QJournalProtocolProtos.PersistedRecoveryPaxosData paxosData) throws IOException {
        long epoch;
        if (paxosData == null) {
            return;
        }
        long segmentId = paxosData.getSegmentState().getStartTxId();
        File tmp = this.storage.getSyncLogTemporaryFile(segmentId, epoch = paxosData.getAcceptedInEpoch());
        if (tmp.exists()) {
            File dst = this.storage.getInProgressEditLog(segmentId);
            LOG.info("Rolling forward previously half-completed synchronization: " + tmp + " -> " + dst + " ; journal id: " + this.journalId);
            FileUtil.replaceFile((File)tmp, (File)dst);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private QJournalProtocolProtos.PersistedRecoveryPaxosData getPersistedPaxosData(long segmentTxId) throws IOException {
        File f = this.storage.getPaxosFile(segmentTxId);
        if (!f.exists()) {
            return null;
        }
        FileInputStream in = new FileInputStream(f);
        try {
            QJournalProtocolProtos.PersistedRecoveryPaxosData ret = QJournalProtocolProtos.PersistedRecoveryPaxosData.parseDelimitedFrom(in);
            Preconditions.checkState((ret != null && ret.getSegmentState().getStartTxId() == segmentTxId ? 1 : 0) != 0, (String)"Bad persisted data for segment %s: %s ; journal id: %s", (Object)segmentTxId, (Object)ret, (Object)this.journalId);
            QJournalProtocolProtos.PersistedRecoveryPaxosData persistedRecoveryPaxosData = ret;
            return persistedRecoveryPaxosData;
        }
        finally {
            IOUtils.closeStream((Closeable)in);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void persistPaxosData(long segmentTxId, QJournalProtocolProtos.PersistedRecoveryPaxosData newData) throws IOException {
        File f = this.storage.getPaxosFile(segmentTxId);
        boolean success = false;
        AtomicFileOutputStream fos = new AtomicFileOutputStream(f);
        try {
            newData.writeDelimitedTo(fos);
            fos.write(10);
            OutputStreamWriter writer = new OutputStreamWriter((OutputStream)fos, Charsets.UTF_8);
            writer.write(String.valueOf(newData));
            writer.write(10);
            writer.flush();
            fos.flush();
            success = true;
        }
        finally {
            if (success) {
                IOUtils.closeStream((Closeable)fos);
            } else {
                fos.abort();
            }
        }
    }

    public synchronized void doPreUpgrade() throws IOException {
        IOUtils.cleanupWithLogger((Logger)LOG, (Closeable[])new Closeable[]{this.committedTxnId});
        this.storage.getJournalManager().doPreUpgrade();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public synchronized void doUpgrade(StorageInfo sInfo) throws IOException {
        long oldCTime = this.storage.getCTime();
        this.storage.cTime = sInfo.cTime;
        int oldLV = this.storage.getLayoutVersion();
        this.storage.layoutVersion = sInfo.layoutVersion;
        LOG.info("Starting upgrade of edits directory: " + this.storage.getRoot() + ".\n   old LV = " + oldLV + "; old CTime = " + oldCTime + ".\n   new LV = " + this.storage.getLayoutVersion() + "; new CTime = " + this.storage.getCTime());
        this.storage.getJournalManager().doUpgrade(this.storage);
        this.storage.getOrCreatePaxosDir();
        File currentDir = this.storage.getSingularStorageDir().getCurrentDir();
        File previousDir = this.storage.getSingularStorageDir().getPreviousDir();
        PersistentLongFile prevLastPromisedEpoch = new PersistentLongFile(new File(previousDir, LAST_PROMISED_FILENAME), 0L);
        PersistentLongFile prevLastWriterEpoch = new PersistentLongFile(new File(previousDir, LAST_WRITER_EPOCH), 0L);
        BestEffortLongFile prevCommittedTxnId = new BestEffortLongFile(new File(previousDir, COMMITTED_TXID_FILENAME), -12345L);
        this.lastPromisedEpoch = new PersistentLongFile(new File(currentDir, LAST_PROMISED_FILENAME), 0L);
        this.lastWriterEpoch = new PersistentLongFile(new File(currentDir, LAST_WRITER_EPOCH), 0L);
        this.committedTxnId = new BestEffortLongFile(new File(currentDir, COMMITTED_TXID_FILENAME), -12345L);
        try {
            this.lastPromisedEpoch.set(prevLastPromisedEpoch.get());
            this.lastWriterEpoch.set(prevLastWriterEpoch.get());
            this.committedTxnId.set(prevCommittedTxnId.get());
        }
        catch (Throwable throwable) {
            IOUtils.cleanupWithLogger((Logger)LOG, (Closeable[])new Closeable[]{prevCommittedTxnId});
            throw throwable;
        }
        IOUtils.cleanupWithLogger((Logger)LOG, (Closeable[])new Closeable[]{prevCommittedTxnId});
    }

    public synchronized void doFinalize() throws IOException {
        LOG.info("Finalizing upgrade for journal " + this.storage.getRoot() + "." + (this.storage.getLayoutVersion() == 0 ? "" : "\n   cur LV = " + this.storage.getLayoutVersion() + "; cur CTime = " + this.storage.getCTime()));
        this.storage.getJournalManager().doFinalize();
    }

    public Boolean canRollBack(StorageInfo storage, StorageInfo prevStorage, int targetLayoutVersion) throws IOException {
        return this.storage.getJournalManager().canRollBack(storage, prevStorage, targetLayoutVersion);
    }

    public synchronized void doRollback() throws IOException {
        IOUtils.cleanupWithLogger((Logger)LOG, (Closeable[])new Closeable[]{this.committedTxnId});
        this.storage.getJournalManager().doRollback();
    }

    synchronized void discardSegments(long startTxId) throws IOException {
        this.storage.getJournalManager().discardSegments(startTxId);
        this.committedTxnId.set(startTxId - 1L);
    }

    synchronized boolean moveTmpSegmentToCurrent(File tmpFile, File finalFile, long endTxId) throws IOException {
        boolean success;
        if (endTxId <= this.committedTxnId.get()) {
            if (!finalFile.getParentFile().exists()) {
                LOG.error(finalFile.getParentFile() + " doesn't exist. Aborting tmp segment move to current directory ; journal id: " + this.journalId);
                return false;
            }
            Files.move(tmpFile.toPath(), finalFile.toPath(), StandardCopyOption.ATOMIC_MOVE);
            if (finalFile.exists() && FileUtil.canRead((File)finalFile)) {
                success = true;
            } else {
                success = false;
                LOG.warn("Unable to move edits file from " + tmpFile + " to " + finalFile + " ; journal id: " + this.journalId);
            }
        } else {
            success = false;
            LOG.error("The endTxId of the temporary file is not less than the last committed transaction id. Aborting move to final file" + finalFile + " ; journal id: " + this.journalId);
        }
        return success;
    }

    public Long getJournalCTime() throws IOException {
        return this.storage.getJournalManager().getJournalCTime();
    }

    @VisibleForTesting
    JournaledEditsCache getJournaledEditsCache() {
        return this.cache;
    }
}

