/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import com.google.protobuf.ServiceException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Action;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.RegionServerCallable;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.regionserver.wal.MetricsWALEditsReplay;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;

/**
 * Groups a list of (region location, row) pairs by region and sends each
 * region's rows to a region server through the admin {@code replay} RPC, in
 * chunks of at most {@link #MAX_BATCH_SIZE} actions.
 *
 * <p>Two configuration values are read at construction time:
 * <ul>
 *   <li>{@code hbase.hregion.edits.replay.skip.errors} (default {@code false}):
 *       when {@code true}, replay failures are logged and skipped instead of
 *       being propagated to the caller.</li>
 *   <li>{@code hbase.regionserver.logreplay.timeout} (default {@code 60000}):
 *       passed as the call timeout to the retrying caller; presumably
 *       milliseconds -- confirm against {@code RpcRetryingCaller}.</li>
 * </ul>
 */
@InterfaceAudience.Private
public class WALEditsReplaySink {
    private static final Log LOG = LogFactory.getLog(WALEditsReplaySink.class);

    /** Maximum number of actions sent in a single replay request. */
    private static final int MAX_BATCH_SIZE = 3000;

    private final Configuration conf;
    private final HConnection conn;
    private final TableName tableName;
    private final MetricsWALEditsReplay metrics;
    /** Running count of rows handed to {@link #replayEntries(List)}. */
    private final AtomicLong totalReplayedEdits = new AtomicLong();
    private final boolean skipErrors;
    private final int replayTimeout;

    /**
     * Creates a sink bound to one table and one connection.
     *
     * @param conf      configuration supplying the skip-errors flag and replay timeout
     * @param tableName table whose regions are the replay targets
     * @param conn      connection used to reach region servers and to re-locate regions
     * @throws IOException declared for callers; nothing in this constructor throws it directly
     */
    public WALEditsReplaySink(Configuration conf, TableName tableName, HConnection conn) throws IOException {
        this.conf = conf;
        this.metrics = new MetricsWALEditsReplay();
        this.conn = conn;
        this.tableName = tableName;
        this.skipErrors = conf.getBoolean("hbase.hregion.edits.replay.skip.errors", false);
        this.replayTimeout = conf.getInt("hbase.regionserver.logreplay.timeout", 60000);
    }

    /**
     * Replays the given rows, grouped by region and chunked by {@link #MAX_BATCH_SIZE}.
     * Afterwards the elapsed time, row count and summed row-key length are
     * reported to the metrics object and the total-edits counter is advanced.
     *
     * @param actions pairs of target location and row to replay; an empty list is a no-op
     * @throws IOException if a replay call fails and errors are not being skipped
     */
    public void replayEntries(List<Pair<HRegionLocation, Row>> actions) throws IOException {
        if (actions.isEmpty()) {
            return;
        }
        int batchSize = actions.size();
        int dataSize = 0;
        Map<HRegionInfo, List<Action<Row>>> actionsByRegion = new HashMap<HRegionInfo, List<Action<Row>>>();
        // Track the location that accompanied each region so that every region's
        // batch is sent to its own location rather than to whichever location
        // happened to be attached to the last entry of the input.
        Map<HRegionInfo, HRegionLocation> locationByRegion = new HashMap<HRegionInfo, HRegionLocation>();
        for (int i = 0; i < batchSize; ++i) {
            Pair<HRegionLocation, Row> entry = actions.get(i);
            HRegionLocation loc = entry.getFirst();
            Row row = entry.getSecond();
            HRegionInfo region = loc.getRegionInfo();
            List<Action<Row>> regionActions = actionsByRegion.get(region);
            if (regionActions == null) {
                regionActions = new ArrayList<Action<Row>>();
                actionsByRegion.put(region, regionActions);
            }
            // Latest location seen for this region wins.
            locationByRegion.put(region, loc);
            regionActions.add(new Action<Row>(row, i));
            dataSize += row.getRow().length;
        }

        long startTime = EnvironmentEdgeManager.currentTimeMillis();
        for (Map.Entry<HRegionInfo, List<Action<Row>>> regionEntry : actionsByRegion.entrySet()) {
            HRegionInfo curRegion = regionEntry.getKey();
            List<Action<Row>> allActions = regionEntry.getValue();
            HRegionLocation regionLoc = locationByRegion.get(curRegion);
            int totalActions = allActions.size();
            int replayedActions = 0;
            while (replayedActions < totalActions) {
                int end = Math.min(totalActions, replayedActions + MAX_BATCH_SIZE);
                this.replayEdits(regionLoc, curRegion, allActions.subList(replayedActions, end));
                replayedActions = end;
            }
        }
        long endTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
        LOG.debug("number of rows:" + actions.size() + " are sent by batch! spent " + endTime + "(ms)!");
        this.metrics.updateReplayTime(endTime);
        this.metrics.updateReplayBatchSize(batchSize);
        this.metrics.updateReplayDataSize(dataSize);
        this.totalReplayedEdits.addAndGet(batchSize);
    }

