[专题]从零打造一个搜索引擎(网络模块)

关于网络模块,我本来是不准备自己写的,直接找个现成的即可。但是我找了libevent,libuv,evpp,muduo,Boost.Asio发现代码都太复杂了,看的头疼。最满意的是muduo,但是muduo本身不支持MacOS平台,另外代码较早,对c++11支持有限。

于是,我准备以muduo为原型,自己来实现网络库。采用Reactor模式,非阻塞 IO 模型,基于事件驱动和回调,原生支持多核多线程.支持linux和MacOS系统。

muduo因为出的早,用了boost的一些库来实现很多c++11的一些功能,实际上这些都是没有必要的,比如std::function,我们完全可以用lambda表达式来实现。另外muduo不支持macos,这一点也是我需要兼容的,毕竟我的日常开发都是在macos下。

整体架构如下图,one loop per thread模式

  1. 整个系统至少需要一个线程(main-thread), 每个thread绑定了一个loop。 listenfd 运行在main thread,clientfd 根据算法分配至其他 loop(如果整个系统只有一个main thread,那么都会分配到main thread )。

  2. 在Reactor的模式中,基本结构是事件循环,以事件驱动和事件回调的方式实现业务逻辑。其中的事件可以包含读写socket、连接的建立都可以采用非阻塞的方式进行.这些事件的监控在Linux下用的epoll,在MacOS则是kqueue,整个系统通过poll方法向epoll或kqueue获取有事件的fd列表,然后根据对应的event生成cb,将cb放到对应的thread去执行。需要注意的是,poll因为是阻塞的,所以如果这个时候没有外部的event,则会一直卡在poll处直到event到来或达到指定的timeout,显然这个时候执行队列的cb根本不会立即执行。所以我们还有一个wake机制,即像指定的fd里write一个8字节的空数据,以此让poll立即返回,执行后面的cb队列。关于wakeup机制,可以参考这里socketpair。注意,在poll返回的event里,如果识别出是wakeup发送的,则需要continue。wakeup对应的代码大约如下:

    void EventLoop::wakeup()
    {
      uint64_t one = 1;
      sockets::write(wakeupFd_, &one, sizeof one);
    }
  3. 每个loop有一个执行队列std::vector<std::function<void()>> _pendingFunctors,需注意的是,每个cb都需要在本地thread执行。如果不在本地,则需要将其添加到对应的thread的队列里,大约代码如下

    // typedef std::function<void()> Functor;
    void EventLoop::runInLoop(const Functor &cb) {
        if (IsInLoopThread()) {
            cb();
        } else {
            queueInLoop(cb);
        }
    }
    void EventLoop::queueInLoop(const angemon::Functor &cb) {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _pendingFunctors.push_back(cb);
        }
        if (!IsInLoopThread() || _functorPending) {
            Wakeup();
        }
    }
  4. doPendingFunctors的执行过程非常简单,此处就不多介绍了。大约代码如下:

    void EventLoop::doPendingFunctors() {
        std::vector<Functor> functors;
        _functorPending = true;
        {
            std::lock_guard<std::mutex> lock(_mutex);
            functors.swap(_pendingFunctors);
        }
        for (const auto &func:functors) {
            func();
        }
        _functorPending = false;
    }

本文中的这种方式能够提高并发度和吞吐量,对于IO密集型的应用比较合适。但是回调的方式也导致了业务逻辑的割裂。而且对于一些阻塞型或者耗时型的任务,这些显然是不能放在IO线程(即EventLoop所在的线程)中运行的,因为会严重影响EventLoop的正常运行。

对于这类耗时型的任务,一般做法是可以放在另外单独线程池中运行,这样就不会阻塞IO线程的运行了。