2分六合

IT技术互动交流平台

HBase1.0.0的RPC机制分析与源码解读(一)

来源:IT165收集  发布日期:2015-03-30 21:52:25
HBase的RPC机制,除了使用protocal buf的工具之外都是利用java的原生API进行构造,RPC机制的解读包括客户端和服务器端两个部分,本文主要就服务器端的服务运行机制以及工作流程进行简要分析.
首先我们来分析一下一个典型的RPC服务器短的处理流程,如下图所示:

从图中可以清楚的看到当客户端向服务器端发送一个请求的时候,最开始是被RPCServer中的Listener所监听到的,如下面的代码所示,HBase的RpcServer启动的时候会启动几个处理线程:

    responder.start();
    listener.start();
    scheduler.start();
其中,Responder线程负责数据的request的回复工作,listener负责监听客户端的请求,scheduler负责具体call的调度工作
      while (running) {
        SelectionKey key = null;
        try {
          selector.select(); // FindBugs IS2_INCONSISTENT_SYNC
          Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
          while (iter.hasNext()) {
            key = iter.next();
            iter.remove();
            try {
              if (key.isValid()) {
                if (key.isAcceptable())
                  doAccept(key);
              }
            } catch (IOException ignored) {
              if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
            }
            key = null;
          }
        }
listener的实现过程中运用了java nio的一些特性,主要是每个Listener线程又会管理这一个Reader的线程池,这些Reader具体负责从Socket Channel中读取数据,并解析数据中的相关项,进而构造出可运行的CallRunner:
      Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
              totalRequestSize,
              traceInfo);
      scheduler.dispatch(new CallRunner(RpcServer.this, call, userProvider));
RpcScheduler调用对应的RpcExcutor进行相应的处理,RpcExcutor中启动了多个处理线程,这些线程从队列中取出任务并且执行,
  protected void startHandlers(final String nameSuffix, final int numHandlers,
      final List<BlockingQueue<CallRunner>> callQueues,
      final int qindex, final int qsize, final int port) {
    final String threadPrefix = name + Strings.nullToEmpty(nameSuffix);
    for (int i = 0; i < numHandlers; i++) {
      final int index = qindex + (i % qsize);
      Thread t = new Thread(new Runnable() {
        @Override
        public void run() {
          consumerLoop(callQueues.get(index));
        }
      });
      t.setDaemon(true);
      t.setName(threadPrefix + "RpcServer.handler=" + handlers.size() +
        ",queue=" + index + ",port=" + port);
      t.start();
      LOG.debug(threadPrefix + " Start Handler index=" + handlers.size() + " queue=" + index);
      handlers.add(t);
    }
  }
  protected void consumerLoop(final BlockingQueue<CallRunner> myQueue) {
    boolean interrupted = false;
    double handlerFailureThreshhold =
        conf == null ? 1.0 : conf.getDouble(HConstants.REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT,
          HConstants.DEFAULT_REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT);
    try {
      while (running) {
        try {
          CallRunner task = myQueue.take();
          try {
            activeHandlerCount.incrementAndGet();
            task.run();

其实继续跟进代码就会发现CallRunner的run方法最主要动作有两个一个是获取操作结果一个是返回操作结果给客户:
       resultPair = this.rpcServer.call(call.service, call.md, call.param, call.cellScanner,
          call.timestamp, this.status);
  if (!call.isDelayed() || !call.isReturnValueDelayed()) {
        Message param = resultPair != null ? resultPair.getFirst() : null;
        CellScanner cells = resultPair != null ? resultPair.getSecond() : null;
        call.setResponse(param, cells, errorThrowable, error);
      }
      call.sendResponseIfReady();
到此结束就简要介绍了一个完整的服务端RPC处理流程,该流程中涉及到的相关的类的关系如下图所示:


Tag标签:      
  • 专题推荐

About IT165 - 广告服务 - 隐私声明 - 版权申明 - 免责条款 - 网站地图 - 网友投稿 - 联系方式
本站内容来自于互联网,仅供用于网络技术学习,学习中请遵循相关法律法规