Muduo 源码分析

Muduo基于 Reactor 模式的 C++ 网络库,链接里有 muduo 的设计目标,这次分析的源码 版本为 v2.0.0

网络库一般关注 3 个方面:

  • 模型:Linux 下一般都是 Reactor 模式,常用的线程模型有:
    • Redisone loop one thread
    • Nginxone loop per process + SO_REUSEPORT
    • Muduoone loop per thread + threadpool
  • 性能:一般模型确定下来,性能也就大差不差了,这点主要是实现上面考虑的,是追求极致的性能还是保证代码的易读性。
  • 接口:既要保证通用性能够提供多种模型的选择和全面的功能,又要保证接口的易用性,我比较关注这点。

Muduo 推荐的是 one loop per thread + threadpool,就是配置多个线程,每个线程跑一个 eventloop,一个线程只负责接受连接并分发给其他线程,之后该连接的所有 i/o 操作 都由该线程处理,threadpool 可以用来处理耗时长的任务。当然 muduo 是个通用的网络库,肯定也支持其他模式,比如设置只用 1 个线程且不用 threadpool 处理逻辑就是 Redis 那种 (其实 Redis 应该是 one loop one thread + threadpool,它有多个 bio 线程来处理任务);设置 SO_REUSEPORT 起多个进程就是 Nginx 那种。

Reactor

Reactor 主要涉及下面几个类:

  • Channel:封装 I/O 事件和回调,不拥有 fd,可以代表多种实体:listening fdtimer fdevent fd 等。
  • PollerI/O Multiplexing 的基类,封装底层的系统调用(poll(2)epoll(2))。根据 Channel 更新事件,并返回活跃的 Channel
  • Eventloop:整合 ChannelPoller,提供更高层的接口,如定时器。

Channel

因为有多种 I/O Multiplexing 的系统调用,所以用 Channel 封装事件并保存对应的 callback 传递给 Poller,类似 RedisaeFileEvent

class Channel : noncopyable
{
 public:
  typedef std::function<void()> EventCallback;
  typedef std::function<void(Timestamp)> ReadEventCallback;

  Channel(EventLoop* loop, int fd);
  ~Channel();

  void handleEvent(Timestamp receiveTime);

 private:
  static const int kNoneEvent;
  static const int kReadEvent;
  static const int kWriteEvent;

  EventLoop* loop_;
  const int  fd_;
  int        events_;
  int        revents_; // it's the received event types of epoll or poll
  int        index_; // used by Poller.

  ReadEventCallback readCallback_;
  EventCallback writeCallback_;
  EventCallback closeCallback_;
  EventCallback errorCallback_;
};

events_Channel 关心的事件,Poller 根据这个来设置。revents_Poller 返回的已就绪的事件,handleEvent() 会调用相应的 callback 来处理。

所有需要由 EventLoop 处理的如 AcceptorTcpConnection 都有 Channel 成员并设置 callback 注册到 EventLoop 中。Muduocallback 基本都是 member function, 用 std::bind() 绑定 this 指针来注册,一些网络库会采用继承接口类来实现回调的注册。

其他没什么好说的,值得一提的是,poll(2)epoll(2) 的几个常量值是相同的,所以 kNoneEventkReadEventkWriteEvent 可以直接用 poll(2) 的常量:

// On Linux, the constants of poll(2) and epoll(4)
// are expected to be the same.
static_assert(EPOLLIN == POLLIN,        "epoll uses same flag values as poll");
static_assert(EPOLLPRI == POLLPRI,      "epoll uses same flag values as poll");
static_assert(EPOLLOUT == POLLOUT,      "epoll uses same flag values as poll");
static_assert(EPOLLRDHUP == POLLRDHUP,  "epoll uses same flag values as poll");
static_assert(EPOLLERR == POLLERR,      "epoll uses same flag values as poll");
static_assert(EPOLLHUP == POLLHUP,      "epoll uses same flag values as poll");

Poller

PollerI/O Multiplexing 的抽象基类,Muduo 支持 poll(PollPoller)epoll(EPollPoller),使用 level-trigger,在功能上只封装了底层的系统调用,不做多余的工作:

class Poller : noncopyable
{
 public:
  typedef std::vector<Channel*> ChannelList;

  Poller(EventLoop* loop);
  virtual ~Poller();

  virtual Timestamp poll(int timeoutMs, ChannelList* activeChannels) = 0;

  virtual void updateChannel(Channel* channel) = 0;

  virtual void removeChannel(Channel* channel) = 0;

  virtual bool hasChannel(Channel* channel) const;

 protected:
  typedef std::map<int, Channel*> ChannelMap;
  ChannelMap channels_;

 private:
  EventLoop* ownerLoop_;
};

我只关注 epoll(2) 的,有 2 点值得一提:

  • 因为 epoll(2) 是记录关注的 fd 和事件的,所以不需要重复传递兴趣列表,但是区分了 EPOLL_CTL_ADDEPOLL_CTL_MOD,所以 Channelindex_ 记录了状态是要 add 还是 mod。 而 Redis 是用数组记录了所有注册的 aeFileEvent,查找数组即可知道该 add 还是 mod
  • epoll_wait() 要传递数组参数用于返回就绪的事件。Muduo 采用的是动态增长的数组,默认大小是 16,若这次用完了,就会增大 1 倍。而 Redis 是固定大小的,因为有 maxclients 可以知道最多有多少事件。

