HBase队列

HRegionServer

public HRegionServer(Configuration conf, CoordinatedStateManager csm)
      throws IOException, InterruptedException {
      ...
      //初始化RpcService
      rpcServices = createRpcServices();
      ...
      rpcServices.start();
      ...
}

RSRpcServices

RSRpcServices(HRegionServer rs, LogDelegate ld) throws IOException {
    ...
    RpcSchedulerFactory rpcSchedulerFactory;
    try {
      Class<?> rpcSchedulerFactoryClass = rs.conf.getClass(
          REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
          SimpleRpcSchedulerFactory.class);
      rpcSchedulerFactory = (RpcSchedulerFactory)
          rpcSchedulerFactoryClass.getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      throw new IllegalArgumentException(e);
    }
    ...
    //通过注解方式加载RegionServer的
    //openRegion,closeRegion,compactRegion,flushRegion,getOnlineRegion,getRegionInfo,
    //getServerInfo,getStoreFile,mergeRegions,replay,replicateWALEntry,splitRegion,stopServer
    //方法
    priority = createPriority();
    ...
    try {
      rpcServer = new RpcServer(rs, name, getServices(),
          bindAddress, // use final bindAddress for this server.
          rs.conf,
          //创建regionServer的调度器
          rpcSchedulerFactory.create(rs.conf, this, rs));
      rpcServer.setRsRpcServices(this);
    } catch (BindException be) {
      String configName = (this instanceof MasterRpcServices) ? HConstants.MASTER_PORT :
          HConstants.REGIONSERVER_PORT;
      throw new IOException(be.getMessage() + ". To switch ports use the '" + configName +
          "' configuration property.", be.getCause() != null ? be.getCause() : be);
    }
}


  protected PriorityFunction createPriority() {
    return new AnnotationReadingPriorityFunction(this);
  }

AnnotationReadingPriorityFunction

  public AnnotationReadingPriorityFunction(final RSRpcServices rpcServices,
      Class<? extends RSRpcServices> clz) {
    Map<String,Integer> qosMap = new HashMap<String,Integer>();
    //遍历并加载所有带 QosPriority 注解的方法
    for (Method m : clz.getMethods()) {
      QosPriority p = m.getAnnotation(QosPriority.class);
      if (p != null) {
        // Since we protobuf'd, and then subsequently, when we went with pb style, method names
        // are capitalized.  This meant that this brittle compare of method names gotten by
        // reflection no longer matched the method names coming in over pb.  TODO: Get rid of this
        // check.  For now, workaround is to capitalize the names we got from reflection so they
        // have chance of matching the pb ones.
        String capitalizedMethodName = capitalize(m.getName());
        qosMap.put(capitalizedMethodName, p.priority());
      }
    }
    this.rpcServices = rpcServices;
    this.annotatedQos = qosMap;
    if (methodMap.get("getRegion") == null) {
      methodMap.put("hasRegion", new HashMap<Class<? extends Message>, Method>());
      methodMap.put("getRegion", new HashMap<Class<? extends Message>, Method>());
    }
    for (Class<? extends Message> cls : knownArgumentClasses) {
      argumentToClassMap.put(cls.getName(), cls);
      try {
        methodMap.get("hasRegion").put(cls, cls.getDeclaredMethod("hasRegion"));
        methodMap.get("getRegion").put(cls, cls.getDeclaredMethod("getRegion"));
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }

    Configuration conf = rpcServices.getConfiguration();
    scanVirtualTimeWeight = conf.getFloat(SCAN_VTIME_WEIGHT_CONF_KEY, 1.0f);
  }

SimpleRpcSchedulerFactory

  public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) {
    //hbase.regionserver.handler.count
    //等待响应用户表级请求的线程数
    //默认值 30
    int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
        HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
    return new SimpleRpcScheduler(
      conf,
      handlerCount,
      //hbase.regionserver.metahandler.count
      //处理meta表请求的线程数
      //默认值 20
      conf.getInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT,
        HConstants.DEFAULT_REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT),
      //hbase.regionserver.replication.handler.count
      //拷贝队列的大小
      //默认值 3
      conf.getInt(HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT,
          HConstants.DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT),
      priority,
      server,
      HConstants.QOS_THRESHOLD);
  }

RpcServer

