Muduo库源码剖析(1) Reactor架构三大核心模块

本文最后更新于:9 个月前

Muduo库源码剖析(1) Reactor架构三大核心模块

写在前面

好久不见,距离上次更新过去两个月了,一直在忙各种各样的事,搁置了很长时间。服务器的实现先暂且告一段落,因为我想一味地去抄袭别人的代码起到的作用可能会很有限,所以还是想自己去实现一下。正好这段时间也读了APUEUNP等等的材料,所以觉得准备的也差不多了。先从陈硕老师赫赫有名的Muduo网络库开始学习吧。

本文介绍一下Muduo库所采用的Reactor架构的三大核心模块。

正文

1 Reactor模式简介

Reactor模式也叫反应器模式,大多数IO相关组件如Netty、Redis在使用的IO模式,用于需要并发处理多个客户端的服务器。

Reactor模式就是基于建立连接与具体服务之间线程分离的模式。在Reactor模式中,会有一个线程负责与所有客户端建立连接,这个线程通常称之为 Reactor。然后在建立连接之后,Reactor 线程 会使用其它线程(可以有多个)来处理与每一个客户端之间的数据传输,这个(些)线程通常称之为 Handler

由于服务端需要与多个客户端通信,它的通信是一对多的关系,所以它需要使用 Reactor 模式。对客户端,它只需要与服务端通信,它的通信是一对一的关系,所以它不需要使用 Reactor 模式。也就是说,对客户端来讲,它不需要进行建立连接与传输数据之间的线程分离。

Muduo库有三个核心组件支撑一个Reactor实现持续的监听一组fd,并根据每个fd上发生的事件调用相应的处理函数。这三个组件分别是Channel类、Poller/EpollPoller类以及EventLoop类。

2 Channel类

2.1 Channel类简介

在TCP网络编程中,想要IO多路复用监听某个fd,就要把这个fd和该fd感兴趣的事件通过epoll_ctl注册到IO多路复用模块(也叫事件监听器)上。当事件监听器监听到该fd发生了某个事件。事件监听器返回发生事件的fd集合以及每个fd都发生了什么事件。

Channel类则封装了一个fd和该fd感兴趣的事件以及事件监听器监听到的该fd实际发生的事件。同时Channel类还提供了设置该fd的感兴趣事件,以及将该fd及其感兴趣事件注册到事件监听器或从事件监听器上移除,以及保存了该fd的每种事件对应的处理函数。

每个Channel对象自始至终只属于一个EventLoop,因此每个Channel对象都只属于一个IO线程,自始至终只负责一个文件描述符(fd)的IO事件分发,但它不拥有这个fd,也不会在析构时关闭这个fd。

用户不直接使用Channel,即不继承Channel,而会用到更上层的封装,如TcpConnection。

Channel的生命期由其owner class负责管理。

2.2 Channel类一些重要的成员变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private:
static string eventsToString(int fd, int ev);

void update(); // 本质上就是调用epoll_ctl()
void handleEventWithGuard(Timestamp receiveTime);

static const int kNoneEvent;
static const int kReadEvent;
static const int kWriteEvent;

EventLoop* loop_; // 该fd属于哪个EventLoop对象
const int fd_; // 照看的文件描述符(fd)
int events_; // fd感兴趣的事件类型集合
int revents_; // 事件监听器实际监听到该fd发生的事件类型集合
int index_; // used by Poller.
bool logHup_;

std::weak_ptr<void> tie_;
bool tied_;
bool eventHandling_;
bool addedToLoop_;

// 这些是std::function类型,代表着这个Channel为这个文件描述符保存的各事件类型发生时的处理函数。
// 比如这个fd发生了可读事件,需要执行可读事件处理函数,这时候Channel类都替你保管好了这些可调用函数。
ReadEventCallback readCallback_;
EventCallback writeCallback_;
EventCallback closeCallback_;
EventCallback errorCallback_;

2.3 Channel类一些重要的成员方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public:
typedef std::function<void()> EventCallback;
typedef std::function<void(Timestamp)> ReadEventCallback;