EventLoop

EventLoop 整合了 ChannelPoller 提供更方便的接口来使用,主要功能为:处理 Channel、处理定时事件、处理任务事件。主要接口如下:

class EventLoop : noncopyable
{
 public:
  // EventLoop
  void loop();
  void quit();

  // 任务事件
  void wakeup();
  void runInLoop(Functor cb);
  void queueInLoop(Functor cb);

  // 定时器
  TimerId runAt(Timestamp time, TimerCallback cb);
  TimerId runAfter(double delay, TimerCallback cb);
  TimerId runEvery(double interval, TimerCallback cb);
  void cancel(TimerId timerId);

  // 处理 Channel
  void updateChannel(Channel* channel);
  void removeChannel(Channel* channel);
  bool hasChannel(Channel* channel);
};

EventLoop 的用法是先注册 Channel,如 listening fd,然后调用 loop() 一直循环处理各种任务,在 loop 中可以注册新的 Channel、定时事件和任务事件:

void EventLoop::loop()
{
  while (!quit_)
  {
    activeChannels_.clear();
    pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
    for (Channel* channel : activeChannels_)
    {
      currentActiveChannel_ = channel;
      currentActiveChannel_->handleEvent(pollReturnTime_);
    }
    doPendingFunctors();
  }
}

先来说任务事件,使用场景主要是其他线程想要在 EventLoop 线程做些工作,比如 EventLoop 线程把耗时的任务放到 threadpool 中执行,完成后通知 EventLoop 线程获取结果。 最常见的实现方式是用 pipe(2)EventLoop 注册读事件,其他线程把结果保存在队列中,然后写 pipeEventLoop 线程就会唤醒处理。Muduo 也是类似实现,只是用 eventfd(2) 来唤醒, 具体用法就看 man page 吧:

Applications can use an eventfd file descriptor instead of a pipe (see pipe(2)) in all cases where a pipe is used simply to signal events. The kernel overhead of an eventfd file descriptor is much lower than that of a pipe, and only one file descriptor is required (versus the two required for a pipe).

再来说定时器。传统的实现为设置 Poller 的超时时间为最近的定时任务,然后每次 Poller 返回时检查一下定时器,定时器会按超时时间排序,如用最小堆等。MuduoTimerQueue 用于管理定时器,用 std::set 保存 定时器,因为 Timestamp 可能相同,所以保存的是 std::pair<Timestamp, Timer*>,用 timerfd 来触发定时器任务的执行,使得实现更加一致。

One Loop Per Thread

我觉得 Muduo 实现上最大的优点是将 EventLoop 和创建线程绑定在一起,所有类都和所属的 EventLoop 绑定,通过 EventLoop 提供的接口实现线程间交互, 使得从 one loop one threadone loop per thread 的实现非常方便。

每个线程都会用 __thread 保存自己的 tid,是用 syscall(SYS_gettid) 获取的,好处为(书上列的,记录一下):

  • 类型是 pid_t,通常是小整数。
  • Linux 中,直接表示内核的任务调度 id,可以在 /proc/tid/proc/pid/task/tid 找到。top(1) 之类的工具也是显示 tid
  • 任何时刻都是全局唯一的。
  • 0 是非法值。

不用 pthread_self() 的原因是:

  • pthread_t 的类型未知,也就无法比较大小,或作为管理容器的 Key
  • pthread_t 只在进程内有意义,只能保证同一进程内,同一时刻的各个线程的 id 不同,不能保证同一进程先后多个线程具有不同的 id

EventLoop 在构造时会记录创建线程的 tid,所有可能跨线程的操作都会用任务事件 runInLoop 传递,非常方便:

bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }

void EventLoop::runInLoop(Functor cb)
{
  if (isInLoopThread())
  {
    cb();
  }
  else
  {
    queueInLoop(std::move(cb));
  }
}

EventLoopThread 用于创建 EventLoop 线程,要注意的是避免在构造函数中创建线程并暴露 this 指针,因为可能新线程使用时还未构造完成,Muduo 是主动调用 start 才会创建线程。因为 EventLoop 是线程间 交互的媒介,所以线程启动后会返回对应的指针。

EventLoopThreadPool 会创建固定个数的 EventLoopThread,并支持 round-robinhash-index 的返回线程对应的 EventLoop,配合 runInLoop 即可实现任务的分发,如绑定 TcpConnection

TcpServer

配合上面的基础设施再实现 TCP 相关的逻辑就可以实现支持多种模型的 TCP 网络框架了,主要包括:

  • 连接的管理,包括创建、关闭等。
  • 数据的收发。

Acceptor

Acceptor 用于接受连接并调用传入的 callback 创建连接:

typedef std::function<void (int sockfd, const InetAddress&)> NewConnectionCallback;

