PhxRPC源码分析(三) RPC

RPC

最后这篇文章分析一下整体消息收发处理的逻辑。

整个RPC的定义基本都在hsha_server这个文件。主要有一下几个类:

  • DataFlow :数据流,所有请求和应答分别保存在两个线程安全的队列中。
  • HshaServerStat HshaServerQos: 统计运行状态,独立线程。
  • Worker :独立的工作线程,如果是协程模式,每个worker会有多个协程。
  • WorkerPool:工作池,管理Worker
  • HshaServerUnit:独立线程的工作单元,每个单元都有一个WorkerPool ,UThreadEpollSchedulerDataFlow
  • HshaServerIO:在HshaServerUnit线程处理IO事件。
  • HshaServer:server对象,有多个工作单元。
  • HshaServerAcceptor:接受连接,工作在主线程。

运行起来有一个accept线程,每个unit有一个IO线程,多个worker线程。

各个模块之间的关系如下

这里写图片描述

DataFlow

DataFlow包含request和response队列,并附加了时间戳和参数指针。

HshaServerStat HshaServerQos

独立线程负责统计运行信息,线程绑定为CallFunc函数,使用了设置超时时间的条件变量,超时时间为1s,这样如果没有通知则每秒统计一次。

Worker

独立工作线程,绑定为Worker::Func

1
2
3
4
5
6
7
void Worker::Func() {
if (uthread_count_ == 0) { //如果没有设置协程数量
ThreadMode(); //线程模式
} else {
UThreadMode(); //协程模式
}
}

线程模式直接从DataFlow中拉一个request然后执行WorkerLogic
协程模式创建一个调度器并设置处理新请求的函数。

1
2
3
4
5
6
void Worker::UThreadMode() {
worker_scheduler_ = new UThreadEpollScheduler(utherad_stack_size_, uthread_count_, true);
assert(worker_scheduler_ != nullptr);
worker_scheduler_->SetHandlerNewRequestFunc(bind(&Worker::HandlerNewRequestFunc, this));
worker_scheduler_->RunForever();
}

HandlerNewRequestFunc会将WorkerLogic的包装UThreadFunc加入调度器的任务队列。
WorkerLogic是真正处理逻辑的函数,对没有超时的请求,会分发到具体的函数处理,最后将结果push到response队列中。

WorkPool

WorkPool负责创建Worker

HshaServerIO

1
2
3
4
5
void HshaServerIO::RunForever() {
scheduler_->SetHandlerAcceptedFdFunc(bind(&HshaServerIO::HandlerAcceptedFd, this));
scheduler_->SetActiveSocketFunc(bind(&HshaServerIO::ActiveSocketFunc, this));
scheduler_->RunForever();
}

RunForever设置Run中执行的两个回调函数分别处理新建连接和写response

AddAcceptedFd函数负责将已连接的fd放入accepted_fd_list_中。

HandlerAcceptedFd函数从队列中取出已连接的fd,并绑定IOFunc函数加入协程调度器的任务队列。

IOFunc函数新建一个关联socketfd的UThreadTcpStream,然后判断请求协议的类型并解析请求,将解析完成的request push到DataFlow中,
然后调用

1
worker_pool_->Notify();

这一步针对UThreadMode,worker_pool的Notify函数会调用worker的Notify来通知worker的UThreadEpollScheduler,接下来IOFunc调用UThreadWait设置一个超时时间Yield出去。

此时worker的UThreadEpollSchedulerRun函数处轮询,接下来会执行开启UThreadMode时绑定的handler_new_request_func_也就是Worker::HandlerNewRequestFunc,这个函数将从DataFlow中拉取request然后将WorkerLogic加入worker_scheduler_的任务队列。这样开始执行WorkerLogic,执行完逻辑后WorkerLogic将response加入DataFlow的队列中。最后执行

1
pool_->scheduler_->NotifyEpoll();

这一步调用worker_pool_的UThreadEpollSchedulerNotifyEpoll,接下来流程回到pool_->scheduler_的Run中,此时执行active_socket_func_DataFlow中取出response,将其包装为该socket的args并返回,最后Resume到此socket的协程也就是之前的IOFunc,将response send出去。

这个应该是整个流程最复杂的一个函数了,主要是worker线程和workpool的UThreadEpollScheduler的互相唤醒。一次完整的收发数据流程如下:

这里写图片描述

HshaServerUnit

HshaServerUnit是IO线程,其RunFunc AddAcceptedFd调用的都是成员HshaServerIO的相应函数。

HshaServerAcceptor

HshaServerAcceptorLoopAccept函数负责accept新连接,idx_变量通过每次取余的方式来确定将accept的fd放到哪个HshaServerUnit

HshaServer

HshaServer获取配置文件中的IO线程数量创建相同数量的HshaServerUnit,根据配置文件的工作线程数量初始化其参数,然后push到server_unit_list_中。

# PhxRPC

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×