基于ASIO实现C++网络库
/ 5 min read
Table of Contents
项目里的网络库用了asio,看了asio的文档后感觉用asio挺好用的,很容易实现一个网络库,而且有单独的版本可以不依赖boost,我们的目标是实现一个多线程,每个线程都有一个事件循环的异步网络库。
功能划分
整个网络库划分为Acceptor,Loop,LoopMgr,Session,TcpServer这几个类。 Loop使用io_context来作为eventloop使用,post函数用来向loop中投递任务,可以看做一个任务队列,线程安全,调用run函数后可以处理各种异步事件和任务。把一些跨线程的操作投递到一个loop中执行就避免了加锁。 LoopMgr相当于一个Loop线程池。 Acceptor持有asio的acceptor用来接收新连接, Session持有socket,使用async系列函数进行异步读写操作。 TcpServer管理连接。使用的时候通过注册TcpServer的onconnect,onmessage,ondisconnect回调函数完成对应事件的逻辑处理。 基本使用方法:
TcpServer server;server.setNewSessionCallback<UserSession>();server.setConnectionCallback();server.setMessageCallback();server.setServerDisconnectCallback();server.run();Acceptor
Acceptor类直接封装asio的acceptor
void Acceptor::accept() {    auto session = newSessionCallback_();    acceptor_.async_accept(session->Socket(), [session, this](const asio::error_code &err) {        if (!err) {            session->start();        } else {            session->close();        }        if(!acceptor_.is_open()){            return ;        }        accept();    });}asio的异步函数都是这种用法,需要在回调函数中重新调用。Accepter需要设置好newSessionCallback回调用来创建新的session
Session
Session封装了socket的读写操作,在start函数中调用设置的connectioncallback之后开始read操作
void Session::read() {    auto self(shared_from_this());    socket_.async_read_some(asio::buffer(read_buf_.beginWrite(), read_buf_.writableBytes()), [self, this](const asio::error_code &err, size_t size) {        if (!err) {            //LOG_INFO("receive data,size:{}",size);            read_buf_.hasWritten(size);            messagecallback_(self, &read_buf_);            read_buf_.ensureWritableBytes(size);            read();        } else {            LOG_ERROR("read error: {}", err.message());            if(reconnect_){                reconnect();            }else{                disconnectcallback_(id_);                close();            }            return;        }    });}在回调中调用负责解包的messagecallback,这里把解包操作完全交个用户设置的回调,用户需要在回调中操作buffer,读出数据后调用retrive移动读指针,一般写法是再加一层codec负责解包后再处理消息。
void Session::send(BufferPtr buffer) {    {        std::lock_guard<std::mutex> guard(mutex_);        unsend_queue_.push_back(buffer);        // TODO(shenyu): race condition?        if (write_buf_.readableBytes() != 0) return;    }
    auto self(shared_from_this());    loop_->runInLoop([this, self]() {        bool write_in_progress = (write_buf_.readableBytes() != 0);        if (!write_in_progress) {            write();        }    });}
void Session::write() {    {        std::lock_guard<std::mutex> guard(mutex_);        tmp_queue_.swap(unsend_queue_);        unsend_queue_.clear();        if (tmp_queue_.empty()) return;    }
    for (const auto &buf : tmp_queue_) {        write_buf_.append(buf->peek(), buf->readableBytes());    }
    auto self(shared_from_this());    socket_.async_write_some(asio::buffer(write_buf_.peek(), write_buf_.readableBytes()), [self, this](const asio::error_code &err, size_t size) {        if (!err) {            write_buf_.retrieve(write_buf_.readableBytes());            {                std::lock_guard<std::mutex> guard(this->mutex_);                if (this->unsend_queue_.empty()) return;            }            write();        } else {            disconnectcallback_(id_);            LOG_ERROR("write error: {}", err.message());            close();            return;        }    });}写操作稍微复杂一些,不能直接调用异步写操作,如果连续两次调用write函数,而第一个发的包比较大,第二次的包比较小,可能第一个包没有发完第二个就发出去了,保证不了顺序。正确的做法是增加一个待发送队列,send时统一push到未发送队列,并判断发送buf是否为空,空说明当前没有在发消息,调用write,write时将未发送队列的内容拷贝到发送buf中一起发送。
TcpServer
用户直接使用TcpServer类,只需实现事件的回调函数即可。
template <typename T>SessionPtr TcpServer::newSession() {    auto loop = loopmgr_.findNextLoop();    auto session = std::make_shared<T>(loop, sessionid_++);    session->setMessageCallback(messagecallback_);    session->setConnectionCallback(connectioncallback_);    session->setDisconnectCallback(std::bind(&TcpServer::DefaultDisconnectCallback, this, _1));    {        std::lock_guard<std::mutex> guard(mutex_);        connections_.insert({sessionid_, session});    }    return session;}这里的newSession使用模板函数用来创建Session不同的子类,直接使用轮询将session派发到不同的eventloop中。
直接使用asio比用epoll还是方便不少的,而且跨平台,但是asio的代码可读性太差,不知道会不会有什么坑,性能比封装epoll肯定也要差些。