Acceptor 在构造时预留了一个 idleFd_(::open("/dev/null", O_RDONLY | O_CLOEXEC)),当 fd 耗尽时,关闭 idleFd_ 再接收客户端连接立刻关闭,对端就可以立刻知道连接被关闭,关闭前也可以发送错误信息给对端, 之后重新打开 idleFd_

TcpConnection

Muduo 把每个 Tcp 连接都封装为 TcpConnection,通过 shared_ptr 暴露给用户使用,主要接口如下:


class TcpConnection : noncopyable,
                      public std::enable_shared_from_this<TcpConnection>
{
 public:
  // 发送数据
  void send(const void* message, int len);
  void send(const StringPiece& message);
  void send(Buffer* message);  // this one will swap data

  // 设置 context
  void setContext(const boost::any& context)
  { context_ = context; }
  const boost::any& getContext() const
  { return context_; }
  boost::any* getMutableContext()
  { return &context_; }

  // 设置 callback
  // 连接建立后调用
  typedef std::function<void (const TcpConnectionPtr&)> ConnectionCallback;
  void setConnectionCallback(const ConnectionCallback& cb)
  { connectionCallback_ = cb; }
  // 收到消息后调用
  typedef std::function<void (const TcpConnectionPtr&,
                              Buffer*,
                              Timestamp)> MessageCallback;
  void setMessageCallback(const MessageCallback& cb)
  { messageCallback_ = cb; }
  // 消息全部发完后调用
  typedef std::function<void (const TcpConnectionPtr&)> WriteCompleteCallback;
  void setWriteCompleteCallback(const WriteCompleteCallback& cb)
  { writeCompleteCallback_ = cb; }
  // 当发送缓冲区积压的数据超出 highWaterMark 时调用
  typedef std::function<void (const TcpConnectionPtr&, size_t)> HighWaterMarkCallback;
  void setHighWaterMarkCallback(const HighWaterMarkCallback& cb, size_t highWaterMark)
  { highWaterMarkCallback_ = cb; highWaterMark_ = highWaterMark; }
};

Muduo 使用 Buffer 保存数据,结构如下图: image

Buffer 可读可写,用 readIndexwriteIndex 区分范围,不是循环队列,当空间不够用时会增大 Buffer 调整 index。 在创建时预留了 8 字节的 prependable bytes,用于高效的实现 length + payload 格式。不过在当前的实现中 Buffer 不会自动缩小,要主动调 shrink,因为经常增大或缩小,数据拷贝会很频繁,许多实现 会当 Buffer 增大到某个阈值时,之后不再扩大,而是用 list 追加新的 BufferRedis 就是类似实现。

TcpConnection 读响应时比较特别,使用 readv(2) 读取数据:一个是 TcpConnectionBuffer;一个是栈上的 char extrabuf[65536]。猜测因为是 level-trigger 所以不需要一次性读完所有数据(避免阻塞其他连接的请求), 但如果一次读的太少就要经过多次 Poller 返回,而又想避免数据拷贝,比如先读到栈上的数组,然后拷贝到 Buffer,但是如果每个 TcpConnectionBuffer 都默认很大,又太浪费内存,所以才这么实现的。

要发送数据时调用 send,把数据追加到 Buffer 中,这个操作是线程安全的,通过 runInLoop 传递给 EventLoop 线程,很方便。Muduo 也是在第一次发送时直接 write,还有未写完的才会注册写事件,减少 Poller 系统调用。

还有些其他功能,如:

  • 有多个 callback 在对应时机调用。
  • any 保存任意类型的 context 方便使用。

ThreadPool

这里的 ThreadPool 是用于执行任务,至于 1 个线程接收连接,分发给多个 i/o 线程是通过 EventLoopThreadPoolEventloop.runInLoop 实现的。ThreadPool 的实现也很简单,从成员变量就可以看出来实现:

class ThreadPool : noncopyable
{
 public:
  typedef std::function<void ()> Task;

 private:
  mutable MutexLock mutex_;
  Condition notEmpty_ GUARDED_BY(mutex_);
  Condition notFull_ GUARDED_BY(mutex_);
  std::vector<std::unique_ptr<muduo::Thread>> threads_;
  std::deque<Task> queue_ GUARDED_BY(mutex_);
  size_t maxQueueSize_;
};

TcpClient

TcpClient 负责建立连接和处理 I/O,它不拥有 EventLoop,每个 TcpClient 只负责一个 TcpConnection。其实 clientserver 很相似,区别在于一个是被动的 listen,一个是主动的 connectTcpClientTcpServer 都是由外部传入的 EventLoop 构造,可以使多个 TcpClient 共用一个 EventLoop,再搭配上 EventLoopThreadPool + round-robin 就可以实现类似 TcpServer 的多线程 client

Connector

ConnectorAcceptor 类似,只负责建立连接。使用 nonblocking connect,当返回可写事件时要检查 socket error,并支持重试,重试会有动态增加的延迟时间。因为如果服务端突然挂掉,大量连接同时重试 会影响服务质量并造成拥塞。

Summary

Muduo 的代码写的很清晰易懂,代码量也不多,如果 2 年前的我看的话应该会学到很多(其实当时就听说过了,但因为不会 C++ 就没看)。

分类:

更新时间:

留下评论