HBase源码系列(一)客户端入口HTable

时间:2020-12-03 08:37:30

HTable

HTable作为客户端操作HBase数据的入口,是我们最常见的一个类。

当向HBase 写入数据时, 都发生了写什么呢?

Put操作

获取RegionLocations

HTable中

 public void put(final List<Put> puts) throws IOException {
        // 添加Put到buffer,如果buffer已经足够长,则提交到集群
        getBufferedMutator().mutate(puts);

        if (autoFlush) {
            // 如果BufferedMutatorImpl不为null,则flush所有的写操作
            flushCommits();
        }
  }

BufferedMutatorImpl中

try {
    ...
    ap.submit(tableName, writeAsyncBuffer, true, null, false);
    ...
} finally {
    currentWriteBufferSize = 0;
    for (Row mut : writeAsyncBuffer) {
        if (mut instanceof Mutation) {
            currentWriteBufferSize += ((Mutation) mut).heapSize();
        }
    }
}

AsyncProcess中获取region信息

/** *找到每条记录所在的region */
while (it.hasNext()) {
    Row r = it.next();
    ...
        RegionLocations locs = connection
                .locateRegion(tableName, r.getRow(), true, true, RegionReplicaUtil.DEFAULT_REPLICA_ID);

    if (canTakeOperation(loc, regionIncluded, serverIncluded)) {
        Action<Row> action = new Action<Row>(r, ++posInList);
        setNonce(ng, r, action);
        retainedActions.add(action);
        // TODO: replica-get is not supported on this path
        byte[] regionName = loc.getRegionInfo().getRegionName();
        addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
        it.remove();
    }
}

发送异步请求:

AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(tableName, retainedActions, nonceGroup, pool,
                callback, results, needResults);
ars.sendMultiAction(actionsByServer, 1, null, false);

根据MultiAction,封装多线程rpc任务:

run(){
    MultiResponse res;
    MultiServerCallable<Row> callable = null;

    callable = createCallable(server, tableName, multiAction);
    RpcRetryingCaller<MultiResponse> caller = createCaller(callable);
    if (callsInProgress != null)
        callsInProgress.add(callable);
    // 远程RPC调用入口
    res = caller.callWithoutRetries(callable, timeout);
    if (res == null) {
        // Cancelled
        return;
    }
    receiveMultiAction(multiAction, server, res, numAttempt);
}

调用MultiServerCallable的call()方法,发送rpc请求:

try { responseProto = getStub().multi(controller, requestProto); } catch (ServiceException e) { throw ProtobufUtil.getRemoteException(e); }

总结

(1)把put操作添加到BufferedMutatorImpl的writeAsyncBuffer列表中,符合条件(自动flush或者超过了阀值writeBufferSize)就通过AsyncProcess异步批量提交。
(2)在提交之前,我们要根据每个rowkey找到它们归属的region server,这个定位的过程是通过HConnection的locateRegion方法获得的,然后再把这些rowkey按照HRegionLocation分组。
(3)通过多线程,一个HRegionLocation构造MultiServerCallable,然后通过rpcCallerFactory. newCaller()执行调用,忽略掉失败重新提交和错误处理,客户端的提交操作到此结束。

设置HTable参数 -> HBase写入性能优化

设置HTable参数
1、通过调用HTable.setAutoFlush(false)方法可以将HTable写客户端的自动flush关闭,这样可以批量写入数据到HBase,而不是有一条put就执行一次更新,只有当put填满客户端写缓存时,才实际向HBase服务端发起写请求。默认情况下auto flush是开启的。
2、通过调用HTable.setWriteBufferSize(writeBufferSize)方法可以设置HTable客户端的写buffer大小,如果新设置的buffer小于当前写buffer中的数据时,buffer将会被flush到服务端。其中,writeBufferSize的单位是byte字节数,可以根据实际写入数据量的多少来设置该值。,()一般说是设置成5MB?)
3、在HBae中,客户端向集群中的RegionServer提交数据时(Put/Delete操作),首先会先写WAL(Write Ahead Log)日志(即HLog,一个RegionServer上的所有Region共享一个HLog),只有当WAL日志写成功后,再接着写MemStore,然后客户端被通知提交数据成功;如果写WAL日志失败,客户端则被通知提交失败。这样做的好处是可以做到RegionServer宕机后的数据恢复。
因此,对于相对不太重要的数据,可以在Put/Delete操作时,通过调用Put.setWriteToWAL(false)或Delete.setWriteToWAL(false)函数,放弃写WAL日志,从而提高数据写入的性能。

