WebServer(2) 重整旗鼓 Channel类

本文最后更新于:8 个月前

WebServer(2) 重整旗鼓 Channel类

写在前面

距离上次更新的本专栏的第一章已经过去好久好久了,虽然想速成WebServer的flag没能达成,但是期间也从未歇息,阅读了比较经典的《TCP/IP网络编程》《Linux高性能服务器编程》和《Linux多线程服务端编程》,同时也阅读了一些优秀源码(Muduo、Libevent)。自己的网络编程和Cpp基础也在这一段时间迎来了大幅度提升,现在重启本专栏,希望能通过自己完整实现一个高性能高并发的WebServer。同时,在本项目构建过程中,我会尝试使用git做版本管理,应用Google-Style代码规范格式,用cmake编译,并全局采用cpp11标准。

源码链接:https://github.com/yxiao-yang/Tiny-Webserver.git

正文

1 什么是Channel

Channel 对文件描述符和事件进行了一层封装。平常我们写网络编程相关函数,基本就是创建套接字,绑定地址,转变为可监听状态(这部分我们在 Socket 类中实现过了,交给 Acceptor 调用即可),然后接受连接。

但是得到了一个初始化好的 socket 还不够,我们还需要监听这个 socket 上的事件并且处理事件。比如我们在 Reactor 模型中使用了 epoll 监听该 socket 上的事件,我们还需将需要被监视的套接字和监视的事件注册到 epoll 对象中。

可以想到文件描述符和事件和 IO 函数全都混在在了一起,极其不好维护。而 muduo 中的 Channel 类将文件描述符和其感兴趣的事件(需要监听的事件)封装到了一起。而事件监听相关的代码放到了 Poller/EPollPoller 类中。

2 成员变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* const int Channel::kNoneEvent = 0;
* const int Channel::kReadEvent = EPOLLIN | EPOLLPRI;
* const int Channel::kWriteEvent = EPOLLOUT;
*/
static const int kNoneEvent;
static const int kReadEvent;
static const int kWriteEvent;

EventLoop *loop_; // 当前Channel属于的EventLoop
const int fd_; // fd, Poller监听对象
int events_; // 注册fd感兴趣的事件
int revents_; // poller返回的具体发生的事件
int index_; // 在Poller上注册的情况

std::weak_ptr<void> tie_; // 弱指针指向TcpConnection(必要时升级为shared_ptr多一份引用计数,避免用户误删)
bool tied_; // 标志此 Channel 是否被调用过 Channel::tie 方法

// 保存着事件到来时的回调函数
ReadEventCallback readCallback_; // 读事件回调函数
EventCallback writeCallback_; // 写事件回调函数
EventCallback closeCallback_; // 连接关闭回调函数
EventCallback errorCallback_; // 错误发生回调函数
  • int fd_:这个Channel对象照看的文件描述符
  • int events_:代表fd感兴趣的事件类型集合
  • int revents_:代表事件监听器实际监听到该fd实际发生的事件类型集合,当事件监听器监听到一个fd发生了什么事件,通过Channel::Set_revents()函数来设置revents值。
  • EventLoop* loop_:这个 Channel 属于哪个EventLoop对象,因为 muduo 采用的是 one loop per thread 模型,所以我们有不止一个 EventLoop。我们的 mainLoop 接收新连接,将新连接相关事件注册到线程池中的某一线程的 subLoop 上(轮询)。我们不希望跨线程的处理函数,所以每个 Channel 都需要记录是哪个 EventLoop 在处理自己的事情,这其中还涉及到了线程判断的问题。
  • read_callback_write_callback_close_callback_error_callback_:这些是 std::function 类型,代表着这个Channel为这个文件描述符保存的各事件类型发生时的处理函数。比如这个fd发生了可读事件,需要执行可读事件处理函数,这时候Channel类都替你保管好了这些可调用函数。到时候交给 EventLoop 执行即可。
  • index:我们使用 index 来记录 Channel 与 Poller 相关的几种状态,Poller 类会判断当前 channel 的状态然后处理不同的事情。
    • kNew:是否还未被poll监视
    • kAdded:是否已在被监视中
    • kDeleted:是否已被移除
  • kNoneEventkReadEventkWriteEvent:事件状态设置会使用的变量

3 成员函数

3.1 设置此 Channel 对于事件的回调函数

1
2
3
4
5
6
// 设置回调函数对象
// 使用右值引用,延长了临时cb对象的生命周期,避免了拷贝操作
void SetReadCallback(ReadEventCallback cb) { readCallback_ = std::move(cb); }
void SetWriteCallback(EventCallback cb) { writeCallback_ = std::move(cb); }
void SetCloseCallback(EventCallback cb) { closeCallback_ = std::move(cb); }
void SetErrorCallback(EventCallback cb) { errorCallback_ = std::move(cb); }

3.2 设置 Channel 感兴趣的事件到 Poller

1
2
3
4
5
6
7
8
// 设置fd相应的事件状态,update()其本质调用epoll_ctl
void EnableReading() { events_ |= kReadEvent; update(); } // 设置读事件到poll对象中
void EnableWriting() { events_ |= kWriteEvent; update(); } // 设置写事件到poll对象中
void DisableReading() { events_ &= ~kReadEvent; update(); } // 从poll对象中移除读事件
void DisableWriting() { events_ &= ~kWriteEvent; update(); } // 从poll对象中移除写事件
void DisableAll() { events_ = kNoneEvent; update(); } // 关闭所有事件
bool IsWriting() const { return events_ & kWriteEvent; } // 是否关注写事件
bool IsReading() const { return events_ & kReadEvent; } // 是否关注读事件

