关于网络模块,我本来是不准备自己写的,直接找个现成的即可。但是我找了libevent
,libuv
,evpp
,muduo
,Boost.Asio
发现代码都太复杂了,看的头疼。最满意的是muduo,但是muduo本身不支持MacOS平台,另外代码较早,对c++11支持有限。
于是,我准备以muduo为原型,自己来实现网络库。采用Reactor模式,非阻塞 IO 模型,基于事件驱动和回调,原生支持多核多线程.支持linux和MacOS系统。
muduo因为出的早,用了boost的一些库来实现很多c++11的一些功能,实际上这些都是没有必要的,比如std::function
,我们完全可以用lambda表达式来实现。另外muduo不支持macos,这一点也是我需要兼容的,毕竟我的日常开发都是在macos下。
整体架构如下图,one loop per thread模式
整个系统至少需要一个线程(main-thread), 每个thread绑定了一个loop。 listenfd 运行在
main thread
,clientfd 根据算法分配至其他 loop(如果整个系统只有一个main thread,那么都会分配到main thread )。在Reactor的模式中,基本结构是事件循环,以事件驱动和事件回调的方式实现业务逻辑。其中的事件可以包含读写socket、连接的建立都可以采用非阻塞的方式进行.这些事件的监控在Linux下用的epoll,在MacOS则是kqueue,整个系统通过poll方法向epoll或kqueue获取有事件的fd列表,然后根据对应的event生成cb,将cb放到对应的thread去执行。需要注意的是,poll因为是阻塞的,所以如果这个时候没有外部的event,则会一直卡在poll处直到event到来或达到指定的timeout,显然这个时候执行队列的cb根本不会立即执行。所以我们还有一个wake机制,即像指定的fd里write一个8字节的空数据,以此让poll立即返回,执行后面的cb队列。关于wakeup机制,可以参考这里socketpair。注意,在poll返回的event里,如果识别出是wakeup发送的,则需要continue。wakeup对应的代码大约如下:
void EventLoop::wakeup() { uint64_t one = 1; sockets::write(wakeupFd_, &one, sizeof one); }
每个loop有一个执行队列
std::vector<std::function<void()>> _pendingFunctors
,需注意的是,每个cb都需要在本地thread执行。如果不在本地,则需要将其添加到对应的thread的队列里,大约代码如下// typedef std::function<void()> Functor; void EventLoop::runInLoop(const Functor &cb) { if (IsInLoopThread()) { cb(); } else { queueInLoop(cb); } } void EventLoop::queueInLoop(const angemon::Functor &cb) { { std::lock_guard<std::mutex> lock(_mutex); _pendingFunctors.push_back(cb); } if (!IsInLoopThread() || _functorPending) { Wakeup(); } }
doPendingFunctors的执行过程非常简单,此处就不多介绍了。大约代码如下:
void EventLoop::doPendingFunctors() { std::vector<Functor> functors; _functorPending = true; { std::lock_guard<std::mutex> lock(_mutex); functors.swap(_pendingFunctors); } for (const auto &func:functors) { func(); } _functorPending = false; }
本文中的这种方式能够提高并发度和吞吐量,对于IO密集型的应用比较合适。但是回调的方式也导致了业务逻辑的割裂。而且对于一些阻塞型或者耗时型的任务,这些显然是不能放在IO线程(即EventLoop所在的线程)中运行的,因为会严重影响EventLoop的正常运行。
对于这类耗时型的任务,一般做法是可以放在另外单独线程池中运行,这样就不会阻塞IO线程的运行了。