经验之谈:经过实践,就第二条关闭日志的效果比较明显,其它的效果都不明显,因为提交的过程是异步的,所以提交的时候占用的时间并不多,提交到server端后,server还有一个写入的队列。所以大规模写入数据,别指望着用put来解决。。。mapreduce生成hfile,然后用bulk load的方式比较好。

Delete

delete比put要简单很多,直接发送rpc请求

MutateResponse response = getStub().mutate(controller, request);

Get

get在客户端向服务端请求,就简单很多,直接在get()方法进行发送rpc请求:

ClientProtos.GetResponse response = getStub().get(controller, request);
if (response == null)
    return null;
return ProtobufUtil.toResult(response.getResult());

立马杀到RSRpcServices中看看get()方法

checkOpen();
// 记录请求RegionServer的次数
requestCount.increment();
// 先找出在哪个region
Region region = getRegion(request.getRegion());
...
if (get.hasClosestRowBefore() && get.getClosestRowBefore()) {
           ...
} else {
    Get clientGet = ProtobufUtil.toGet(get);
    if (get.getExistenceOnly() && region.getCoprocessorHost() != null) {
      existence = region.getCoprocessorHost().preExists(clientGet);
    }
    if (existence == null) {
      // 从region中取数据
      r = region.get(clientGet);
      if (get.getExistenceOnly()) {
          boolean exists = r.getExists();
          if (region.getCoprocessorHost() != null) {
              exists = region.getCoprocessorHost().postExists(clientGet, exists);
          }
          existence = exists;
      }
  }
}

Scan

Scan 查询的模板代码

HTable hTable = new HTable(configuration, tableName);
    Scan scan = new Scan();
    // scan.setBatch(1000);
    scan.setCaching(10000);
    scan.setCacheBlocks(false);
    ResultScanner resultScanner = hTable.getScanner(scan);
    Result result;
    while ((result = resultScanner.next()) != null) {
        ...
    }
    resultScanner.close();
    hTable.close();

==========客户端的优化查询
进去

public ResultScanner getScanner(final Scan scan) throws IOException {
    // 是否逆向scan
    if (scan.isReversed()) {
        ...
    }
    // scan 有大有小
    if (scan.isSmall()) {
      return new ClientSmallScanner(getConfiguration(), scan, getName(),
          this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
          pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan());
    } else {
      return new ClientScanner(getConfiguration(), scan, getName(), this.connection,
          this.rpcCallerFactory, this.rpcControllerFactory,
          pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan());
    }
}

ClientSmallScanner 实际上是 ClientScanner 的子类,调用nextScanner() 方法。
调用ScannerCallableWithReplicas类的call()方法

      // ScannerCallableWithReplicas的prepare()方法啥事情都没做
      callable.prepare(false);
      return callable.call(callTimeout);

向服务端发送scan请求

ScanResponse response = null;
controller = controllerFactory.newController();
controller.setPriority(getTableName());
controller.setCallTimeout(callTimeout);
response = getStub().scan(controller, request);

服务端==============
RSRpcServices的scan方法

while (i < rows) {
    scannerContext.setBatchProgress(0);
    // Collect values to be returned here
    moreRows = scanner.nextRaw(values, scannerContext);
    if (!values.isEmpty()) {
      for (Cell cell : values) {
        totalCellSize += CellUtil.estimatedSerializedSizeOf(cell);
      }
      final boolean partial = scannerContext.partialResultFormed();
      // 加入 List<Result> 结果集中
      results.add(Result.create(values, null, stale, partial));
      i++;
    }
    ...
}