/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.streaming;

import com.google.common.base.Throwables;
import com.google.common.collect.UnmodifiableIterator;
import com.ning.compress.lzf.LZFInputStream;
import java.io.DataInputStream;
import java.io.IOError;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.Collection;
import java.util.UUID;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.DeletionTime;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.PartitionColumns;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.rows.EncodingStats;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.SerializationHelper;
import org.apache.cassandra.db.rows.Unfiltered;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.SSTableSimpleIterator;
import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter;
import org.apache.cassandra.io.sstable.format.SSTableFormat;
import org.apache.cassandra.io.sstable.format.Version;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.streaming.ProgressInfo;
import org.apache.cassandra.streaming.StreamHook;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.streaming.messages.FileMessageHeader;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.BytesReadTracker;
import org.apache.cassandra.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamReader {
    private static final Logger logger = LoggerFactory.getLogger(StreamReader.class);
    protected final UUID cfId;
    protected final long estimatedKeys;
    protected final Collection<Pair<Long, Long>> sections;
    protected final StreamSession session;
    protected final Version inputVersion;
    protected final long repairedAt;
    protected final SSTableFormat.Type format;
    protected final int sstableLevel;
    protected final SerializationHeader.Component header;
    protected final int fileSeqNum;

    public StreamReader(FileMessageHeader header, StreamSession session) {
        this.session = session;
        this.cfId = header.cfId;
        this.estimatedKeys = header.estimatedKeys;
        this.sections = header.sections;
        this.inputVersion = header.version;
        this.repairedAt = header.repairedAt;
        this.format = header.format;
        this.sstableLevel = header.sstableLevel;
        this.header = header.header;
        this.fileSeqNum = header.sequenceNumber;
    }

    public SSTableMultiWriter read(ReadableByteChannel channel) throws IOException {
        long totalSize = this.totalSize();
        Pair<String, String> kscf = Schema.instance.getCF(this.cfId);
        ColumnFamilyStore cfs = null;
        if (kscf != null) {
            cfs = Keyspace.open((String)kscf.left).getColumnFamilyStore((String)kscf.right);
        }
        if (kscf == null || cfs == null) {
            throw new IOException("CF " + this.cfId + " was dropped during streaming");
        }
        logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', table = '{}'.", new Object[]{this.session.planId(), this.fileSeqNum, this.session.peer, this.repairedAt, totalSize, cfs.keyspace.getName(), cfs.getColumnFamilyName()});
        DataInputStream dis = new DataInputStream((InputStream)new LZFInputStream(Channels.newInputStream(channel)));
        BytesReadTracker in = new BytesReadTracker(dis);
        StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata, in, this.inputVersion, this.header.toHeader(cfs.metadata));
        SSTableMultiWriter writer = null;
        try {
            writer = this.createWriter(cfs, totalSize, this.repairedAt, this.format);
            while (in.getBytesRead() < totalSize) {
                this.writePartition(deserializer, writer);
                this.session.progress(writer.getFilename(), ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
            }
            logger.debug("[Stream #{}] Finished receiving file #{} from {} readBytes = {}, totalSize = {}", new Object[]{this.session.planId(), this.fileSeqNum, this.session.peer, in.getBytesRead(), totalSize});
            return writer;
        }
        catch (Throwable e) {
            if (deserializer != null) {
                logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.", new Object[]{this.session.planId(), deserializer.partitionKey(), cfs.keyspace.getName(), cfs.getColumnFamilyName()});
            }
            if (writer != null) {
                writer.abort(e);
            }
            this.drain(dis, in.getBytesRead());
            if (e instanceof IOException) {
                throw (IOException)e;
            }
            throw Throwables.propagate((Throwable)e);
        }
    }

    protected SSTableMultiWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repairedAt, SSTableFormat.Type format) throws IOException {
        Directories.DataDirectory localDir = cfs.getDirectories().getWriteableLocation(totalSize);
        if (localDir == null) {
            throw new IOException("Insufficient disk space to store " + totalSize + " bytes");
        }
        RangeAwareSSTableWriter writer = new RangeAwareSSTableWriter(cfs, this.estimatedKeys, repairedAt, format, this.sstableLevel, totalSize, this.session.getTransaction(this.cfId), this.header);
        StreamHook.instance.reportIncomingFile(cfs, writer, this.session, this.fileSeqNum);
        return writer;
    }

    protected void drain(InputStream dis, long bytesRead) throws IOException {
        long toSkip = this.totalSize() - bytesRead;
        long skipped = dis.skip(toSkip);
        if (skipped == -1L) {
            return;
        }
        toSkip -= skipped;
        while (toSkip > 0L && (skipped = dis.skip(toSkip)) != -1L) {
            toSkip -= skipped;
        }
    }

    protected long totalSize() {
        long size = 0L;
        for (Pair<Long, Long> section : this.sections) {
            size += (Long)section.right - (Long)section.left;
        }
        return size;
    }

    protected void writePartition(StreamDeserializer deserializer, SSTableMultiWriter writer) throws IOException {
        writer.append(deserializer.newPartition());
        deserializer.checkForExceptions();
    }

    public static class StreamDeserializer
    extends UnmodifiableIterator<Unfiltered>
    implements UnfilteredRowIterator {
        private final CFMetaData metadata;
        private final DataInputPlus in;
        private final SerializationHeader header;
        private final SerializationHelper helper;
        private DecoratedKey key;
        private DeletionTime partitionLevelDeletion;
        private SSTableSimpleIterator iterator;
        private Row staticRow;
        private IOException exception;

        public StreamDeserializer(CFMetaData metadata, DataInputPlus in, Version version, SerializationHeader header) {
            assert (version.storeRows()) : "We don't allow streaming from pre-3.0 nodes";
            this.metadata = metadata;
            this.in = in;
            this.helper = new SerializationHelper(metadata, version.correspondingMessagingVersion(), SerializationHelper.Flag.PRESERVE_SIZE);
            this.header = header;
        }

        public StreamDeserializer newPartition() throws IOException {
            this.key = this.metadata.decorateKey(ByteBufferUtil.readWithShortLength(this.in));
            this.partitionLevelDeletion = DeletionTime.serializer.deserialize(this.in);
            this.iterator = SSTableSimpleIterator.create(this.metadata, this.in, this.header, this.helper, this.partitionLevelDeletion);
            this.staticRow = this.iterator.readStaticRow();
            return this;
        }

        @Override
        public CFMetaData metadata() {
            return this.metadata;
        }

        @Override
        public PartitionColumns columns() {
            return this.metadata.partitionColumns();
        }

        @Override
        public boolean isReverseOrder() {
            return false;
        }

        @Override
        public DecoratedKey partitionKey() {
            return this.key;
        }

        @Override
        public DeletionTime partitionLevelDeletion() {
            return this.partitionLevelDeletion;
        }

        @Override
        public Row staticRow() {
            return this.staticRow;
        }

        @Override
        public EncodingStats stats() {
            return this.header.stats();
        }

        @Override
        public boolean hasNext() {
            try {
                return this.iterator.hasNext();
            }
            catch (IOError e) {
                if (e.getCause() != null && e.getCause() instanceof IOException) {
                    this.exception = (IOException)e.getCause();
                    return false;
                }
                throw e;
            }
        }

        @Override
        public Unfiltered next() {
            Unfiltered unfiltered = (Unfiltered)this.iterator.next();
            return this.metadata.isCounter() && unfiltered.kind() == Unfiltered.Kind.ROW ? this.maybeMarkLocalToBeCleared((Row)unfiltered) : unfiltered;
        }

        private Row maybeMarkLocalToBeCleared(Row row) {
            return this.metadata.isCounter() ? row.markCounterLocalToBeCleared() : row;
        }

        public void checkForExceptions() throws IOException {
            if (this.exception != null) {
                throw this.exception;
            }
        }

        @Override
        public void close() {
        }
    }
}