3.3 更新Channel关注的事件

设置好该 channel 的监视事件的类型,调用私有函数 Update 向 Poller 注册。

1
2
3
4
5
6
7
8
9
/**
* 当改变channel所表示fd的events事件后
* Update负责在poller里面更改fd相应的事件epoll_ctl
*/
void Channel::Update() {
// 通过该channel所属的EventLoop,调用poller对应的方法,注册fd的events事件
// 调用EventLoop::UpdateChannel,传入channel指针
loop_->UpdateChannel(this);
}

3.4 移除操作

1
2
3
4
5
6
// 从epoll对象中移除该fd
void Channel::Remove() {
// 调用EventLoop::RemoveChannel,传入channel指针
// EventLoop::RemoveChannel => Poller::RemoveChannel
loop_->RemoveChannel(this);
}

3.5 用于增加TcpConnection生命周期的tie方法(防止用户误删操作)

1
2
3
4
5
6
7
// 在TcpConnection建立的时候会调用
void Channel::Tie(const std::shared_ptr<void> &obj) {
// weak_ptr 指向 obj
tie_ = obj;
// 设置tied_标志
tied_ = true;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// fd得到poller通知以后,去处理事件
void Channel::HandleEvent(Timestamp receiveTime) {
/**
* 调用Channel::Tie后会设置tid_=true
* 而TcpConnection::ConnectEstablished()会调用channel_->Tie(shared_from_this());
* 所以对于TcpConnection::channel_ 需要多一份强引用的保证以免用户误删TcpConnection对象
*/
if (tied_) {
std::shared_ptr<void> guard = tie_.lock();
if (guard) {
HandleEventWithGuard(receiveTime);
}
} else {
HandleEventWithGuard(receiveTime);
}
}

用户使用 muduo 库的时候,会利用到 TcpConnection 。用户可以看见 TcpConnection ,如果用户注册了要监视的事件和处理的回调函数,并在处理 subLoop 处理过程中「误删」了 TcpConnection 的话会发生什么呢?

总之,EventLoop 肯定不能很顺畅的运行下去。毕竟它的生命周期小于 TcpConnection。为了防止用户误删的情况,TcpConnection 在创建之初 TcpConnection::ConnectEstablished() 会调用此函数来提升对象生命周期。

实现方案是在处理事件时,如果对被调用了Tie()方法的Channel对象,我们让一个共享型智能指针指向它,在处理事件期间延长它的生命周期。哪怕外面「误删」了此对象,也会因为多出来的引用计数而避免销毁操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 连接建立
void TcpConnection::ConnectEstablished() {
SetState(kConnected); // 建立连接,设置一开始状态为连接态
/**
* channel_->Tie(shared_from_this());
* Tie相当于在底层有一个强引用指针记录着,防止析构
* 为了防止TcpConnection这个资源被误删掉,而这个时候还有许多事件要处理
* channel->Tie 会进行一次判断,是否将弱引用指针变成强引用,变成的话就防止了计数为0而被析构的可能
*/
channel_->Tie(shared_from_this());
channel_->EnableReading(); // 向poller注册channel的EPOLLIN读事件

// 新连接建立 执行回调
connectionCallback_(shared_from_this());
}

注意,传递的是 this 指针,所以是在 Channel 的内部增加对 TcpConnection 对象的引用计数(而不是 Channel 对象)。这里体现了 shared_ptr 的一处妙用,可以通过引用计数来控制变量的生命周期。巧妙地在内部增加一个引用计数,假设在外面误删,也不会因为引用计数为 0 而删除对象。

weak_ptr.lock() 会返回 shared_ptr(如果 weak_ptr 不为空)。

3.6 根据相应事件执行Channel保存的回调函数

我们的Channel里面保存了许多回调函数,这些都是在对应的事件下被调用的。用户提前设置写好此事件的回调函数,并绑定到Channel的成员里。等到事件发生时,Channel自然的调用事件处理方法。借由回调操作实现了异步的操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
void Channel::HandleEventWithGuard(Timestamp receiveTime) {    
// 对端关闭事件
// 当TcpConnection对应Channel,通过shutdown关闭写端,epoll触发EPOLLHUP
if ((revents_ & EPOLLHUP) && !(revents_ & EPOLLIN)) {
// 确认是否拥有回调函数
if (closeCallback_) {
closeCallback_();
}
}

// 错误事件
if (revents_ & (EPOLLERR)) {
LOG_ERROR << "the fd = " << this->fd();
if (errorCallback_) {
errorCallback_();
}
}

// 读事件
if (revents_ & (EPOLLIN | EPOLLPRI)) {
LOG_DEBUG << "channel have read events, the fd = " << this->fd();
if (readCallback_) {
LOG_DEBUG << "channel call the readCallback_(), the fd = " << this->fd();
readCallback_(receiveTime);
}
}

// 写事件
if (revents_ & EPOLLOUT) {
if (writeCallback_) {
writeCallback_();
}
}

写在后面

冲!