Channel(EventLoop* loop, int fd);
~Channel();

// handleEvent让每个发生了事件的Channel调用自己保管的事件处理函数。
// 每个Channel会根据自己文件描述符实际发生的事件(revents_变量)和感兴趣的事件(events_变量)
// 选择调用read_callback_和/或write_callback_和/或close_callback_和/或error_callback_。
void handleEvent(Timestamp receiveTime);

// 向Channel对象注册各类事件的处理函数
// 一个文件描述符会发生可读、可写、关闭、错误事件。
// 当发生这些事件后,就需要调用相应的处理函数来处理。
// 外部通过调用下面这四个函数可以将事件处理函数放进Channel类中,当需要调用的时候就可以直接拿出来调用了。
void setReadCallback(ReadEventCallback cb)
{ readCallback_ = std::move(cb); }
void setWriteCallback(EventCallback cb)
{ writeCallback_ = std::move(cb); }
void setCloseCallback(EventCallback cb)
{ closeCallback_ = std::move(cb); }
void setErrorCallback(EventCallback cb)
{ errorCallback_ = std::move(cb); }

/// Tie this channel to the owner object managed by shared_ptr,
/// prevent the owner object being destroyed in handleEvent.
void tie(const std::shared_ptr<void>&);

int fd() const { return fd_; }
int events() const { return events_; }

// 当事件监听器监听到某个文件描述符发生了什么事件,通过这个函数可以将这个文件描述符实际发生的事件封装进这个Channel中。
void set_revents(int revt) { revents_ = revt; }

bool isNoneEvent() const { return events_ == kNoneEvent; }

// 将Channel中的文件描述符及其感兴趣事件注册事件监听器上或从事件监听器上移除
// 外部通过下面几个函数来告知Channel你所监管的文件描述符都对哪些事件类型感兴趣,
// 并把这个文件描述符及其感兴趣事件注册到事件监听器(IO多路复用模块)上。
void enableReading() { events_ |= kReadEvent; update(); }
void disableReading() { events_ &= ~kReadEvent; update(); }
void enableWriting() { events_ |= kWriteEvent; update(); }
void disableWriting() { events_ &= ~kWriteEvent; update(); }
void disableAll() { events_ = kNoneEvent; update(); }

bool isWriting() const { return events_ & kWriteEvent; }
bool isReading() const { return events_ & kReadEvent; }

// for Poller
int index() { return index_; }
void set_index(int idx) { index_ = idx; }

// for debug
string reventsToString() const;
string eventsToString() const;

void doNotLogHup() { logHup_ = false; }

EventLoop* ownerLoop() { return loop_; }
void remove();

3 Poller/EpollPoller类

3.1 Poller/EpollPoller简介

负责监听文件描述符事件是否触发以及返回发生事件的文件描述符以及具体事件的模块就是Poller。所以一个Poller对象对应一个事件监听器。在multi-reactor模型(多线程)中,有多少reactor就有多少Poller。

Muduo提供了epollpoll两种IO多路复用方法来实现事件监听。不过默认是使用epoll来实现,也可以通过选项选择poll

这个Poller是个抽象虚类,由EpollPollerPollPoller继承实现,与监听文件描述符和返回监听结果的具体方法也基本上是在这两个派生类中实现。EpollPoller就是封装了用epoll方法实现的与事件监听有关的各种方法,PollPoller就是封装了poll方法实现的与事件监听有关的各种方法。

3.2 Poller/EpollPoller的一些成员变量

Poller类:

1
2
3
4
5
6
7
8
9
10
protected:
typedef std::map<int, Channel*> ChannelMap;

// channels是std::unordered_map<int, Channel*>类型,
// 负责记录文件描述符-->Channel的映射,也帮忙保管所有注册在你这个Poller上的Channel。
ChannelMap channels_;

private:
// 所属的EventLoop对象
EventLoop* ownerLoop_;

EpollPoller类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private:
static const int kInitEventListSize = 16;

static const char* operationToString(int op);

void fillActiveChannels(int numEvents,
ChannelList* activeChannels) const;
void update(int operation, Channel* channel);

