PhxRPC源码分析(一)BlockTcpStream

phxrpc简介

phxrpc是微信开源的轻量级rpc框架,支持协程模式,代码量不大,很适合拿来学习一个。这个框架用了流式操作,很有特点。之前总听人说iostream操作慢,没有研究过流操作底层的实现,今天就来分析一下。

定制输入输出流

phxrpc使用标准输入输出的方式封装了socket,使用<<>>来实现读写。
实现类图如下:
这里写图片描述

streambuf

streambuf是一个流缓冲区,包含输入和输出缓冲区。向ostream写入时,数据会放到输出缓冲区中,从istream读取时,数据从输入缓冲区中读出。
这里写图片描述

streambuf输出缓冲区

输出缓冲区由三个指针维护,pbase指向起始位置,epptr指向末尾(类似迭代器,前闭后开),pptr指向当前可放置的位置。向输出缓冲区写入数据的函数是sputcsputn,第一个写入一个字符并将pptr向后移动一位。第二个写入多个字符。当缓冲区满时,会调用overflow()函数将所有数据发送并清空缓冲区,调用pbump(0)重置pptr指针到起始位置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
int BaseTcpStreamBuf::sync() { //缓冲区已满,将数据发送
int sent = 0;
int total = pptr() - pbase();
while (sent < total) {
int ret = psend(pbase() + sent, total - sent, 0);
if (ret > 0) {
sent += ret;
} else {
//phxrpc::log(LOG_ERR, "sync ret %d errno %d,%s", ret, errno, strerror(errno));
return -1;
}
}

setp(pbase(), pbase() + buf_size_);
pbump(0);

return 0;
}
int BaseTcpStreamBuf::overflow(int c) {
if (-1 == sync()) {
return traits_type::eof();
} else { //数据已发送,放入c
if (!traits_type::eq_int_type(c, traits_type::eof())) {
sputc(traits_type::to_char_type(c));
}

return traits_type::not_eof(c);
}
}

streambuf输入缓冲区

输入缓冲区同样由三个指针维护,分别是ebackgptregptrgptr指向当前可读的位置,如果gptr()==egptr()说明数据已读完,此时会调用underflow()从socket中接受数据并重置指针。

1
2
3
4
5
6
7
8
9
int BaseTcpStreamBuf::underflow() {
int ret = precv(eback(), buf_size_, 0);
if (ret > 0) {
setg(eback(), eback(), eback() + ret); //重置指针
return traits_type::to_int_type(*gptr());
} else {
//phxrpc::log(LOG_ERR, "ret %d errno %d,%s", ret, errno, strerror(errno));
return traits_type::eof();
}

BlockTcpStream

这个类实现了一个阻塞TCP流,使用Attach()调用父类的NewRdBuf()绑定一个streambuf。网络IO交给BlockTcpStreamBuf来做。

BlockTcpUtils

BlockTcpUtils这个工具类封装了网络连接的操作。Open()由客户端调用,Listen()由服务端调用。

Open()函数中调用connect()之前将socketfd设为非阻塞。为了验证非阻塞connect是否成功,需要select()poll()一下看是否可写然后调用getsockopt()检查(man connect 中提到)。完成连接后再将sockfd设为阻塞。

# PhxRPC

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×