/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.wal;

import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.LastSequenceId;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
public class WALSplitter {
    // Class-wide logger.
    private static final Logger LOG = LoggerFactory.getLogger(WALSplitter.class);
    // Default for the skip-errors option; the configuration key it pairs with is not visible in this part of the file.
    public static final boolean SPLIT_SKIP_ERRORS_DEFAULT = false;
    // Set once in the constructor from the caller-supplied arguments.
    protected final Path walDir;
    protected final FileSystem fs;
    // Private copy of the caller's configuration (created via HBaseConfiguration.create in the constructor).
    protected final Configuration conf;
    // Sink chosen in the constructor: bounded or unbounded depending on splitWriterCreationBounded.
    OutputSink outputSink;
    // Buffers handed to the output sink at construction time.
    private EntryBuffers entryBuffers;
    private SplitLogWorkerCoordination splitLogWorkerCoordination;
    private final WALFactory walFactory;
    private MonitoredTask status;
    protected final LastSequenceId sequenceIdChecker;
    // Keyed by String; presumably the encoded region name -- TODO confirm against the code that populates it.
    protected Map<String, Long> lastFlushedSequenceIds = new ConcurrentHashMap<String, Long>();
    protected Map<String, Map<byte[], Long>> regionMaxSeqIdInStores = new ConcurrentHashMap<String, Map<byte[], Long>>();
    private FileStatus fileBeingSplit;
    // Value of SPLIT_WRITER_CREATION_BOUNDED read in the constructor (default false).
    private final boolean splitWriterCreationBounded;
    public static final String SPLIT_WRITER_CREATION_BOUNDED = "hbase.split.writer.creation.bounded";
    // Matches names consisting solely of an optionally negative decimal number.
    private static final Pattern EDITFILES_NAME_PATTERN = Pattern.compile("-?[0-9]+");
    // Suffix appended by getTmpRecoveredEditsFileName; files ending with it are skipped by getSplitEditFilesSorted.
    private static final String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp";
    // The two suffixes recognized by isSequenceIdFile.
    private static final String SEQUENCE_ID_FILE_SUFFIX = ".seqid";
    private static final String OLD_SEQUENCE_ID_FILE_SUFFIX = "_seqid";
    // Both suffixes above have this length; getMaxSequenceId strips this many trailing characters.
    private static final int SEQUENCE_ID_FILE_SUFFIX_LENGTH = ".seqid".length();

    /**
     * Wires up a splitter: stores the collaborators, takes a private copy of the configuration,
     * and builds the entry buffers and the output sink.
     *
     * @param factory factory used later to create WAL readers and recovered-edits writers
     * @param conf caller configuration; copied, the copy additionally gets "hbase.client.rpc.codec" set
     * @param walDir stored in {@code this.walDir}
     * @param fs file system used for all file operations of this instance
     * @param idChecker stored in {@code this.sequenceIdChecker}; callers in this file may pass null
     * @param splitLogWorkerCoordination stored as-is; callers in this file may pass null
     */
    @VisibleForTesting
    WALSplitter(WALFactory factory, Configuration conf, Path walDir, FileSystem fs, LastSequenceId idChecker, SplitLogWorkerCoordination splitLogWorkerCoordination) {
        // Work on a copy so the codec override below never leaks into the caller's configuration.
        this.conf = HBaseConfiguration.create(conf);
        String walCodec = conf.get("hbase.regionserver.wal.codec", WALCellCodec.class.getName());
        this.conf.set("hbase.client.rpc.codec", walCodec);

        this.walDir = walDir;
        this.fs = fs;
        this.sequenceIdChecker = idChecker;
        this.splitLogWorkerCoordination = splitLogWorkerCoordination;
        this.walFactory = factory;

        PipelineController pipeline = new PipelineController();
        this.splitWriterCreationBounded = conf.getBoolean(SPLIT_WRITER_CREATION_BOUNDED, false);
        int bufferSize = this.conf.getInt("hbase.regionserver.hlog.splitlog.buffersize", 0x8000000);
        this.entryBuffers = new EntryBuffers(pipeline, bufferSize, this.splitWriterCreationBounded);

        int writerThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
        if (this.splitWriterCreationBounded) {
            this.outputSink = new BoundedLogWriterCreationOutputSink(pipeline, this.entryBuffers, writerThreads);
        } else {
            this.outputSink = new LogRecoveredEditsOutputSink(pipeline, this.entryBuffers, writerThreads);
        }
    }

    /**
     * Convenience entry point: creates a one-off {@code WALSplitter} from the arguments and
     * delegates to its instance-level {@code splitLogFile(FileStatus, CancelableProgressable)}.
     *
     * @return the value returned by the instance-level split
     * @throws IOException propagated from the instance-level split
     */
    public static boolean splitLogFile(Path walDir, FileStatus logfile, FileSystem fs, Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker, SplitLogWorkerCoordination splitLogWorkerCoordination, WALFactory factory) throws IOException {
        return new WALSplitter(factory, conf, walDir, fs, idChecker, splitLogWorkerCoordination).splitLogFile(logfile, reporter);
    }