public RpcServer(final Server server, final String name,
      final List<BlockingServiceAndInterface> services,
      final InetSocketAddress bindAddress, Configuration conf,
      RpcScheduler scheduler){
      this.scheduler = scheduler;      
}
 protected void processRequest(ByteBuffer buf) throws IOException, InterruptedException {
    ...
    if (!scheduler.dispatch(new CallRunner(RpcServer.this, call))) {
            callQueueSize.add(-1 * call.getSize());
    
            ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
            metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
            setupResponse(responseBuffer, call, CALL_QUEUE_TOO_BIG_EXCEPTION,
                "Call queue is full on " + server.getServerName() +
                    ", too many items queued ?");
            responder.doRespond(call);
          }
 }

SimpleRpcScheduler

 public SimpleRpcScheduler(
      Configuration conf,
      int handlerCount,
      int priorityHandlerCount,
      int replicationHandlerCount,
      PriorityFunction priority,
      Abortable server,
      int highPriorityLevel){
      ...
      //默认FIFO队列
          String callQueueType = conf.get(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY,
            RpcExecutor.CALL_QUEUE_TYPE_CONF_DEFAULT);
          float callqReadShare = conf.getFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0);
      
          if (callqReadShare > 0) {
            // at least 1 read handler and 1 write handler
            callExecutor = new RWQueueRpcExecutor("default.RWQ", Math.max(2, handlerCount),
              maxQueueLength, priority, conf, server);
          } else {
          //callQeueType有fifo,deadline,codel三种
          //fifo和codel使用 FastPathBalancedQueueRpcExecutor
          //deadline使用 BalancedQueueRpcExecutor
            if (RpcExecutor.isFifoQueueType(callQueueType) || RpcExecutor.isCodelQueueType(callQueueType)) {
              callExecutor = new FastPathBalancedQueueRpcExecutor("default.FPBQ", handlerCount,
                  maxQueueLength, priority, conf, server);
            } else {
              callExecutor = new BalancedQueueRpcExecutor("default.BQ", handlerCount, maxQueueLength,
                  priority, conf, server);
            }
          }
      
          // Create 2 queues to help priorityExecutor be more scalable.
          this.priorityExecutor = priorityHandlerCount > 0 ? new FastPathBalancedQueueRpcExecutor(
              "priority.FPBQ", priorityHandlerCount, RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE,
              maxPriorityQueueLength, priority, conf, abortable) : null;
          this.replicationExecutor = replicationHandlerCount > 0 ? new FastPathBalancedQueueRpcExecutor(
              "replication.FPBQ", replicationHandlerCount, RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE,
              maxQueueLength, priority, conf, abortable) : null;
      
}
 public boolean dispatch(CallRunner callTask) throws InterruptedException {
    RpcServer.Call call = callTask.getCall();
    int level = priority.getPriority(call.getHeader(), call.param, call.getRequestUser());
    if (priorityExecutor != null && level > highPriorityLevel) {
      return priorityExecutor.dispatch(callTask);
    } else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) {
      return replicationExecutor.dispatch(callTask);
    } else {
      return callExecutor.dispatch(callTask);
    }
  }

RpcExecutor

public RpcExecutor(final String name, final int handlerCount, final String callQueueType,
      final int maxQueueLength, final PriorityFunction priority, final Configuration conf,
      final Abortable abortable) {
    if (isDeadlineQueueType(callQueueType)) {
      this.name += ".Deadline";
      this.queueInitArgs = new Object[] { maxQueueLength,
        new CallPriorityComparator(conf, this.priority) };
      this.queueClass = BoundedPriorityBlockingQueue.class;
    } else if (isCodelQueueType(callQueueType)) {
      this.name += ".Codel";
      int codelTargetDelay = conf.getInt(CALL_QUEUE_CODEL_TARGET_DELAY,
        CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY);
      int codelInterval = conf.getInt(CALL_QUEUE_CODEL_INTERVAL, CALL_QUEUE_CODEL_DEFAULT_INTERVAL);
      double codelLifoThreshold = conf.getDouble(CALL_QUEUE_CODEL_LIFO_THRESHOLD,
        CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD);
      queueInitArgs = new Object[] { maxQueueLength, codelTargetDelay, codelInterval,
          codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches };
      queueClass = AdaptiveLifoCoDelCallQueue.class;
    } else {
      this.name += ".Fifo";
      queueInitArgs = new Object[] { maxQueueLength };
      queueClass = LinkedBlockingQueue.class;
    }

    LOG.info("RpcExecutor " + " name " + " using " + callQueueType
        + " as call queue; numCallQueues=" + numCallQueues + "; maxQueueLength=" + maxQueueLength
        + "; handlerCount=" + handlerCount);      
}

打赏一个呗

取消

感谢您的支持,我会继续努力的!

扫码支持
扫码支持
扫码打赏,你说多少就多少

打开支付宝扫一扫,即可进行扫码打赏哦