Muduo 源码分析
Muduo 是基于 Reactor 模式的 C++ 网络库,链接里有 muduo 的设计目标,这次分析的源码
版本为 v2.0.0。
网络库一般关注 3 个方面:
- 模型:
Linux下一般都是Reactor模式,常用的线程模型有:Redis的one loop one thread。Nginx的one loop per process + SO_REUSEPORT。Muduo的one loop per thread + threadpool。
- 性能:一般模型确定下来,性能也就大差不差了,这点主要是实现上面考虑的,是追求极致的性能还是保证代码的易读性。
- 接口:既要保证通用性能够提供多种模型的选择和全面的功能,又要保证接口的易用性,我比较关注这点。
Muduo 推荐的是 one loop per thread + threadpool,就是配置多个线程,每个线程跑一个 eventloop,一个线程只负责接受连接并分发给其他线程,之后该连接的所有 i/o 操作
都由该线程处理,threadpool 可以用来处理耗时长的任务。当然 muduo 是个通用的网络库,肯定也支持其他模式,比如设置只用 1 个线程且不用 threadpool 处理逻辑就是 Redis 那种
(其实 Redis 应该是 one loop one thread + threadpool,它有多个 bio 线程来处理任务);设置 SO_REUSEPORT 起多个进程就是 Nginx 那种。
Reactor
Reactor 主要涉及下面几个类:
Channel:封装I/O事件和回调,不拥有fd,可以代表多种实体:listening fd、timer fd、event fd等。Poller:I/O Multiplexing的基类,封装底层的系统调用(poll(2)和epoll(2))。根据Channel更新事件,并返回活跃的Channel。Eventloop:整合Channel和Poller,提供更高层的接口,如定时器。
Channel
因为有多种 I/O Multiplexing 的系统调用,所以用 Channel 封装事件并保存对应的 callback 传递给 Poller,类似 Redis 的 aeFileEvent:
class Channel : noncopyable
{
public:
typedef std::function<void()> EventCallback;
typedef std::function<void(Timestamp)> ReadEventCallback;
Channel(EventLoop* loop, int fd);
~Channel();
void handleEvent(Timestamp receiveTime);
private:
static const int kNoneEvent;
static const int kReadEvent;
static const int kWriteEvent;
EventLoop* loop_;
const int fd_;
int events_;
int revents_; // it's the received event types of epoll or poll
int index_; // used by Poller.
ReadEventCallback readCallback_;
EventCallback writeCallback_;
EventCallback closeCallback_;
EventCallback errorCallback_;
};
events_ 是 Channel 关心的事件,Poller 根据这个来设置。revents_ 是 Poller 返回的已就绪的事件,handleEvent() 会调用相应的 callback 来处理。
所有需要由 EventLoop 处理的如 Acceptor、TcpConnection 都有 Channel 成员并设置 callback 注册到 EventLoop 中。Muduo 的 callback 基本都是 member function,
用 std::bind() 绑定 this 指针来注册,一些网络库会采用继承接口类来实现回调的注册。
其他没什么好说的,值得一提的是,poll(2) 和 epoll(2) 的几个常量值是相同的,所以 kNoneEvent、kReadEvent、kWriteEvent 可以直接用 poll(2) 的常量:
// On Linux, the constants of poll(2) and epoll(4)
// are expected to be the same.
static_assert(EPOLLIN == POLLIN, "epoll uses same flag values as poll");
static_assert(EPOLLPRI == POLLPRI, "epoll uses same flag values as poll");
static_assert(EPOLLOUT == POLLOUT, "epoll uses same flag values as poll");
static_assert(EPOLLRDHUP == POLLRDHUP, "epoll uses same flag values as poll");
static_assert(EPOLLERR == POLLERR, "epoll uses same flag values as poll");
static_assert(EPOLLHUP == POLLHUP, "epoll uses same flag values as poll");
Poller
Poller 是 I/O Multiplexing 的抽象基类,Muduo 支持 poll(PollPoller) 和 epoll(EPollPoller),使用 level-trigger,在功能上只封装了底层的系统调用,不做多余的工作:
class Poller : noncopyable
{
public:
typedef std::vector<Channel*> ChannelList;
Poller(EventLoop* loop);
virtual ~Poller();
virtual Timestamp poll(int timeoutMs, ChannelList* activeChannels) = 0;
virtual void updateChannel(Channel* channel) = 0;
virtual void removeChannel(Channel* channel) = 0;
virtual bool hasChannel(Channel* channel) const;
protected:
typedef std::map<int, Channel*> ChannelMap;
ChannelMap channels_;
private:
EventLoop* ownerLoop_;
};
我只关注 epoll(2) 的,有 2 点值得一提:
- 因为
epoll(2)是记录关注的fd和事件的,所以不需要重复传递兴趣列表,但是区分了EPOLL_CTL_ADD和EPOLL_CTL_MOD,所以Channel用index_记录了状态是要add还是mod。 而Redis是用数组记录了所有注册的aeFileEvent,查找数组即可知道该add还是mod。 epoll_wait()要传递数组参数用于返回就绪的事件。Muduo采用的是动态增长的数组,默认大小是16,若这次用完了,就会增大1倍。而Redis是固定大小的,因为有maxclients可以知道最多有多少事件。
EventLoop
EventLoop 整合了 Channel 和 Poller 提供更方便的接口来使用,主要功能为:处理 Channel、处理定时事件、处理任务事件。主要接口如下:
class EventLoop : noncopyable
{
public:
// EventLoop
void loop();
void quit();
// 任务事件
void wakeup();
void runInLoop(Functor cb);
void queueInLoop(Functor cb);
// 定时器
TimerId runAt(Timestamp time, TimerCallback cb);
TimerId runAfter(double delay, TimerCallback cb);
TimerId runEvery(double interval, TimerCallback cb);
void cancel(TimerId timerId);
// 处理 Channel
void updateChannel(Channel* channel);
void removeChannel(Channel* channel);
bool hasChannel(Channel* channel);
};
EventLoop 的用法是先注册 Channel,如 listening fd,然后调用 loop() 一直循环处理各种任务,在 loop 中可以注册新的 Channel、定时事件和任务事件:
void EventLoop::loop()
{
while (!quit_)
{
activeChannels_.clear();
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
for (Channel* channel : activeChannels_)
{
currentActiveChannel_ = channel;
currentActiveChannel_->handleEvent(pollReturnTime_);
}
doPendingFunctors();
}
}
先来说任务事件,使用场景主要是其他线程想要在 EventLoop 线程做些工作,比如 EventLoop 线程把耗时的任务放到 threadpool 中执行,完成后通知 EventLoop 线程获取结果。
最常见的实现方式是用 pipe(2):EventLoop 注册读事件,其他线程把结果保存在队列中,然后写 pipe,EventLoop 线程就会唤醒处理。Muduo 也是类似实现,只是用 eventfd(2) 来唤醒,
具体用法就看 man page 吧:
Applications can use an eventfd file descriptor instead of a pipe (see pipe(2)) in all cases where a pipe is used simply to signal events. The kernel overhead of an eventfd file descriptor is much lower than that of a pipe, and only one file descriptor is required (versus the two required for a pipe).
再来说定时器。传统的实现为设置 Poller 的超时时间为最近的定时任务,然后每次 Poller 返回时检查一下定时器,定时器会按超时时间排序,如用最小堆等。Muduo 的 TimerQueue 用于管理定时器,用 std::set 保存
定时器,因为 Timestamp 可能相同,所以保存的是 std::pair<Timestamp, Timer*>,用 timerfd 来触发定时器任务的执行,使得实现更加一致。
One Loop Per Thread
我觉得 Muduo 实现上最大的优点是将 EventLoop 和创建线程绑定在一起,所有类都和所属的 EventLoop 绑定,通过 EventLoop 提供的接口实现线程间交互,
使得从 one loop one thread 到 one loop per thread 的实现非常方便。
每个线程都会用 __thread 保存自己的 tid,是用 syscall(SYS_gettid) 获取的,好处为(书上列的,记录一下):
- 类型是
pid_t,通常是小整数。 - 在
Linux中,直接表示内核的任务调度id,可以在/proc/tid或/proc/pid/task/tid找到。top(1)之类的工具也是显示tid。 - 任何时刻都是全局唯一的。
0是非法值。
不用 pthread_self() 的原因是:
pthread_t的类型未知,也就无法比较大小,或作为管理容器的Key。pthread_t只在进程内有意义,只能保证同一进程内,同一时刻的各个线程的id不同,不能保证同一进程先后多个线程具有不同的id。
EventLoop 在构造时会记录创建线程的 tid,所有可能跨线程的操作都会用任务事件 runInLoop 传递,非常方便:
bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }
void EventLoop::runInLoop(Functor cb)
{
if (isInLoopThread())
{
cb();
}
else
{
queueInLoop(std::move(cb));
}
}
EventLoopThread 用于创建 EventLoop 线程,要注意的是避免在构造函数中创建线程并暴露 this 指针,因为可能新线程使用时还未构造完成,Muduo 是主动调用 start 才会创建线程。因为 EventLoop 是线程间
交互的媒介,所以线程启动后会返回对应的指针。
EventLoopThreadPool 会创建固定个数的 EventLoopThread,并支持 round-robin 和 hash-index 的返回线程对应的 EventLoop,配合 runInLoop 即可实现任务的分发,如绑定 TcpConnection。
TcpServer
配合上面的基础设施再实现 TCP 相关的逻辑就可以实现支持多种模型的 TCP 网络框架了,主要包括:
- 连接的管理,包括创建、关闭等。
- 数据的收发。
Acceptor
Acceptor 用于接受连接并调用传入的 callback 创建连接:
typedef std::function<void (int sockfd, const InetAddress&)> NewConnectionCallback;
Acceptor 在构造时预留了一个 idleFd_(::open("/dev/null", O_RDONLY | O_CLOEXEC)),当 fd 耗尽时,关闭 idleFd_ 再接收客户端连接立刻关闭,对端就可以立刻知道连接被关闭,关闭前也可以发送错误信息给对端,
之后重新打开 idleFd_。
TcpConnection
Muduo 把每个 Tcp 连接都封装为 TcpConnection,通过 shared_ptr 暴露给用户使用,主要接口如下:
class TcpConnection : noncopyable,
public std::enable_shared_from_this<TcpConnection>
{
public:
// 发送数据
void send(const void* message, int len);
void send(const StringPiece& message);
void send(Buffer* message); // this one will swap data
// 设置 context
void setContext(const boost::any& context)
{ context_ = context; }
const boost::any& getContext() const
{ return context_; }
boost::any* getMutableContext()
{ return &context_; }
// 设置 callback
// 连接建立后调用
typedef std::function<void (const TcpConnectionPtr&)> ConnectionCallback;
void setConnectionCallback(const ConnectionCallback& cb)
{ connectionCallback_ = cb; }
// 收到消息后调用
typedef std::function<void (const TcpConnectionPtr&,
Buffer*,
Timestamp)> MessageCallback;
void setMessageCallback(const MessageCallback& cb)
{ messageCallback_ = cb; }
// 消息全部发完后调用
typedef std::function<void (const TcpConnectionPtr&)> WriteCompleteCallback;
void setWriteCompleteCallback(const WriteCompleteCallback& cb)
{ writeCompleteCallback_ = cb; }
// 当发送缓冲区积压的数据超出 highWaterMark 时调用
typedef std::function<void (const TcpConnectionPtr&, size_t)> HighWaterMarkCallback;
void setHighWaterMarkCallback(const HighWaterMarkCallback& cb, size_t highWaterMark)
{ highWaterMarkCallback_ = cb; highWaterMark_ = highWaterMark; }
};
Muduo 使用 Buffer 保存数据,结构如下图:

Buffer 可读可写,用 readIndex 和 writeIndex 区分范围,不是循环队列,当空间不够用时会增大 Buffer 调整 index。
在创建时预留了 8 字节的 prependable bytes,用于高效的实现 length + payload 格式。不过在当前的实现中 Buffer 不会自动缩小,要主动调 shrink,因为经常增大或缩小,数据拷贝会很频繁,许多实现
会当 Buffer 增大到某个阈值时,之后不再扩大,而是用 list 追加新的 Buffer,Redis 就是类似实现。
TcpConnection 读响应时比较特别,使用 readv(2) 读取数据:一个是 TcpConnection 的 Buffer;一个是栈上的 char extrabuf[65536]。猜测因为是 level-trigger 所以不需要一次性读完所有数据(避免阻塞其他连接的请求),
但如果一次读的太少就要经过多次 Poller 返回,而又想避免数据拷贝,比如先读到栈上的数组,然后拷贝到 Buffer,但是如果每个 TcpConnection 的 Buffer 都默认很大,又太浪费内存,所以才这么实现的。
要发送数据时调用 send,把数据追加到 Buffer 中,这个操作是线程安全的,通过 runInLoop 传递给 EventLoop 线程,很方便。Muduo 也是在第一次发送时直接 write,还有未写完的才会注册写事件,减少 Poller 系统调用。
还有些其他功能,如:
- 有多个
callback在对应时机调用。 - 用
any保存任意类型的context方便使用。
ThreadPool
这里的 ThreadPool 是用于执行任务,至于 1 个线程接收连接,分发给多个 i/o 线程是通过 EventLoopThreadPool 和 Eventloop.runInLoop 实现的。ThreadPool 的实现也很简单,从成员变量就可以看出来实现:
class ThreadPool : noncopyable
{
public:
typedef std::function<void ()> Task;
private:
mutable MutexLock mutex_;
Condition notEmpty_ GUARDED_BY(mutex_);
Condition notFull_ GUARDED_BY(mutex_);
std::vector<std::unique_ptr<muduo::Thread>> threads_;
std::deque<Task> queue_ GUARDED_BY(mutex_);
size_t maxQueueSize_;
};
TcpClient
TcpClient 负责建立连接和处理 I/O,它不拥有 EventLoop,每个 TcpClient 只负责一个 TcpConnection。其实 client 和 server 很相似,区别在于一个是被动的 listen,一个是主动的 connect。
TcpClient 和 TcpServer 都是由外部传入的 EventLoop 构造,可以使多个 TcpClient 共用一个 EventLoop,再搭配上 EventLoopThreadPool + round-robin 就可以实现类似 TcpServer 的多线程 client。
Connector
Connector 和 Acceptor 类似,只负责建立连接。使用 nonblocking connect,当返回可写事件时要检查 socket error,并支持重试,重试会有动态增加的延迟时间。因为如果服务端突然挂掉,大量连接同时重试
会影响服务质量并造成拥塞。
Summary
Muduo 的代码写的很清晰易懂,代码量也不多,如果 2 年前的我看的话应该会学到很多(其实当时就听说过了,但因为不会 C++ 就没看)。
留下评论