    /**
     * Splits every WAL file found under {@code logDir}, archives each successfully split file,
     * and finally deletes {@code logDir} recursively.
     *
     * <p>A fresh {@code WALSplitter} is created per file with no sequence-id checker and no
     * coordination object.
     *
     * @param rootDir passed to each splitter as its WAL dir and used when finishing a split
     * @param logDir directory whose files are split; removed afterwards
     * @param oldLogDir archive directory handed to {@code finishSplitLogFile}
     * @param fs file system used by the splitters and for the final delete
     * @param conf configuration
     * @param factory WAL factory handed to each splitter
     * @return the paths collected from each splitter's output sink (empty if nothing was split)
     * @throws IOException if a split fails or {@code logDir} cannot be deleted
     */
    @VisibleForTesting
    public static List<Path> split(Path rootDir, Path logDir, Path oldLogDir, FileSystem fs, Configuration conf, WALFactory factory) throws IOException {
        // Keep the element type: the loop below needs FileStatus.getPath().
        FileStatus[] logfiles = SplitLogManager.getFileList(conf, Collections.singletonList(logDir), null);
        List<Path> splits = new ArrayList<Path>();
        if (ArrayUtils.isNotEmpty(logfiles)) {
            for (FileStatus logfile : logfiles) {
                WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, null, null);
                if (!s.splitLogFile(logfile, null)) {
                    continue;
                }
                WALSplitter.finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf);
                if (s.outputSink.splits != null) {
                    splits.addAll(s.outputSink.splits);
                }
            }
        }
        if (!fs.delete(logDir, true)) {
            throw new IOException("Unable to delete src dir: " + logDir);
        }
        return splits;
    }

    /**
     * Instance-level split of a single WAL file.
     *
     * <p>NOTE(review): the decompiler could not recover this method's body (see the trace below).
     * As it stands the method unconditionally throws {@link IllegalStateException}, so both callers
     * in this file -- the static {@code splitLogFile(...)} and {@code split(...)} -- will fail until
     * the real implementation is restored from the upstream source.
     *
     * @param logfile the WAL file to split
     * @param reporter progress callback; {@code split(...)} passes null
     * @return never returns normally in this decompiled form
     * @throws IOException declared by the original signature
     */
    @VisibleForTesting
    boolean splitLogFile(FileStatus logfile, CancelableProgressable reporter) throws IOException {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Tried to end blocks [6[TRYBLOCK]], but top level block is 40[WHILELOOP]
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.processEndingBlocks(Op04StructuredStatement.java:435)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:484)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    /**
     * Finishes the split of a WAL file identified by name: resolves it against the WAL root
     * directory and delegates to the path-based overload with "oldWALs" as the archive directory.
     *
     * @param logfile WAL file name; used as-is when it already starts with the WAL root dir,
     *        otherwise resolved relative to it
     * @param conf configuration used to locate the WAL root directory
     * @throws IOException propagated from the file-system operations
     */
    public static void finishSplitLogFile(String logfile, Configuration conf) throws IOException {
        Path walRoot = FSUtils.getWALRootDir(conf);
        Path archiveDir = new Path(walRoot, "oldWALs");
        Path resolved;
        if (FSUtils.isStartingWithPath(walRoot, logfile)) {
            resolved = new Path(logfile);
        } else {
            resolved = new Path(walRoot, logfile);
        }
        finishSplitLogFile(walRoot, archiveDir, resolved, conf);
    }

    /**
     * Archives one split WAL file -- to the corrupt directory if it is flagged as corrupted,
     * otherwise to {@code oldLogDir} -- and then removes its split staging directory.
     *
     * @param rootdir directory whose file system is used and against which corruption is checked
     * @param oldLogDir archive directory for successfully processed logs
     * @param logPath the WAL file that was split
     * @param conf configuration
     * @throws IOException propagated from the file-system operations
     */
    private static void finishSplitLogFile(Path rootdir, Path oldLogDir, Path logPath, Configuration conf) throws IOException {
        FileSystem fs = rootdir.getFileSystem(conf);
        boolean corrupted = ZKSplitLog.isCorrupted(rootdir, logPath.getName(), fs);

        // Exactly one of the two lists carries the log; archiveLogs only iterates over them.
        List<Path> onlyThisLog = Collections.singletonList(logPath);
        List<Path> nothing = Collections.emptyList();
        archiveLogs(corrupted ? onlyThisLog : nothing, corrupted ? nothing : onlyThisLog, oldLogDir, fs, conf);

        fs.delete(ZKSplitLog.getSplitLogDir(rootdir, logPath.getName()), true);
    }

    /**
     * Moves corrupted logs into the "corrupt" directory under the WAL root and processed logs into
     * {@code oldLogDir}. Logs that no longer exist are skipped; a failed move is logged, not thrown.
     *
     * @param corruptedLogs logs to sideline into the corrupt directory
     * @param processedLogs logs to archive into {@code oldLogDir}
     * @param oldLogDir archive directory (created if needed)
     * @param fs file system on which all moves happen
     * @param conf configuration; used for the WAL root dir and to warn about a deprecated key
     * @throws IOException propagated from the file-system operations
     */
    private static void archiveLogs(List<Path> corruptedLogs, List<Path> processedLogs, Path oldLogDir, FileSystem fs, Configuration conf) throws IOException {
        Path corruptDir = new Path(FSUtils.getWALRootDir(conf), "corrupt");
        boolean deprecatedKeySet = conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir") != null;
        if (deprecatedKeySet) {
            LOG.warn("hbase.regionserver.hlog.splitlog.corrupt.dir is deprecated. Default to {}", corruptDir);
        }
        if (!fs.mkdirs(corruptDir)) {
            LOG.info("Unable to mkdir {}", corruptDir);
        }
        fs.mkdirs(oldLogDir);

        for (Path corrupted : corruptedLogs) {
            Path target = new Path(corruptDir, corrupted.getName());
            if (fs.exists(corrupted)) {
                if (fs.rename(corrupted, target)) {
                    LOG.warn("Moved corrupted log {} to {}", corrupted, target);
                } else {
                    LOG.warn("Unable to move corrupted log {} to {}", corrupted, target);
                }
            }
        }

        for (Path processed : processedLogs) {
            Path archived = AbstractFSWAL.getWALArchivePath(oldLogDir, processed);
            if (fs.exists(processed)) {
                if (FSUtils.renameAndSetModifyTime(fs, processed, archived)) {
                    LOG.info("Archived processed log {} to {}", processed, archived);
                } else {
                    LOG.warn("Unable to move {} to {}", processed, archived);
                }
            }
        }
    }

    /**
     * Computes the temporary recovered-edits file path for the region a WAL entry belongs to,
     * creating the region's "recovered.edits" directory when it is missing.
     *
     * <p>If a plain file sits where that directory should be, it is moved to
     * {@code /tmp/recovered.edits_<encodedRegionName>} first.
     *
     * @param logEntry entry whose key supplies table name, encoded region name and sequence id
     * @param fileNameBeingSplit name of the WAL being split; becomes part of the file name
     * @param conf configuration used to obtain the file system and the root directory
     * @return the temporary edits path, or null when the region directory does not exist
     * @throws IOException propagated from the file-system operations
     */
    @VisibleForTesting
    static Path getRegionSplitEditsPath(WAL.Entry logEntry, String fileNameBeingSplit, Configuration conf) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path tableDir = FSUtils.getTableDir(FSUtils.getRootDir(conf), logEntry.getKey().getTableName());
        String encodedRegionName = Bytes.toString(logEntry.getKey().getEncodedRegionName());
        Path regiondir = HRegion.getRegionDir(tableDir, encodedRegionName);
        Path editsDir = getRegionDirRecoveredEditsDir(regiondir);

        if (!fs.exists(regiondir)) {
            LOG.info("This region's directory does not exist: {}.It is very likely that it was already split so it is safe to discard those edits.", regiondir);
            return null;
        }

        boolean plainFileInTheWay = fs.exists(editsDir) && fs.isFile(editsDir);
        if (plainFileInTheWay) {
            Path tmpRoot = new Path("/tmp");
            if (!fs.exists(tmpRoot)) {
                fs.mkdirs(tmpRoot);
            }
            Path sideline = new Path(tmpRoot, "recovered.edits_" + encodedRegionName);
            LOG.warn("Found existing old file: {}. It could be some leftover of an old installation. It should be a folder instead. So moving it to {}", editsDir, sideline);
            if (!fs.rename(editsDir, sideline)) {
                LOG.warn("Failed to sideline old file {}", editsDir);
            }
        }

        if (!fs.exists(editsDir) && !fs.mkdirs(editsDir)) {
            LOG.warn("mkdir failed on {}", editsDir);
        }

        String seqIdPart = formatRecoveredEditsFileName(logEntry.getKey().getSequenceId());
        String tmpName = getTmpRecoveredEditsFileName(seqIdPart + "-" + fileNameBeingSplit);
        return new Path(editsDir, tmpName);
    }

    /**
     * Appends the temporary-file suffix ({@code RECOVERED_LOG_TMPFILE_SUFFIX}) to a file name.
     *
     * @param fileName base recovered-edits file name
     * @return {@code fileName} followed by the suffix
     */
    private static String getTmpRecoveredEditsFileName(String fileName) {
        return fileName + RECOVERED_LOG_TMPFILE_SUFFIX;
    }

    /**
     * Builds the final recovered-edits path: a sibling of {@code srcPath} whose name is the
     * zero-padded form of {@code maximumEditLogSeqNum}.
     *
     * @param srcPath an existing path in the target directory (only its parent is used)
     * @param maximumEditLogSeqNum sequence number that becomes the file name
     * @return the sibling path named after the sequence number
     */
    private static Path getCompletedRecoveredEditsFilePath(Path srcPath, long maximumEditLogSeqNum) {
        String completedName = formatRecoveredEditsFileName(maximumEditLogSeqNum);
        Path directory = srcPath.getParent();
        return new Path(directory, completedName);
    }

    /**
     * Formats a sequence id as a 19-character, zero-padded decimal string for use as a file name.
     *
     * <p>The format is pinned to {@code Locale.ROOT}: without an explicit locale,
     * {@code String.format} localizes digits under some default locales, producing names that the
     * ASCII-only {@code -?[0-9]+} edits-file pattern in this class would not match.
     *
     * @param seqid the sequence id
     * @return the zero-padded ASCII decimal representation (a leading '-' counts toward the width)
     */
    @VisibleForTesting
    static String formatRecoveredEditsFileName(long seqid) {
        return String.format(java.util.Locale.ROOT, "%019d", seqid);
    }

    /**
     * Returns the "recovered.edits" child of a region directory. Pure path arithmetic; the file
     * system is not touched.
     *
     * @param regiondir the region directory
     * @return {@code regiondir/recovered.edits}
     */
    public static Path getRegionDirRecoveredEditsDir(Path regiondir) {
        return new Path(regiondir, "recovered.edits");
    }

    /**
     * Tells whether a region has at least one recovered-edits file waiting.
     *
     * @param fs file system to inspect
     * @param conf configuration used to locate the root directory
     * @param regionInfo the region; any region with a non-zero replica id yields false immediately
     * @return true if {@code getSplitEditFilesSorted} finds at least one file for the region
     * @throws IOException propagated from the file-system operations
     */
    public static boolean hasRecoveredEdits(FileSystem fs, Configuration conf, RegionInfo regionInfo) throws IOException {
        if (regionInfo.getReplicaId() != 0) {
            return false;
        }
        Path regionDir = HRegion.getRegionDir(FSUtils.getRootDir(conf), regionInfo);
        NavigableSet<Path> editFiles = getSplitEditFilesSorted(fs, regionDir);
        if (editFiles == null) {
            return false;
        }
        return !editFiles.isEmpty();
    }

    /**
     * Lists the completed recovered-edits files of a region, sorted by path.
     *
     * <p>A file is included when it is a regular file whose name fully matches
     * {@code EDITFILES_NAME_PATTERN}; temporary files (name ending in
     * {@code RECOVERED_LOG_TMPFILE_SUFFIX}) and sequence-id files are excluded. If the
     * {@code isFile} check throws, the failure is logged and the file is skipped.
     *
     * @param fs file system to list
     * @param regiondir region directory whose "recovered.edits" child is scanned
     * @return a sorted, possibly empty set of edit file paths; never null
     * @throws IOException propagated from the existence check or the listing
     */
    public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem fs, Path regiondir) throws IOException {
        TreeSet<Path> filesSorted = new TreeSet<Path>();
        Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
        if (!fs.exists(editsdir)) {
            return filesSorted;
        }
        // Typed as FileStatus[] so that getPath() is available on the elements below.
        FileStatus[] files = FSUtils.listStatus(fs, editsdir, new PathFilter() {

            @Override
            public boolean accept(Path p) {
                boolean result = false;
                try {
                    Matcher m = EDITFILES_NAME_PATTERN.matcher(p.getName());
                    result = fs.isFile(p) && m.matches();
                    if (p.getName().endsWith(WALSplitter.RECOVERED_LOG_TMPFILE_SUFFIX)) {
                        result = false;
                    }
                    if (WALSplitter.isSequenceIdFile(p)) {
                        result = false;
                    }
                }
                catch (IOException e) {
                    LOG.warn("Failed isFile check on {}", p, e);
                }
                return result;
            }
        });
        // The listing may be null or empty; both mean "no files".
        if (files != null) {
            for (FileStatus status : files) {
                filesSorted.add(status.getPath());
            }
        }
        return filesSorted;
    }

    /**
     * Renames an edits file to {@code <name>.<currentTimeMillis>} in the same directory.
     * A failed rename is only logged; the intended target path is returned either way.
     *
     * @param fs file system on which to rename
     * @param edits the edits file to move aside
     * @return the path the file was (or should have been) renamed to
     * @throws IOException propagated from the rename
     */
    public static Path moveAsideBadEditsFile(FileSystem fs, Path edits) throws IOException {
        String sidelinedName = edits.getName() + "." + System.currentTimeMillis();
        Path moveAsideName = new Path(edits.getParent(), sidelinedName);
        boolean renamed = fs.rename(edits, moveAsideName);
        if (!renamed) {
            LOG.warn("Rename failed from {} to {}", edits, moveAsideName);
        }
        return moveAsideName;
    }

    /**
     * Tells whether a path names a sequence-id file, i.e. its name ends with either the current
     * ({@code SEQUENCE_ID_FILE_SUFFIX}) or the old ({@code OLD_SEQUENCE_ID_FILE_SUFFIX}) suffix.
     *
     * @param file path to test; only its last component is inspected
     * @return true if the name carries one of the two suffixes
     */
    @VisibleForTesting
    public static boolean isSequenceIdFile(Path file) {
        String name = file.getName();
        if (name.endsWith(SEQUENCE_ID_FILE_SUFFIX)) {
            return true;
        }
        return name.endsWith(OLD_SEQUENCE_ID_FILE_SUFFIX);
    }

    /**
     * Lists the sequence-id files in a region's "recovered.edits" directory.
     *
     * @param fs file system to list
     * @param regionDir the region directory
     * @return the matching files; an empty array when the directory is missing or the listing is null
     * @throws IOException for listing failures other than a missing directory
     */
    private static FileStatus[] getSequenceIdFiles(FileSystem fs, Path regionDir) throws IOException {
        Path editsDir = getRegionDirRecoveredEditsDir(regionDir);
        FileStatus[] found;
        try {
            found = fs.listStatus(editsDir, WALSplitter::isSequenceIdFile);
        }
        catch (FileNotFoundException e) {
            // A missing directory simply means there are no sequence-id files.
            return new FileStatus[0];
        }
        if (found == null) {
            return new FileStatus[]{};
        }
        return found;
    }

    /**
     * Extracts the largest sequence id encoded in the given sequence-id file names.
     *
     * <p>Each name is expected to be a number followed by a suffix of
     * {@code SEQUENCE_ID_FILE_SUFFIX_LENGTH} characters; names whose prefix is not a valid long
     * are logged and ignored.
     *
     * @param files sequence-id files to inspect
     * @return the maximum id found, or -1 when none could be parsed
     */
    private static long getMaxSequenceId(FileStatus[] files) {
        long highest = -1L;
        for (FileStatus candidate : files) {
            String fileName = candidate.getPath().getName();
            try {
                int numberEnd = fileName.length() - SEQUENCE_ID_FILE_SUFFIX_LENGTH;
                long parsed = Long.parseLong(fileName.substring(0, numberEnd));
                if (parsed > highest) {
                    highest = parsed;
                }
            }
            catch (NumberFormatException ex) {
                LOG.warn("Invalid SeqId File Name={}", fileName);
            }
        }
        return highest;
    }

    /**
     * Returns the largest sequence id recorded by the sequence-id files of a region.
     *
     * @param fs file system to inspect
     * @param regionDir the region directory
     * @return the result of {@code getMaxSequenceId} over the region's sequence-id files
     *         (-1 when there is none that parses)
     * @throws IOException propagated from listing the files
     */
    public static long getMaxRegionSequenceId(FileSystem fs, Path regionDir) throws IOException {
        return WALSplitter.getMaxSequenceId(WALSplitter.getSequenceIdFiles(fs, regionDir));
    }

    /**
     * Records a new maximum sequence id for a region by creating an empty
     * {@code <newMaxSeqId>.seqid} file and deleting every other sequence-id file that was listed.
     *
     * @param fs file system to write to
     * @param regionDir the region directory
     * @param newMaxSeqId the sequence id to record; must not be lower than the current maximum
     * @throws IOException if the new id is lower than the recorded one, or the marker file can
     *         neither be created nor found afterwards
     */
    public static void writeRegionSequenceIdFile(FileSystem fs, Path regionDir, long newMaxSeqId) throws IOException {
        FileStatus[] existing = getSequenceIdFiles(fs, regionDir);
        long maxSeqId = getMaxSequenceId(existing);
        if (maxSeqId > newMaxSeqId) {
            throw new IOException("The new max sequence id " + newMaxSeqId + " is less than the old max sequence id " + maxSeqId);
        }

        Path newSeqIdFile = new Path(getRegionDirRecoveredEditsDir(regionDir), newMaxSeqId + SEQUENCE_ID_FILE_SUFFIX);
        // When the id is unchanged the marker file already exists, so creation is skipped.
        if (newMaxSeqId != maxSeqId) {
            try {
                boolean created = fs.createNewFile(newSeqIdFile);
                if (!created && !fs.exists(newSeqIdFile)) {
                    throw new IOException("Failed to create SeqId file:" + newSeqIdFile);
                }
                LOG.debug("Wrote file={}, newMaxSeqId={}, maxSeqId={}", newSeqIdFile, newMaxSeqId, maxSeqId);
            }
            catch (FileAlreadyExistsException ignored) {
                // Deliberately swallowed: the marker file is already there.
            }
        }

        for (FileStatus old : existing) {
            Path oldPath = old.getPath();
            if (!newSeqIdFile.equals(oldPath)) {
                fs.delete(oldPath, false);
            }
        }
    }

    /**
     * Recovers the lease on a WAL file and opens a reader on it.
     *
     * @param file the WAL file; a non-positive length is logged as "might be still open"
     * @param skipErrors when true, open failures are reported as {@code CorruptedLogFileException}
     *        instead of being rethrown (interrupts are always rethrown)
     * @param reporter progress callback handed to lease recovery and to the reader factory
     * @return the reader, or null when the file hit EOF on open or no longer exists
     * @throws IOException when opening fails and {@code skipErrors} is false, or on interruption
     * @throws CorruptedLogFileException when opening fails and {@code skipErrors} is true
     */
    protected WAL.Reader getReader(FileStatus file, boolean skipErrors, CancelableProgressable reporter) throws IOException, CorruptedLogFileException {
        Path path = file.getPath();
        long length = file.getLen();
        if (length <= 0L) {
            LOG.warn("File {} might be still open, length is 0", path);
        }
        try {
            FSUtils.getInstance(this.fs, this.conf).recoverFileLease(this.fs, path, this.conf, reporter);
            try {
                return this.getReader(path, reporter);
            }
            catch (EOFException e) {
                // EOF while opening is tolerated; it is only worth a warning for an empty file.
                if (length <= 0L) {
                    LOG.warn("Could not open {} for reading. File is empty", path, e);
                }
                return null;
            }
        }
        catch (FileNotFoundException e) {
            LOG.warn("File {} does not exist anymore", path, e);
            return null;
        }
        catch (IOException e) {
            boolean interrupted = e instanceof InterruptedIOException;
            if (interrupted || !skipErrors) {
                throw e;
            }
            CorruptedLogFileException corrupt = new CorruptedLogFileException("skipErrors=true Could not open wal " + path + " ignoring");
            corrupt.initCause(e);
            throw corrupt;
        }
    }

    /**
     * Reads the next entry from a WAL reader, translating read failures.
     *
     * @param in the reader
     * @param path the WAL path, used for log and exception messages only
     * @param skipErrors when true, unexpected I/O failures become {@code CorruptedLogFileException}
     * @return the next entry; null on EOF or when the failure was caused by a
     *         {@code ParseException} or {@code ChecksumException}
     * @throws CorruptedLogFileException for other I/O failures when {@code skipErrors} is true
     * @throws IOException for other I/O failures when {@code skipErrors} is false
     */
    private static WAL.Entry getNextLogLine(WAL.Reader in, Path path, boolean skipErrors) throws CorruptedLogFileException, IOException {
        try {
            return in.next();
        }
        catch (EOFException eof) {
            LOG.info("EOF from wal {}. Continuing.", path);
            return null;
        }
        catch (IOException e) {
            Throwable cause = e.getCause();
            // instanceof is false for null, so a missing cause falls through to the generic handling.
            if (cause instanceof ParseException || cause instanceof ChecksumException) {
                LOG.warn("Parse exception from wal {}. Continuing", path, e);
                return null;
            }
            if (skipErrors) {
                CorruptedLogFileException corrupt = new CorruptedLogFileException("skipErrors=true Ignoring exception while parsing wal " + path + ". Marking as corrupted");
                corrupt.initCause(e);
                throw corrupt;
            }
            throw e;
        }
    }

    /**
     * Creates a recovered-edits writer for the given path through this splitter's WAL factory
     * and file system.
     *
     * @param logfile path of the file to write
     * @return the writer produced by the factory
     * @throws IOException propagated from the factory
     */
    protected WALProvider.Writer createWriter(Path logfile) throws IOException {
        return this.walFactory.createRecoveredEditsWriter(this.fs, logfile);
    }

    /**
     * Opens a reader on a WAL file through this splitter's WAL factory and file system.
     *
     * @param curLogFile path of the WAL to read
     * @param reporter progress callback forwarded to the factory
     * @return the reader produced by the factory
     * @throws IOException propagated from the factory
     */
    protected WAL.Reader getReader(Path curLogFile, CancelableProgressable reporter) throws IOException {
        return this.walFactory.createReader(this.fs, curLogFile, reporter);
    }

    /**
     * Reports how many writers the output sink currently has open.
     *
     * @return the sink's open-writer count, or 0 when there is no sink
     */
    private int getNumOpenWriters() {
        if (this.outputSink == null) {
            return 0;
        }
        return this.outputSink.getNumOpenWriters();
    }

    /**
     * Rebuilds the mutations carried by a protobuf WAL entry from its associated cells.
     *
     * <p>Consecutive cells with the same row and type byte are folded into one {@code Put} or
     * {@code Delete}; a change of row or type starts a new mutation. Puts carry the entry's nonce
     * group and nonce (0 when absent), deletes always carry 0/0. When {@code logEntry} is
     * non-null it is filled with a reconstructed WAL key and a WAL edit holding every cell read.
     *
     * @param entry the protobuf WAL entry; null yields an empty list
     * @param cells scanner positioned before the entry's first cell; must supply at least
     *        {@code entry.getAssociatedCellCount()} cells
     * @param logEntry optional out-parameter receiving the rebuilt key and edit; may be null
     * @param durability durability applied to every rebuilt mutation
     * @return the rebuilt mutations in cell order
     * @throws IOException propagated from advancing the scanner or adding cells
     * @throws ArrayIndexOutOfBoundsException if the scanner runs out before the expected count
     */
    public static List<MutationReplay> getMutationsFromWALEntry(AdminProtos.WALEntry entry, CellScanner cells, Pair<WALKey, WALEdit> logEntry, Durability durability) throws IOException {
        if (entry == null) {
            return Collections.emptyList();
        }
        long replaySeqId = entry.getKey().hasOrigSequenceNumber() ? entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
        int count = entry.getAssociatedCellCount();
        List<MutationReplay> mutations = new ArrayList<MutationReplay>();
        Cell previousCell = null;
        // Must be the common supertype: the same variable holds either a Put or a Delete.
        Mutation m = null;
        WALEdit val = logEntry != null ? new WALEdit() : null;
        for (int i = 0; i < count; ++i) {
            if (!cells.advance()) {
                throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
            }
            Cell cell = cells.current();
            if (val != null) {
                val.add(cell);
            }
            boolean isNewRowOrType = previousCell == null || previousCell.getTypeByte() != cell.getTypeByte() || !CellUtil.matchingRows(previousCell, cell);
            if (isNewRowOrType) {
                if (CellUtil.isDelete(cell)) {
                    m = new Delete(cell.getRowArray(), cell.getRowOffset(), (int)cell.getRowLength());
                    mutations.add(new MutationReplay(ClientProtos.MutationProto.MutationType.DELETE, m, 0L, 0L));
                } else {
                    m = new Put(cell.getRowArray(), cell.getRowOffset(), (int)cell.getRowLength());
                    long nonceGroup = entry.getKey().hasNonceGroup() ? entry.getKey().getNonceGroup() : 0L;
                    long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : 0L;
                    mutations.add(new MutationReplay(ClientProtos.MutationProto.MutationType.PUT, m, nonceGroup, nonce));
                }
            }
            // m always matches the current cell's kind here: a type change forces a new mutation above.
            if (CellUtil.isDelete(cell)) {
                ((Delete)m).add(cell);
            } else {
                ((Put)m).add(cell);
            }
            m.setDurability(durability);
            previousCell = cell;
        }
        if (logEntry != null) {
            WALProtos.WALKey walKeyProto = entry.getKey();
            List<UUID> clusterIds = new ArrayList<UUID>(walKeyProto.getClusterIdsCount());
            for (HBaseProtos.UUID uuid : entry.getKey().getClusterIdsList()) {
                clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits()));
            }
            WALKeyImpl key = new WALKeyImpl(walKeyProto.getEncodedRegionName().toByteArray(), TableName.valueOf(walKeyProto.getTableName().toByteArray()), replaySeqId, walKeyProto.getWriteTime(), clusterIds, walKeyProto.getNonceGroup(), walKeyProto.getNonce(), null);
            logEntry.setFirst(key);
            logEntry.setSecond(val);
        }
        return mutations;
    }

    /**
     * A mutation to be replayed together with its type and nonce information.
     *
     * <p>Ordering and equality are both delegated to the wrapped mutation's {@code compareTo};
     * the hash code is the wrapped mutation's hash code. Constructing an instance switches the
     * mutation's durability to {@code ASYNC_WAL} unless it is {@code SKIP_WAL}.
     */
    public static class MutationReplay
    implements Comparable<MutationReplay> {
        public final ClientProtos.MutationProto.MutationType type;
        public final Mutation mutation;
        public final long nonceGroup;
        public final long nonce;

        /**
         * @param type protobuf mutation type
         * @param mutation the mutation to replay; its durability may be changed (see class comment)
         * @param nonceGroup nonce group of the original operation
         * @param nonce nonce of the original operation
         */
        public MutationReplay(ClientProtos.MutationProto.MutationType type, Mutation mutation, long nonceGroup, long nonce) {
            this.type = type;
            this.mutation = mutation;
            boolean skipsWal = this.mutation.getDurability() == Durability.SKIP_WAL;
            if (!skipsWal) {
                this.mutation.setDurability(Durability.ASYNC_WAL);
            }
            this.nonceGroup = nonceGroup;
            this.nonce = nonce;
        }

        /** Compares by the wrapped mutations only. */
        @Override
        public int compareTo(MutationReplay d) {
            return this.mutation.compareTo((Row)d.mutation);
        }

        /** Equal when the other object is a {@code MutationReplay} that compares as 0. */
        @Override
        public boolean equals(Object obj) {
            return obj instanceof MutationReplay && this.compareTo((MutationReplay)obj) == 0;
        }

        /** Hash of the wrapped mutation. */
        @Override
        public int hashCode() {
            return this.mutation.hashCode();
        }
    }

    /**
     * Checked exception thrown by the reader helpers in this class when a WAL cannot be opened or
     * parsed and errors are being skipped. Callers attach the underlying failure via
     * {@code initCause}.
     */
    static class CorruptedLogFileException
    extends Exception {
        private static final long serialVersionUID = 1L;

        /**
         * @param s detail message
         */
        CorruptedLogFileException(String s) {
            super(s);
        }
    }

    /**
     * Immutable bundle of a writer, the path it writes to, and a minimum log sequence number,
     * on top of the counters inherited from {@code SinkWriter}.
     */
    private static final class WriterAndPath
    extends SinkWriter {
        // Path the writer writes to.
        final Path p;
        // The underlying writer.
        final WALProvider.Writer w;
        // Minimum log sequence number supplied at construction.
        final long minLogSeqNum;

        WriterAndPath(Path p, WALProvider.Writer w, long minLogSeqNum) {
            this.p = p;
            this.w = w;
            this.minLogSeqNum = minLogSeqNum;
        }
    }

    /**
     * Base class holding three running totals for a sink writer: edits written, edits skipped and
     * nanoseconds spent. The counters are plain fields with no synchronization.
     */
    public static abstract class SinkWriter {
        long editsWritten = 0L;
        long editsSkipped = 0L;
        long nanosSpent = 0L;

        /** Adds {@code edits} to the written-edits total. */
        void incrementEdits(int edits) {
            long delta = edits;
            this.editsWritten = this.editsWritten + delta;
        }

        /** Adds {@code skipped} to the skipped-edits total. */
        void incrementSkippedEdits(int skipped) {
            long delta = skipped;
            this.editsSkipped = this.editsSkipped + delta;
        }

        /** Adds {@code nanos} to the time-spent total. */
        void incrementNanoTime(long nanos) {
            this.nanosSpent = this.nanosSpent + nanos;
        }
    }

    /**
     * Output sink variant that writes a region buffer and closes its writer in one step
     * ({@code writeThenClose}) instead of keeping writers open, and tracks how many edits were
     * written per region in {@code regionRecoverStatMap}.
     */
    class BoundedLogWriterCreationOutputSink
    extends LogRecoveredEditsOutputSink {
        // Edits written so far, keyed by the encoded region name rendered as a String.
        private final ConcurrentHashMap<String, Long> regionRecoverStatMap;

        public BoundedLogWriterCreationOutputSink(PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
            super(controller, entryBuffers, numWriters);
            this.regionRecoverStatMap = new ConcurrentHashMap<String, Long>();
        }

        /**
         * Finishes writing, then always closes.
         *
         * @return {@code this.splits}, which is updated to the paths returned by {@code close()}
         *         only when {@code finishWriting} reported success
         * @throws IOException propagated from {@code finishWriting} or {@code close}
         */
        @Override
        public List<Path> finishWritingAndClose() throws IOException {
            List<Path> result;
            boolean isSuccessful;
            try {
                isSuccessful = this.finishWriting(false);
            }
            finally {
                result = this.close();
            }
            if (isSuccessful) {
                this.splits = result;
            }
            return this.splits;
        }

        /**
         * Submits one write-then-close task per buffered region and waits for all of them.
         *
         * @param completionService service the tasks are submitted to
         * @param thrown not used by this implementation
         * @param paths receives the destination path produced by each task
         * @return true if the reporter signalled a progress failure at least once
         * @throws InterruptedException if interrupted while waiting for a task
         * @throws ExecutionException if a task failed
         */
        @Override
        boolean executeCloseTask(CompletionService<Void> completionService, List<IOException> thrown, final List<Path> paths) throws InterruptedException, ExecutionException {
            for (final Map.Entry<byte[], RegionEntryBuffer> buffer : this.entryBuffers.buffers.entrySet()) {
                LOG.info("Submitting writeThenClose of {}", (Object)Arrays.toString(buffer.getValue().encodedRegionName));
                completionService.submit(new Callable<Void>(){

                    @Override
                    public Void call() throws Exception {
                        Path dst = BoundedLogWriterCreationOutputSink.this.writeThenClose(buffer.getValue());
                        paths.add(dst);
                        return null;
                    }
                });
            }
            boolean progress_failed = false;
            int n = this.entryBuffers.buffers.size();
            for (int i = 0; i < n; ++i) {
                Future<Void> future = completionService.take();
                future.get();
                // Once a progress failure has been seen, the reporter is not consulted again.
                if (progress_failed || this.reporter == null || this.reporter.progress()) continue;
                progress_failed = true;
            }
            return progress_failed;
        }

        /**
         * @return a snapshot of the per-region edit counts, keyed by the region name bytes
         */
        @Override
        public Map<byte[], Long> getOutputCounts() {
            HashMap<byte[], Long> regionRecoverStatMapResult = new HashMap<byte[], Long>();
            for (Map.Entry<String, Long> entry : this.regionRecoverStatMap.entrySet()) {
                regionRecoverStatMapResult.put(Bytes.toBytes(entry.getKey()), entry.getValue());
            }
            return regionRecoverStatMapResult;
        }

        /**
         * @return the number of regions for which edits have been recorded
         */
        @Override
        public int getNumberOfRecoveredRegions() {
            return this.regionRecoverStatMap.size();
        }

        /**
         * Writes the buffer and immediately closes the writer used for it.
         */
        @Override
        public void append(RegionEntryBuffer buffer) throws IOException {
            this.writeThenClose(buffer);
        }

        /**
         * Appends the buffer, adds the number of edits written to the region's running total, and
         * closes the writer.
         *
         * @param buffer the region buffer to flush
         * @return the path returned by {@code closeWriter}, or null when nothing was appended
         * @throws IOException if appending fails or closing reported any errors
         */
        private Path writeThenClose(RegionEntryBuffer buffer) throws IOException {
            WriterAndPath wap = this.appendBuffer(buffer, false);
            if (wap == null) {
                return null;
            }
            String encodedRegionName = Bytes.toString(buffer.encodedRegionName);
            // merge() does the add atomically; a separate get-then-put could lose a concurrent update.
            this.regionRecoverStatMap.merge(encodedRegionName, wap.editsWritten, Long::sum);
            ArrayList<IOException> thrown = new ArrayList<IOException>();
            Path dst = this.closeWriter(encodedRegionName, wap, thrown);
            if (!thrown.isEmpty()) {
                throw MultipleIOException.createIOException(thrown);
            }
            return dst;
        }
    }

    class LogRecoveredEditsOutputSink
    extends OutputSink {
        /**
         * Creates the sink; all arguments are passed straight to the {@code OutputSink}
         * constructor.
         *
         * @param controller shared pipeline controller
         * @param entryBuffers buffers the writer threads drain
         * @param numWriters number of writer threads (also used as the close pool size)
         */
        public LogRecoveredEditsOutputSink(PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
            super(controller, entryBuffers, numWriters);
        }

        /**
         * Waits for the writer threads, then closes all writers.
         *
         * <p>{@code close()} and {@code closeLogWriters(null)} run in a {@code finally}
         * block, so they execute even when {@code finishWriting} throws. Any close
         * failures collected by {@code closeLogWriters} are rethrown as one exception.
         *
         * @return the {@code splits} field, which is set to the closed paths only when
         *         {@code finishWriting} reported success
         * @throws IOException if waiting for the writers or closing them fails
         */
        @Override
        public List<Path> finishWritingAndClose() throws IOException {
            boolean finishedCleanly = false;
            List<Path> closedPaths = null;
            try {
                finishedCleanly = this.finishWriting(false);
            }
            finally {
                closedPaths = this.close();
                List<IOException> closeFailures = this.closeLogWriters(null);
                if (CollectionUtils.isNotEmpty(closeFailures)) {
                    throw MultipleIOException.createIOException(closeFailures);
                }
            }
            if (finishedCleanly) {
                this.splits = closedPaths;
            }
            return this.splits;
        }

        /**
         * Resolves a clash between the file just written ({@code wap.p}) and an existing
         * file at the final destination {@code dst} by deleting one of them.
         *
         * <p>The first entry of {@code dst} is read to learn its starting sequence id. If
         * the new file starts at a lower sequence id, {@code dst} is deleted; otherwise the
         * new file is deleted. An {@code EOFException} while reading {@code dst} is logged
         * and leaves its sequence id at {@code -1}.
         *
         * <p>All access to {@code dst} goes through {@code rootFs}: the caller checks that
         * {@code dst} exists on that file system, so reading or deleting it through another
         * handle could miss the file when the two file systems differ.
         *
         * @param rootFs file system holding both {@code wap.p} and {@code dst}
         * @param wap the writer whose file was just closed
         * @param dst the existing file at the final destination
         * @throws IOException if reading fails for another reason or a delete fails
         */
        private void deleteOneWithFewerEntries(FileSystem rootFs, WriterAndPath wap, Path dst) throws IOException {
            long dstMinLogSeqNum = -1L;
            try (WAL.Reader reader = WALSplitter.this.walFactory.createReader(rootFs, dst)) {
                WAL.Entry entry = reader.next();
                if (entry != null) {
                    dstMinLogSeqNum = entry.getKey().getSequenceId();
                }
            }
            catch (EOFException e) {
                LOG.debug("Got EOF when reading first WAL entry from {}, an empty or broken WAL file?", dst, e);
            }
            if (wap.minLogSeqNum < dstMinLogSeqNum) {
                LOG.warn("Found existing old edits file. It could be the result of a previous failed split attempt or we have duplicated wal entries. Deleting " + dst + ", length=" + rootFs.getFileStatus(dst).getLen());
                if (!rootFs.delete(dst, false)) {
                    LOG.warn("Failed deleting of old {}", dst);
                    throw new IOException("Failed deleting of old " + dst);
                }
            } else {
                LOG.warn("Found existing old edits file and we have less entries. Deleting " + wap.p + ", length=" + rootFs.getFileStatus(wap.p).getLen());
                if (!rootFs.delete(wap.p, false)) {
                    LOG.warn("Failed deleting of {}", wap.p);
                    throw new IOException("Failed deleting of " + wap.p);
                }
            }
        }

        /**
         * Closes every writer on a bounded thread pool and collects the resulting paths.
         *
         * <p>The close tasks run concurrently and all append to the same result and error
         * lists, so both lists are synchronized wrappers; concurrent adds to a plain
         * {@code ArrayList} can lose elements or fail. The pool is always shut down.
         *
         * @return the collected paths (entries may be {@code null} where
         *         {@code closeWriter} returned {@code null}), or {@code null} if progress
         *         reporting failed
         * @throws IOException if interrupted, if a task failed, or if any close recorded
         *         an error
         * @throws IllegalStateException if this sink was already closed
         */
        List<Path> close() throws IOException {
            Preconditions.checkState(!this.closeAndCleanCompleted);
            final List<Path> paths = Collections.synchronizedList(new ArrayList<Path>());
            final List<IOException> thrown = Collections.synchronizedList(new ArrayList<IOException>());
            ThreadPoolExecutor closeThreadPool = Threads.getBoundedCachedThreadPool(this.numThreads, 30L, TimeUnit.SECONDS, new ThreadFactory(){
                private int count = 1;

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "split-log-closeStream-" + this.count++);
                }
            });
            ExecutorCompletionService<Void> completionService = new ExecutorCompletionService<Void>(closeThreadPool);
            boolean progress_failed;
            try {
                progress_failed = this.executeCloseTask(completionService, thrown, paths);
            }
            catch (InterruptedException e) {
                InterruptedIOException iie = new InterruptedIOException();
                iie.initCause(e);
                throw iie;
            }
            catch (ExecutionException e) {
                throw new IOException(e.getCause());
            }
            finally {
                closeThreadPool.shutdownNow();
            }
            if (!thrown.isEmpty()) {
                // Hand out a plain snapshot; all tasks have completed by now.
                throw MultipleIOException.createIOException(new ArrayList<IOException>(thrown));
            }
            this.writersClosed = true;
            this.closeAndCleanCompleted = true;
            if (progress_failed) {
                return null;
            }
            return new ArrayList<Path>(paths);
        }

        /**
         * Submits one close task per open writer and waits for all of them.
         *
         * <p>Each task calls {@code closeWriter} and adds its result to {@code paths}.
         * After each completed task the reporter, if set, is polled until it first
         * reports failure.
         *
         * @param completionService service the close tasks are submitted to
         * @param thrown list that {@code closeWriter} appends close failures to
         * @param paths list receiving the result of each {@code closeWriter} call
         * @return {@code true} if the reporter signalled that progress failed
         * @throws InterruptedException if interrupted while waiting for a task
         * @throws ExecutionException if a close task threw
         */
        boolean executeCloseTask(CompletionService<Void> completionService, final List<IOException> thrown, final List<Path> paths) throws InterruptedException, ExecutionException {
            for (final Map.Entry<String, SinkWriter> openWriter : this.writers.entrySet()) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Submitting close of " + ((WriterAndPath)openWriter.getValue()).p);
                }
                completionService.submit(new Callable<Void>(){

                    @Override
                    public Void call() throws Exception {
                        WriterAndPath wap = (WriterAndPath)openWriter.getValue();
                        paths.add(LogRecoveredEditsOutputSink.this.closeWriter(openWriter.getKey(), wap, thrown));
                        return null;
                    }
                });
            }
            boolean progressFailed = false;
            int pending = this.writers.size();
            while (pending-- > 0) {
                // take() blocks for the next finished task; get() rethrows its failure.
                completionService.take().get();
                if (!progressFailed && this.reporter != null && !this.reporter.progress()) {
                    progressFailed = true;
                }
            }
            return progressFailed;
        }

        /**
         * Closes one writer and moves its file to the completed recovered-edits name.
         *
         * <p>A file with no edits written is deleted instead. If a different file already
         * exists at the destination, {@code deleteOneWithFewerEntries} decides which of the
         * two survives before the rename is attempted.
         *
         * @param encodedRegionName key used to look up the region's maximum sequence id
         * @param wap the writer and temporary path to finalize
         * @param thrown receives close and rename failures, which are not rethrown
         * @return the final path, or {@code null} if the file was empty or a failure was
         *         recorded in {@code thrown}
         * @throws IOException if the file system cannot be obtained or an empty file
         *         cannot be deleted
         */
        Path closeWriter(String encodedRegionName, WriterAndPath wap, List<IOException> thrown) throws IOException {
            LOG.trace("Closing {}", wap.p);
            FileSystem rootFs = FileSystem.get(WALSplitter.this.conf);
            try {
                wap.w.close();
            }
            catch (IOException closeFailure) {
                LOG.error("Couldn't close log at " + wap.p, closeFailure);
                thrown.add(closeFailure);
                return null;
            }
            if (LOG.isDebugEnabled()) {
                long millisSpent = wap.nanosSpent / 1000L / 1000L;
                LOG.debug("Closed wap " + wap.p + " (wrote " + wap.editsWritten + " edits, skipped " + wap.editsSkipped + " edits in " + millisSpent + "ms");
            }
            if (wap.editsWritten == 0L) {
                // Nothing useful in the file: remove it rather than publish it.
                boolean leftBehind = rootFs.exists(wap.p) && !rootFs.delete(wap.p, false);
                if (leftBehind) {
                    LOG.warn("Failed deleting empty {}", wap.p);
                    throw new IOException("Failed deleting empty  " + wap.p);
                }
                return null;
            }
            Long maxSeqNum = this.regionMaximumEditLogSeqNum.get(encodedRegionName);
            Path dst = WALSplitter.getCompletedRecoveredEditsFilePath(wap.p, maxSeqNum);
            try {
                if (!dst.equals(wap.p) && rootFs.exists(dst)) {
                    this.deleteOneWithFewerEntries(rootFs, wap, dst);
                }
                // The new file may have been the one deleted above, hence the exists check.
                if (rootFs.exists(wap.p)) {
                    if (!rootFs.rename(wap.p, dst)) {
                        throw new IOException("Failed renaming " + wap.p + " to " + dst);
                    }
                    LOG.info("Rename " + wap.p + " to " + dst);
                }
            }
            catch (IOException renameFailure) {
                LOG.error("Couldn't rename " + wap.p + " to " + dst, renameFailure);
                thrown.add(renameFailure);
                return null;
            }
            return dst;
        }

        /**
         * Stops any writer thread still alive and closes every writer in {@code writers}.
         *
         * <p>Does nothing if {@code writersClosed} is already set. The writers are closed
         * and {@code writersClosed} is set even when stopping the threads throws.
         *
         * @param thrown list to append close failures to; a new list is created when
         *        {@code null}
         * @return the list of close failures, or the argument unchanged if the writers
         *         were already closed
         * @throws IOException if interrupted while joining a writer thread
         */
        private List<IOException> closeLogWriters(List<IOException> thrown) throws IOException {
            if (this.writersClosed) {
                return thrown;
            }
            if (thrown == null) {
                thrown = Lists.newArrayList();
            }
            try {
                for (WriterThread t : this.writerThreads) {
                    // Keep nudging the thread until it actually exits.
                    while (t.isAlive()) {
                        t.shouldStop = true;
                        t.interrupt();
                        try {
                            t.join(10L);
                        }
                        catch (InterruptedException e) {
                            InterruptedIOException iie = new InterruptedIOException();
                            iie.initCause(e);
                            throw iie;
                        }
                    }
                }
            }
            finally {
                for (SinkWriter sinkWriter : this.writers.values()) {
                    WriterAndPath wap = (WriterAndPath)sinkWriter;
                    try {
                        wap.w.close();
                    }
                    catch (IOException ioe) {
                        LOG.error("Couldn't close log at " + wap.p, ioe);
                        thrown.add(ioe);
                        continue;
                    }
                    LOG.info("Closed log " + wap.p + " (wrote " + wap.editsWritten + " edits in " + wap.nanosSpent / 1000L / 1000L + "ms)");
                }
                this.writersClosed = true;
            }
            return thrown;
        }

        /**
         * Returns the writer for the region of {@code entry}, creating one if needed.
         *
         * <p>A region for which {@code createWAP} returns {@code null} is added to
         * {@code blacklistedRegions}, and later calls return {@code null} for it without
         * trying again.
         *
         * @param entry entry whose encoded region name selects the writer
         * @param reusable whether a newly created writer is stored in {@code writers}
         * @return the writer, or {@code null} if the region is blacklisted
         * @throws IOException if creating the writer fails
         */
        WriterAndPath getWriterAndPath(WAL.Entry entry, boolean reusable) throws IOException {
            byte[] region = entry.getKey().getEncodedRegionName();
            String regionName = Bytes.toString(region);
            WriterAndPath existing = (WriterAndPath)this.writers.get(regionName);
            if (existing != null) {
                return existing;
            }
            if (this.blacklistedRegions.contains(region)) {
                return null;
            }
            WriterAndPath created = this.createWAP(region, entry);
            if (created == null) {
                this.blacklistedRegions.add(region);
                return null;
            }
            if (reusable) {
                this.writers.put(regionName, created);
            }
            return created;
        }

        /**
         * Creates a writer for the recovered-edits file of the entry's region.
         *
         * <p>If a file already exists at the computed path it is deleted first; a failed
         * delete is only logged and writer creation proceeds regardless.
         *
         * @param region encoded region name (not used by this implementation)
         * @param entry first entry for the region; supplies the path and the starting
         *        sequence id
         * @return the new writer, or {@code null} when no edits path is available
         * @throws IOException if the file system or the writer cannot be obtained
         */
        WriterAndPath createWAP(byte[] region, WAL.Entry entry) throws IOException {
            String walFileName = WALSplitter.this.fileBeingSplit.getPath().getName();
            Path regionedits = WALSplitter.getRegionSplitEditsPath(entry, walFileName, WALSplitter.this.conf);
            if (regionedits == null) {
                return null;
            }
            FileSystem rootFs = FileSystem.get(WALSplitter.this.conf);
            if (rootFs.exists(regionedits)) {
                LOG.warn("Found old edits file. It could be the result of a previous failed split attempt. Deleting " + regionedits + ", length=" + rootFs.getFileStatus(regionedits).getLen());
                boolean deleted = rootFs.delete(regionedits, false);
                if (!deleted) {
                    LOG.warn("Failed delete of old {}", regionedits);
                }
            }
            WALProvider.Writer writer = WALSplitter.this.createWriter(regionedits);
            LOG.debug("Creating writer path={}", regionedits);
            return new WriterAndPath(regionedits, writer, entry.getKey().getSequenceId());
        }

        /**
         * Drops cells from the entry's edit whose family has a recorded maximum sequence
         * id that is at or above the entry's sequence id.
         *
         * <p>Cells in {@code WALEdit.METAFAMILY} are always kept. The edit is left
         * untouched when no per-store sequence ids are recorded for the region.
         *
         * @param logEntry the entry whose edit is filtered in place
         */
        void filterCellByStore(WAL.Entry logEntry) {
            String regionName = Bytes.toString(logEntry.getKey().getEncodedRegionName());
            Map<byte[], Long> maxSeqIdInStores = WALSplitter.this.regionMaxSeqIdInStores.get(regionName);
            if (MapUtils.isEmpty(maxSeqIdInStores)) {
                return;
            }
            ArrayList<Cell> keptCells = new ArrayList<Cell>(logEntry.getEdit().getCells().size());
            for (Cell cell : logEntry.getEdit().getCells()) {
                boolean keep = CellUtil.matchingFamily(cell, WALEdit.METAFAMILY);
                if (!keep) {
                    Long maxSeqId = maxSeqIdInStores.get(CellUtil.cloneFamily(cell));
                    keep = maxSeqId == null || maxSeqId < logEntry.getKey().getSequenceId();
                }
                if (keep) {
                    keptCells.add(cell);
                }
            }
            logEntry.getEdit().setCells(keptCells);
        }

        /**
         * Appends the buffer through a reusable writer by delegating to
         * {@code appendBuffer(buffer, true)}; the returned writer is ignored.
         *
         * @param buffer the buffered entries of a single region
         * @throws IOException if writing fails
         */
        @Override
        public void append(RegionEntryBuffer buffer) throws IOException {
            this.appendBuffer(buffer, true);
        }

        /**
         * Writes every entry of the buffer to the region's writer.
         *
         * <p>The writer is resolved from the first entry. Each entry is passed through
         * {@code filterCellByStore}; entries whose edit is empty afterwards are counted
         * as skipped instead of written.
         *
         * <p>The local writer is typed as {@code WriterAndPath}, which is what
         * {@code getWriterAndPath} returns and what this method must return.
         *
         * @param buffer the buffered entries of a single region
         * @param reusable whether a newly created writer is kept for later buffers
         * @return the writer used, or {@code null} if the buffer was empty or
         *         {@code getWriterAndPath} returned {@code null}
         * @throws IOException if appending fails; a {@code RemoteException} is unwrapped
         *         before being rethrown
         */
        WriterAndPath appendBuffer(RegionEntryBuffer buffer, boolean reusable) throws IOException {
            List<WAL.Entry> entries = buffer.entryBuffer;
            if (entries.isEmpty()) {
                LOG.warn("got an empty buffer, skipping");
                return null;
            }
            WriterAndPath wap = null;
            long startTime = System.nanoTime();
            try {
                int editsCount = 0;
                for (WAL.Entry logEntry : entries) {
                    if (wap == null) {
                        wap = this.getWriterAndPath(logEntry, reusable);
                        if (wap == null) {
                            if (LOG.isTraceEnabled()) {
                                LOG.trace("getWriterAndPath decided we don't need to write edits for {}", logEntry);
                            }
                            return null;
                        }
                    }
                    this.filterCellByStore(logEntry);
                    if (!logEntry.getEdit().isEmpty()) {
                        wap.w.append(logEntry);
                        this.updateRegionMaximumEditLogSeqNum(logEntry);
                        ++editsCount;
                    } else {
                        wap.incrementSkippedEdits(1);
                    }
                }
                wap.incrementEdits(editsCount);
                wap.incrementNanoTime(System.nanoTime() - startTime);
            }
            catch (IOException e) {
                IOException unwrapped = e instanceof RemoteException ? ((RemoteException)e).unwrapRemoteException() : e;
                LOG.error(HBaseMarkers.FATAL, "Got while writing log entry to log", unwrapped);
                throw unwrapped;
            }
            return wap;
        }

        /**
         * Tells whether the entry should be kept as a region event.
         *
         * @param entry the entry to inspect
         * @return {@code true} if any cell of the entry's edit is a compaction marker
         */
        @Override
        public boolean keepRegionEvent(WAL.Entry entry) {
            boolean hasCompactionMarker = false;
            for (Cell cell : entry.getEdit().getCells()) {
                if (WALEdit.isCompactionMarker(cell)) {
                    hasCompactionMarker = true;
                    break;
                }
            }
            return hasCompactionMarker;
        }

        /**
         * Reports the number of edits written by each writer currently held in
         * {@code writers}.
         *
         * @return a new map, ordered by {@code Bytes.BYTES_COMPARATOR}, from encoded
         *         region name bytes to edits written
         */
        @Override
        public Map<byte[], Long> getOutputCounts() {
            TreeMap<byte[], Long> editsPerRegion = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
            for (Map.Entry<String, SinkWriter> writerEntry : this.writers.entrySet()) {
                byte[] regionName = Bytes.toBytes(writerEntry.getKey());
                editsPerRegion.put(regionName, writerEntry.getValue().editsWritten);
            }
            return editsPerRegion;
        }

        /**
         * Returns the number of writers currently held in {@code writers}, which are
         * keyed by encoded region name.
         *
         * @return the size of the writers map
         */
        @Override
        public int getNumberOfRecoveredRegions() {
            return this.writers.size();
        }
    }

    /**
     * Base class for destinations that receive split WAL entries.
     *
     * <p>Holds the writer threads, the per-region writers, and the bookkeeping shared by
     * concrete sinks. Subclasses decide how a buffer is appended and how output is closed.
     */
    public static abstract class OutputSink {
        protected PipelineController controller;
        protected EntryBuffers entryBuffers;
        /** Open writers keyed by encoded region name. */
        protected ConcurrentHashMap<String, SinkWriter> writers = new ConcurrentHashMap<String, SinkWriter>();
        /** Highest sequence id appended so far, keyed by encoded region name. */
        protected final ConcurrentHashMap<String, Long> regionMaximumEditLogSeqNum = new ConcurrentHashMap<String, Long>();
        protected final List<WriterThread> writerThreads = Lists.newArrayList();
        /** Regions for which no writer could be created; compared by content. */
        protected final Set<byte[]> blacklistedRegions = Collections.synchronizedSet(new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR));
        protected boolean closeAndCleanCompleted = false;
        protected boolean writersClosed = false;
        protected final int numThreads;
        protected CancelableProgressable reporter = null;
        protected AtomicLong skippedEdits = new AtomicLong();
        protected List<Path> splits = null;

        /**
         * @param controller shared pipeline controller
         * @param entryBuffers buffers the writer threads drain
         * @param numWriters number of writer threads to start
         */
        public OutputSink(PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
            this.numThreads = numWriters;
            this.controller = controller;
            this.entryBuffers = entryBuffers;
        }

        /** Sets the progress reporter polled while finishing and closing; may be {@code null}. */
        void setReporter(CancelableProgressable reporter) {
            this.reporter = reporter;
        }

        /** Creates and starts {@code numThreads} writer threads, remembering each one. */
        public synchronized void startWriterThreads() {
            int index = 0;
            while (index < this.numThreads) {
                WriterThread writer = new WriterThread(this.controller, this.entryBuffers, this, index);
                writer.start();
                this.writerThreads.add(writer);
                ++index;
            }
        }

        /**
         * Raises the recorded maximum sequence id of the entry's region if this entry's
         * sequence id is higher. The check and update run under the map's monitor.
         *
         * @param entry the entry that was just appended
         */
        void updateRegionMaximumEditLogSeqNum(WAL.Entry entry) {
            synchronized (this.regionMaximumEditLogSeqNum) {
                String regionName = Bytes.toString(entry.getKey().getEncodedRegionName());
                long sequenceId = entry.getKey().getSequenceId();
                Long knownMax = this.regionMaximumEditLogSeqNum.get(regionName);
                if (knownMax == null || sequenceId > knownMax) {
                    this.regionMaximumEditLogSeqNum.put(regionName, sequenceId);
                }
            }
        }

        /** @return the number of writers currently in {@code writers} */
        int getNumOpenWriters() {
            return this.writers.size();
        }

        /** @return the current value of the skipped-edits counter */
        long getSkippedEdits() {
            return this.skippedEdits.get();
        }

        /**
         * Asks all writer threads to finish and waits for them to exit.
         *
         * @param interrupt whether to interrupt the threads after asking them to finish
         * @return {@code true} unless the reporter signalled that progress failed
         * @throws IOException if interrupted while joining, or if the controller holds a
         *         recorded error
         */
        protected boolean finishWriting(boolean interrupt) throws IOException {
            LOG.debug("Waiting for split writer threads to finish");
            for (WriterThread writer : this.writerThreads) {
                writer.finish();
            }
            if (interrupt) {
                for (WriterThread writer : this.writerThreads) {
                    writer.interrupt();
                }
            }
            boolean progressFailed = false;
            for (WriterThread writer : this.writerThreads) {
                // Poll the reporter before each join until it first reports failure.
                if (!progressFailed && this.reporter != null && !this.reporter.progress()) {
                    progressFailed = true;
                }
                try {
                    writer.join();
                }
                catch (InterruptedException ie) {
                    InterruptedIOException iie = new InterruptedIOException();
                    iie.initCause(ie);
                    throw iie;
                }
            }
            this.controller.checkForErrors();
            LOG.info("{} split writers finished; closing.", this.writerThreads.size());
            return !progressFailed;
        }

        /**
         * Finishes writing and closes the sink's output.
         *
         * @return the paths produced by the sink
         * @throws IOException if finishing or closing fails
         */
        public abstract List<Path> finishWritingAndClose() throws IOException;

        /** @return per-region output counts keyed by encoded region name bytes */
        public abstract Map<byte[], Long> getOutputCounts();

        /** @return the number of regions this sink produced output for */
        public abstract int getNumberOfRecoveredRegions();

        /**
         * Appends the entries of one region buffer to the sink's output.
         *
         * @param var1 the buffer to append
         * @throws IOException if writing fails
         */
        public abstract void append(RegionEntryBuffer var1) throws IOException;

        /**
         * Hook polled by idle writer threads once they were asked to stop; this base
         * implementation always returns {@code false}.
         *
         * @return {@code false}
         * @throws IOException never thrown by this implementation
         */
        public boolean flush() throws IOException {
            return false;
        }

        /**
         * @param var1 the entry to inspect
         * @return whether the sink wants to keep the entry as a region event
         */
        public abstract boolean keepRegionEvent(WAL.Entry var1);
    }

    /**
     * Thread that repeatedly takes a region buffer from {@code EntryBuffers} and appends
     * it to the {@code OutputSink}.
     *
     * <p>Any throwable escaping the loop is logged and recorded on the controller.
     */
    public static class WriterThread
    extends Thread {
        private volatile boolean shouldStop = false;
        private PipelineController controller;
        private EntryBuffers entryBuffers;
        private OutputSink outputSink = null;

        /**
         * @param controller shared pipeline controller
         * @param entryBuffers source of buffers to write
         * @param sink destination the buffers are appended to
         * @param i index used in the thread name
         */
        WriterThread(PipelineController controller, EntryBuffers entryBuffers, OutputSink sink, int i) {
            super(Thread.currentThread().getName() + "-Writer-" + i);
            this.controller = controller;
            this.entryBuffers = entryBuffers;
            this.outputSink = sink;
        }

        /** Runs the write loop and reports any failure to the controller. */
        @Override
        public void run() {
            try {
                this.doRun();
            }
            catch (Throwable t) {
                LOG.error("Exiting thread", t);
                this.controller.writerThreadError(t);
            }
        }

        /**
         * Write loop: take a buffer, write it, mark it done; when no buffer is available,
         * wait on {@code dataAvailable} and retry.
         *
         * <p>The loop exits once {@code shouldStop} is set, no buffer is available, and
         * {@code flush()} returns {@code false}.
         *
         * @throws IOException if writing a buffer fails
         */
        private void doRun() throws IOException {
            LOG.trace("Writer thread starting");
            while (true) {
                RegionEntryBuffer buffer = this.entryBuffers.getChunkToWrite();
                if (buffer == null) {
                    synchronized (this.controller.dataAvailable) {
                        if (this.shouldStop && !this.outputSink.flush()) {
                            return;
                        }
                        try {
                            this.controller.dataAvailable.wait(500L);
                        }
                        catch (InterruptedException ie) {
                            // An interrupt is expected while stopping; otherwise it is an error.
                            if (!this.shouldStop) {
                                throw new RuntimeException(ie);
                            }
                        }
                    }
                    // Nothing to write this round: go back and ask for a buffer again
                    // instead of falling through with a null buffer.
                    continue;
                }
                assert buffer != null;
                try {
                    this.writeBuffer(buffer);
                }
                finally {
                    // Always release the region, but let a write failure propagate to run().
                    this.entryBuffers.doneWriting(buffer);
                }
            }
        }

        /** Appends the buffer to the output sink. */
        private void writeBuffer(RegionEntryBuffer buffer) throws IOException {
            this.outputSink.append(buffer);
        }

        /** Sets the stop flag and wakes every thread waiting on {@code dataAvailable}. */
        void finish() {
            synchronized (this.controller.dataAvailable) {
                this.shouldStop = true;
                this.controller.dataAvailable.notifyAll();
            }
        }
    }

    /**
     * Entries buffered for a single region, together with a running heap-size estimate.
     */
    public static class RegionEntryBuffer
    implements HeapSize {
        long heapInBuffer = 0L;
        List<WAL.Entry> entryBuffer;
        TableName tableName;
        byte[] encodedRegionName;

        /**
         * @param tableName table the region belongs to
         * @param region encoded region name
         */
        RegionEntryBuffer(TableName tableName, byte[] region) {
            this.tableName = tableName;
            this.encodedRegionName = region;
            this.entryBuffer = new ArrayList<WAL.Entry>();
        }

        /**
         * Adds an entry and grows the heap estimate by the edit's heap size plus the
         * aligned size of two references.
         *
         * @param entry the entry to buffer
         * @return the number of bytes the estimate grew by
         */
        long appendEntry(WAL.Entry entry) {
            this.internify(entry);
            this.entryBuffer.add(entry);
            long referenceOverhead = (long)ClassSize.align((int)(2 * ClassSize.REFERENCE));
            long addedHeap = entry.getEdit().heapSize() + referenceOverhead;
            this.heapInBuffer += addedHeap;
            return addedHeap;
        }

        /** Passes this buffer's table name and region name to the key's intern methods. */
        private void internify(WAL.Entry entry) {
            WALKeyImpl key = entry.getKey();
            key.internTableName(this.tableName);
            key.internEncodedRegionName(this.encodedRegionName);
        }

        /** @return the accumulated heap estimate of the buffered entries */
        public long heapSize() {
            return this.heapInBuffer;
        }

        /** @return the encoded region name this buffer was created with */
        public byte[] getEncodedRegionName() {
            return this.encodedRegionName;
        }

        /** @return the live list of buffered entries */
        public List<WAL.Entry> getEntryBuffer() {
            return this.entryBuffer;
        }

        /** @return the table name this buffer was created with */
        public TableName getTableName() {
            return this.tableName;
        }
    }

    /**
     * Per-region buffers of entries waiting to be written.
     *
     * <p>The buffer map and the set of regions being written are guarded by this
     * object's monitor. {@code totalBuffered} is updated under
     * {@code controller.dataAvailable}, which is also used to signal producers and
     * writers.
     */
    public static class EntryBuffers {
        PipelineController controller;
        Map<byte[], RegionEntryBuffer> buffers = new TreeMap<byte[], RegionEntryBuffer>(Bytes.BYTES_COMPARATOR);
        Set<byte[]> currentlyWriting = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
        long totalBuffered = 0L;
        long maxHeapUsage;
        boolean splitWriterCreationBounded;

        /**
         * Creates buffers with {@code splitWriterCreationBounded} set to {@code false}.
         *
         * @param controller shared pipeline controller
         * @param maxHeapUsage buffered-bytes threshold above which producers wait
         */
        public EntryBuffers(PipelineController controller, long maxHeapUsage) {
            this(controller, maxHeapUsage, false);
        }

        /**
         * @param controller shared pipeline controller
         * @param maxHeapUsage buffered-bytes threshold above which producers wait
         * @param splitWriterCreationBounded when {@code true}, no chunk is handed out
         *        while less than {@code maxHeapUsage} is buffered
         */
        public EntryBuffers(PipelineController controller, long maxHeapUsage, boolean splitWriterCreationBounded) {
            this.controller = controller;
            this.maxHeapUsage = maxHeapUsage;
            this.splitWriterCreationBounded = splitWriterCreationBounded;
        }

        /**
         * Adds an entry to its region's buffer, then waits while more than
         * {@code maxHeapUsage} bytes are buffered and no error has been recorded.
         *
         * @param entry the entry to buffer
         * @throws InterruptedException if interrupted while waiting for space
         * @throws IOException if the controller holds a recorded error
         */
        public void appendEntry(WAL.Entry entry) throws InterruptedException, IOException {
            WALKeyImpl key = entry.getKey();
            long incrHeap;
            synchronized (this) {
                RegionEntryBuffer buffer = this.buffers.get(key.getEncodedRegionName());
                if (buffer == null) {
                    buffer = new RegionEntryBuffer(key.getTableName(), key.getEncodedRegionName());
                    this.buffers.put(key.getEncodedRegionName(), buffer);
                }
                incrHeap = buffer.appendEntry(entry);
            }
            synchronized (this.controller.dataAvailable) {
                this.totalBuffered += incrHeap;
                while (this.totalBuffered > this.maxHeapUsage && this.controller.thrown.get() == null) {
                    LOG.debug("Used {} bytes of buffered edits, waiting for IO threads", this.totalBuffered);
                    this.controller.dataAvailable.wait(2000L);
                }
                this.controller.dataAvailable.notifyAll();
            }
            this.controller.checkForErrors();
        }

        /**
         * Removes and returns the largest buffer whose region is not being written,
         * marking that region as being written.
         *
         * @return the chosen buffer, or {@code null} if none qualifies or bounded mode is
         *         still below {@code maxHeapUsage}
         */
        synchronized RegionEntryBuffer getChunkToWrite() {
            if (this.splitWriterCreationBounded && this.totalBuffered < this.maxHeapUsage) {
                return null;
            }
            long biggestSize = 0L;
            byte[] biggestBufferKey = null;
            for (Map.Entry<byte[], RegionEntryBuffer> entry : this.buffers.entrySet()) {
                long size = entry.getValue().heapSize();
                if (size > biggestSize && !this.currentlyWriting.contains(entry.getKey())) {
                    biggestSize = size;
                    biggestBufferKey = entry.getKey();
                }
            }
            if (biggestBufferKey == null) {
                return null;
            }
            RegionEntryBuffer buffer = this.buffers.remove(biggestBufferKey);
            this.currentlyWriting.add(biggestBufferKey);
            return buffer;
        }

        /**
         * Marks the buffer's region as no longer being written, subtracts the buffer's
         * size from the total, and wakes all waiters.
         *
         * @param buffer a buffer previously returned by {@code getChunkToWrite}
         */
        void doneWriting(RegionEntryBuffer buffer) {
            synchronized (this) {
                boolean removed = this.currentlyWriting.remove(buffer.encodedRegionName);
                assert removed;
            }
            long size = buffer.heapSize();
            synchronized (this.controller.dataAvailable) {
                this.totalBuffered -= size;
                this.controller.dataAvailable.notifyAll();
            }
        }

        /**
         * @param region encoded region name
         * @return whether a buffer for this region is currently being written
         */
        synchronized boolean isRegionCurrentlyWriting(byte[] region) {
            return this.currentlyWriting.contains(region);
        }

        /**
         * Blocks until nothing is buffered, or until the thread is interrupted.
         *
         * <p>On interruption the wait is abandoned and the interrupt status is restored
         * so the caller can still observe it. {@code Thread.interrupted()} would clear
         * the status instead, silently discarding the interrupt.
         */
        public void waitUntilDrained() {
            synchronized (this.controller.dataAvailable) {
                while (this.totalBuffered > 0L) {
                    try {
                        this.controller.dataAvailable.wait(2000L);
                    }
                    catch (InterruptedException e) {
                        LOG.warn("Got interrupted while waiting for EntryBuffers is drained");
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }

    /**
     * Shared state of the split pipeline: the first error recorded by a writer thread
     * and the monitor object threads wait and notify on.
     */
    public static class PipelineController {
        /** First recorded failure; {@code null} while no error has been recorded. */
        AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
        /** Monitor used for waiting on and signalling buffer changes. */
        public final Object dataAvailable = new Object();

        /**
         * Records a failure; only the first one recorded is kept.
         *
         * @param t the failure to record
         */
        void writerThreadError(Throwable t) {
            this.thrown.compareAndSet(null, t);
        }

        /**
         * Throws if a failure has been recorded, wrapping it as the cause.
         *
         * @throws IOException if the recorded failure is an {@code IOException}
         * @throws RuntimeException if the recorded failure is of any other type
         */
        void checkForErrors() throws IOException {
            Throwable failure = this.thrown.get();
            if (failure != null) {
                if (failure instanceof IOException) {
                    throw new IOException(failure);
                }
                throw new RuntimeException(failure);
            }
        }
    }
}