    /**
     * @return an empty string while nothing has been replayed, otherwise a short
     *         summary containing the total number of replayed edits
     */
    public String getStats() {
        return this.totalReplayedEdits.get() == 0L ? "" : "Sink: total replayed edits: " + this.totalReplayedEdits;
    }

    /**
     * Sends one chunk of actions for a single region through a retrying caller.
     *
     * @param regionLoc  initial location to contact
     * @param regionInfo region the actions belong to
     * @param actions    the chunk to replay
     * @throws IOException if the call fails and {@code skipErrors} is {@code false}
     */
    private void replayEdits(HRegionLocation regionLoc, HRegionInfo regionInfo, List<Action<Row>> actions) throws IOException {
        try {
            RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(this.conf);
            ReplayServerCallable<ClientProtos.MultiResponse> callable =
                new ReplayServerCallable<ClientProtos.MultiResponse>(this.conn, this.tableName, regionLoc, regionInfo, actions);
            factory.<ClientProtos.MultiResponse>newCaller().callWithRetries(callable, this.replayTimeout);
        }
        catch (IOException ie) {
            if (!this.skipErrors) {
                throw ie;
            }
            // Skip-errors mode: report the failure and carry on with the next chunk.
            LOG.warn("hbase.hregion.edits.replay.skip.errors=true so continuing replayEdits with error:" + ie.getMessage());
        }
    }

    /**
     * Callable that replays a list of actions against one region via the
     * region server admin interface. {@link #call()} always returns {@code null};
     * the outcome is signalled through exceptions only.
     */
    class ReplayServerCallable<R>
    extends RegionServerCallable<ClientProtos.MultiResponse> {
        private final HRegionInfo regionInfo;
        private final List<Action<Row>> actions;

        ReplayServerCallable(HConnection connection, TableName tableName, HRegionLocation regionLoc, HRegionInfo regionInfo, List<Action<Row>> actions) {
            // No row is passed to the superclass; the location is supplied
            // explicitly here and refreshed in prepare(true).
            super(connection, tableName, null);
            this.actions = actions;
            this.regionInfo = regionInfo;
            this.setLocation(regionLoc);
        }

        /**
         * Performs the replay RPC.
         *
         * @return always {@code null}
         * @throws IOException the remote exception unwrapped from a {@link ServiceException},
         *                     or a wrapper around a per-action failure
         */
        @Override
        public ClientProtos.MultiResponse call() throws IOException {
            try {
                this.replayToServer(this.regionInfo, this.actions);
            }
            catch (ServiceException se) {
                throw ProtobufUtil.getRemoteException(se);
            }
            return null;
        }

        /**
         * Builds the multi request, issues it to the server at the current
         * location and inspects the per-action results.
         *
         * <p>On the first result carrying an exception: when errors are not
         * skipped, it is wrapped as the cause of an {@link IOException} and
         * thrown; otherwise it is logged and the remaining results are not
         * examined.
         */
        private void replayToServer(HRegionInfo regionInfo, List<Action<Row>> actions) throws IOException, ServiceException {
            AdminProtos.AdminService.BlockingInterface remoteSvr = WALEditsReplaySink.this.conn.getAdmin(this.getLocation().getServerName());
            ClientProtos.MultiRequest request = RequestConverter.buildMultiRequest(regionInfo.getRegionName(), actions);
            ClientProtos.MultiResponse protoResults = remoteSvr.replay(null, request);
            List<ClientProtos.ActionResult> resultList = protoResults.getResultList();
            for (ClientProtos.ActionResult result : resultList) {
                if (!result.hasException()) {
                    continue;
                }
                Throwable t = ProtobufUtil.toException(result.getException());
                if (!WALEditsReplaySink.this.skipErrors) {
                    IOException ie = new IOException();
                    ie.initCause(t);
                    throw ie;
                }
                LOG.warn("hbase.hregion.edits.replay.skip.errors=true so continuing replayToServer with error:" + t.getMessage());
                return;
            }
        }

        /**
         * Refreshes the target location before a retry.
         *
         * @param reload when {@code false} nothing is done; when {@code true} the region
         *               is looked up again using the row of the first action only
         */
        @Override
        public void prepare(boolean reload) throws IOException {
            if (!reload || this.actions.isEmpty()) {
                return;
            }
            // Only the first action's row is used for the lookup (presumably
            // because every action in this callable targets the same region).
            Action<Row> first = this.actions.get(0);
            this.setLocation(WALEditsReplaySink.this.conn.locateRegion(WALEditsReplaySink.this.tableName, first.getAction().getRow()));
        }
    }
}