typedef std::vector<struct epoll_event> EventList;

// 用epoll_create方法返回的epoll句柄,这个是常识。
int epollfd_;

EventList events_;

3.3 EpollPoller类一些重要的成员方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public:
EPollPoller(EventLoop* loop);
~EPollPoller() override;

// 这个函数是Poller的核心。
// 该方法通过epoll_wait获取这个事件监听器上发生事件的fd及其对应发生的事件。
// 由于每个fd都是由一个Channel封装的,通过哈希表channels_可以根据fd找到封装这个fd的Channel。
// 将事件监听器监听到该fd发生的事件写进这个Channel中的revents成员变量中。
// 然后把这个Channel装进activeChannels中(它是一个vector<Channel*>)。
// 当外界调用完poll之后就能拿到事件监听器的监听结果(activeChannels_)。
// activeChannels就是事件监听器监听到的发生事件的fd,以及每个fd都发生了什么事件。
Timestamp poll(int timeoutMs, ChannelList* activeChannels) override;

void updateChannel(Channel* channel) override;
void removeChannel(Channel* channel) override;

4 EventLoop类

4.1 EventLoop类简介

刚才的Poller是封装了和事件监听有关的方法和成员,调用一次Poller::poll方法它就能给你返回事件监听器的监听结果(发生事件的fd及其发生的事件)。作为一个网络服务器,需要有持续监听、持续获取监听结果、持续处理监听结果对应的事件的能力,也就是我们需要循环的去调用Poller:poll方法获取实际发生事件的Channel集合,然后调用这些Channel里面保管的不同类型事件的处理函数(调用Channel::HandleEvent方法)。

EventLoop就是负责实现“循环”,负责驱动“循环”的重要模块!!Channel和Poller其实相当于EventLoop的手下,EventLoop整合封装了二者并向上提供了更方便的接口来使用。

4.2 EventLoop重要方法loop()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
void EventLoop::loop()
{
assert(!looping_);
assertInLoopThread();
looping_ = true;
quit_ = false; // FIXME: what if someone calls quit() before loop() ?
LOG_TRACE << "EventLoop " << this << " start looping";

while (!quit_)
{
activeChannels_.clear();
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
++iteration_;
if (Logger::logLevel() <= Logger::TRACE)
{
printActiveChannels();
}
// TODO sort channel by priority
eventHandling_ = true;
for (Channel* channel : activeChannels_)
{
currentActiveChannel_ = channel;
currentActiveChannel_->handleEvent(pollReturnTime_);
}
currentActiveChannel_ = NULL;
eventHandling_ = false;
doPendingFunctors();
}

LOG_TRACE << "EventLoop " << this << " stop looping";
looping_ = false;
}

每个EventLoop对象都唯一绑定了一个线程,这个线程其实就在一直执行这个函数里面的while循环,这个while循环的大致逻辑比较简单。就是调用Poller::poll方法获取事件监听器上的监听结果。接下来在loop里面就会调用监听结果中每一个Channel的处理函数HandlerEvent( )。每一个Channel的处理函数会根据Channel类中封装的实际发生的事件,执行Channel类中封装的各事件处理函数。

5 全局概览Poller、Channel和EventLoop在整个Reactor通信架构中的角色

EventLoop起到一个驱动循环的功能,Poller负责从事件监听器上获取监听结果。而Channel类则在其中起到了将fd及其相关属性封装的作用,将fd及其感兴趣事件和发生的事件以及不同事件对应的回调函数封装在一起,这样在各个模块中传递更加方便。

图为Reactor架构图(图源知乎)

上面图中,每一个EventLoop都绑定了一个线程(一对一绑定),这种运行模式是Muduo库的特色!!充份利用了多核cpu的能力,每一个核的线程负责循环监听一组文件描述符的集合。这就是书中一直在强调的One Loop Per Thread

写在后面

本篇解读了Reactor模式的三大核心模块,也大致缕清了Muduo库的大体脉络,后面要在去深挖一些其他的主要类。革命尚未成功,同志仍需努力!