WebServer(5) Buffer类

本文最后更新于:8 个月前

WebServer(5) Buffer类

写在前面-为什么要有缓冲区的设计

TcpConnection 类负责处理一个新连接的事件,包括从客户端读取数据和向客户端写数据。但是在这之前,需要先设计好缓冲区。

  1. 非阻塞网络编程中应用层buffer是必须的:非阻塞IO的核心思想是避免阻塞在read()write()或其他I/O系统调用上,这样可以最大限度复用thread-of-control,让一个线程能服务于多个socket连接。I/O线程只能阻塞在IO-multiplexing函数上,如select()/poll()/epoll_wait()。这样一来,应用层的缓冲是必须的,每个TCP socket都要有inputBufferoutputBuffer
  2. TcpConnection必须有output buffer:使程序在write()操作上不会产生阻塞,当write()操作后,操作系统一次性没有接受完时,网络库把剩余数据则放入outputBuffer中,然后注册POLLOUT事件,一旦socket变得可写,则立刻调用write()进行写入数据。——应用层buffer到操作系统buffer
  3. TcpConnection必须有input buffer:当发送方send数据后,接收方收到数据不一定是整个的数据,网络库在处理socket可读事件的时候,必须一次性把socket里的数据读完,否则会反复触发POLLIN事件,造成busy-loop。所以网路库为了应对数据不完整的情况,收到的数据先放到inputBuffer里。——操作系统buffer到应用层buffer

正文

1 Buffer缓冲区设计

muduo 的 Buffer 类作为网络通信的缓冲区,像是 TcpConnection 就拥有 inputBuffer 和 outputBuffer 两个缓冲区成员。而缓冲区的设计特点:

  1. 其内部使用std::vector<char>保存数据,并提供许多访问方法。并且std::vector拥有扩容空间的操作,可以适应数据的不断添加。
  2. std::vector<char>内部分为三块,头部预留空间,可读空间,可写空间。内部使用索引标注每个空间的起始位置。每次往里面写入数据,就移动writeIndex;从里面读取数据,就移动readIndex

2 Buffer基本成员

图为Buffer内部逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class Buffer : public muduo::copyable {
public:
static const size_t kCheapPrepend = 8; // 头部预留8个字节
static const size_t kInitialSize = 1024; // 缓冲区初始化大小 1KB

explicit Buffer(size_t initialSize = kInitialSize)
: buffer_(kCheapPrepend + initialSize), // buffer分配大小 8 + 1KB
readerIndex_(kCheapPrepend), // 可读索引和可写索引最开始位置都在预留字节后
writerIndex_(kCheapPrepend) {
assert(readableBytes() == 0);
assert(writableBytes() == initialSize);
assert(prependableBytes() == kCheapPrepend);
}

/*......*/

// 可读空间大小
size_t readableBytes() const { return writerIndex_ - readerIndex_; }

// 可写空间大小
size_t writableBytes() const { return buffer_.size() - writerIndex_; }

// 预留空间大小
size_t prependableBytes() const { return readerIndex_; }

// 返回可读空间地址
const char* peek() const { return begin() + readerIndex_; }

/*......*/

private:
std::vector<char> buffer_; // 缓冲区其实就是vector<char>
size_t readerIndex_; // 可读区域开始索引
size_t writerIndex_; // 可写区域开始索引
};

3 读写数据时对Buffer的操作

3.1 向Buffer写入数据:ReadFd()

ssize_t Buffer::ReadFd(int fd, int* savedErrno):表示从 fd 中读取数据到 buffer_ 中。对于 buffer 来说这是写入数据的操作,会改变writeIndex

  1. 考虑到 buffer_ 的 writableBytes 空间大小,不能够一次性读完数据,于是内部还在栈上创建了一个临时缓冲区 char extrabuf[65536];。如果有多余的数据,就将其读入到临时缓冲区中。
  2. 因为可能要写入两个缓冲区,所以使用了更加高效readv函数,可以向多个地址写入数据。刚开始会判断需要写入的大小。
    1. 如果一个缓冲区足够,就不必再往临时缓冲区extrabuf写入数据了。写入后需要更新writeIndex位置,writerIndex_ += n;
    2. 如果一个缓冲区不够,则还需往临时缓冲区extrabuf写入数据。原缓冲区直接写满,writeIndex_ = buffer_.size()。然后往临时缓冲区写入数据,Append(extrabuf, n - writable);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
ssize_t Buffer::ReadFd(int fd, int *saveErrno) {
// 栈额外空间,用于从套接字往buffer_读时,当buffer_暂时不够用时暂存数据,待buffer_重新分配足够空间后,在把数据交换给buffer_。
char extrabuf[65536] = {0}; // 栈上内存空间 65536/1024 = 64KB

/*
struct iovec {
ptr_t iov_base; // iov_base指向的缓冲区存放的是readv所接收的数据或是writev将要发送的数据
size_t iov_len; // iov_len在各种情况下分别确定了接收的最大长度以及实际写入的长度
};
*/

// 使用iovec分配两个连续的缓冲区
struct iovec vec[2];
const size_t writable = WritableBytes(); // 这是Buffer底层缓冲区剩余的可写空间大小 不一定能完全存储从fd读出的数据

// 第一块缓冲区,指向可写空间
vec[0].iov_base = Begin() + writerIndex_;
vec[0].iov_len = writable;

// 第二块缓冲区,指向栈空间
vec[1].iov_base = extrabuf;
vec[1].iov_len = sizeof(extrabuf);

// 这里之所以说最多128k-1字节,是因为若writable为64k-1,那么需要两个缓冲区 第一个64k-1 第二个64k 所以最多128k-1
// 如果第一个缓冲区>=64k 那就只采用一个缓冲区 而不使用栈空间extrabuf[65536]的内容
const int iovcnt = (writable < sizeof(extrabuf)) ? 2 : 1;
const ssize_t n = ::readv(fd, vec, iovcnt);

if (n < 0) {
*saveErrno = errno;
} else if (n <= writable) {
// Buffer的可写缓冲区已经够存储读出来的数据了
writerIndex_ += n;
} else {
// extrabuf里面也写入了n-writable长度的数据
writerIndex_ = buffer_.size();
Append(extrabuf, n - writable); // 对buffer_扩容 并将extrabuf存储的另一部分数据追加至buffer_
}
return n;
}

其中的 Append 函数真正向 buffer_ 内部添加数据。调用方将数据的首地址和长度给出,其内部将数据拷贝到指定位置。

1
2
3
4
5
6
7
8
// 把[data, data+len]内存上的数据添加到缓冲区中
void Append(const char *data, size_t len) {
// 确保可写空间足够
EnsureWritableBytes(len);
// 将这段数据拷贝到可写位置之后
std::copy(data, data+len, BeginWrite());
writerIndex_ += len;
}

3.2 空间不够怎么办?

如果写入空间不够,Buffer 内部会有两个方案来应付

  1. 将数据往前移动:因为每次读取数据,readIndex索引都会往后移动,从而导致前面预留的空间逐渐增大。我们需要将后面的元素重新移动到前面。
  2. 如果第一种方案的空间仍然不够,那么我们就直接对 buffer_ 进行扩容(buffer_.resize(len))操作。

如图所示:现在的写入空间不够,但是前面的预留空间加上现在的写空间是足够的。因此,我们需要将后面的数据拷贝到前面,腾出足够的写入空间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
void EnsureWritableBytes(size_t len) {
if (WritableBytes() < len) {
// 扩容函数
MakeSpace(len);
}
}

// 扩容空间
void MakeSpace(int len) {
/**
* kCheapPrepend | reader | writer |
* kCheapPrepend | len |
*/
if (WritableBytes() + PrependableBytes() < len + kCheapPrepend) {
// 整个buffer都不够用
buffer_.resize(writerIndex_ + len);
} else {
// 整个buffer够用,将后面移动到前面继续分配
size_t readable = ReadableBytes();
std::copy(Begin() + readerIndex_,
Begin() + writerIndex_,
Begin() + kCheapPrepend);
readerIndex_ = kCheapPrepend;
writerIndex_ = readerIndex_ + readable;
}
}

3.3 从Buffer中读取数据

就如回声服务器的例子一样:

1
2
3
4
5
6
7
8
// 可读写事件回调
void OnMessage(const TcpConnectionPtr &conn, Buffer *buf, Timestamp time) {
std::string msg = buf->RetrieveAllAsString();
LOG_INFO << conn->name() << " echo " << msg.size() << " bytes, "
<< "data received at " << time.toFormattedString();
conn->send(msg);
// conn->shutdown(); // 关闭写端 底层响应EPOLLHUP => 执行closeCallback_
}

读取数据会调用void Retrieve(size_t len)函数,在这之前会判断读取长度是否大于可读取空间

  1. 如果小于,则直接后移readIndex即可,readerIndex_ += len;
  2. 如果大于等于,说明全部数据都读取出来。此时会将buffer置为初始状态:
    1. readerIndex_ = kCheapPrepend;
    2. writerIndex_ = kCheapPrepend;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 将onMessage函数上报的Buffer数据,转成string类型的数据返回
std::string RetrieveAllAsString() {
// 应用可读取数据的长度
return RetrieveAsString(ReadableBytes());
}

std::string RetrieveAsString(size_t len) {
// peek()可读数据的起始地址
std::string result(Peek(), len);
// 上面一句把缓冲区中可读取的数据读取出来,所以要将缓冲区复位
Retrieve(len);
return result;
}

// onMessage string <- Buffer
// 需要进行复位操作
void Retrieve(size_t len) {
// 应用只读取可读缓冲区数据的一部分(读取了len的长度)
if (len < ReadableBytes()) {
// 移动可读缓冲区指针
readerIndex_ += len;
} else {
// 全部读完 len == readableBytes()
RetrieveAll();
}
}

// 全部读完,则直接将可读缓冲区指针移动到写缓冲区指针那
void RetrieveAll() {
readerIndex_ = kCheapPrepend;
writerIndex_ = kCheapPrepend;
}

4 TcpConnection使用Buffer

TcpConnection 拥有 inputBuffer 和 outputBuffer 两个缓冲区成员。

  1. 当服务端接收客户端数据,EventLoop 返回活跃的 Channel,并调用对应的读事件处理函数,即 TcpConnection 调用 handleRead 方法从相应的 fd 中读取数据到 inputBuffer 中。在 Buffer 内部 inputBuffer 中的 writeIndex 向后移动。
  2. 当服务端向客户端发送数据,TcpConnection 调用 handleWrite 方法将 outputBuffer 的数据写入到 TCP 发送缓冲区。outputBuffer 内部调用 Retrieve 方法移动 readIndex 索引。

4.1 TcpConnection接收客户端数据(从客户端sock读取数据到inputBuffer)

调用inputBuffer_.ReadFd(channel_->fd(), &savedErrno);将对端fd数据读取到inputBuffer中。

  1. 如果读取成功,调用「可读事件发生回调函数」
  2. 如果读取数据长度为0,说明对端关闭连接。调用HandleCose()
  3. 出错,则保存errno,调用HandleError()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void TcpConnection::HandleRead(Timestamp receiveTime) {
int savedErrno = 0;
// TcpConnection会从socket读取数据,然后写入inpuBuffer
ssize_t n = inputBuffer_.ReadFd(channel_->fd(), &savedErrno);
if (n > 0) {
// 已建立连接的用户,有可读事件发生,调用用户传入的回调操作
// TODO:shared_from_this
messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
} else if (n == 0) {
// 没有数据,说明客户端关闭连接
HandleClose();
} else {
// 出错情况
errno = savedErrno;
LOG_ERROR << "TcpConnection::handleRead() failed";
HandleError();
}
}

4.2 TcpConnection向客户端发送数据(将ouputBuffer数据输出到socket中)

1
2
// 此行代码的用意何在
if (channel_->IsWriting())
  1. 要在channel_确实关注写事件的前提下正常发送数据:因为一般有一个send函数发送数据,如果TCP接收缓冲区不够接收ouputBuffer的数据,就需要多次写入。需要重新注册写事件,因此是在注册了写事件的情况下调用的HandleWrite
  2. channel->fd()发送outputBuffer中的可读取数据。成功发送数据则移动readIndex,并且如果一次性成功写完数据,就不再让此channel关注写事件了,并调用写事件完成回调函数没写完则继续关注!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
void TcpConnection::HandleWrite() {
if (channel_->IsWriting()) {
int saveErrno = 0;
ssize_t n = outputBuffer_.WriteFd(channel_->fd(), &saveErrno);
// 正确读取数据
if (n > 0) {
outputBuffer_.Retrieve(n);
// 说明buffer可读数据都被TcpConnection读取完毕并写入给了客户端
// 此时就可以关闭连接,否则还需继续提醒写事件
if (outputBuffer_.ReadableBytes() == 0) {
channel_->DisableWriting();
// 调用用户自定义的写完数据处理函数
if (writeCompleteCallback_) {
// 唤醒loop_对应的thread线程,执行写完成事件回调
loop_->QueueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));
}
if (state_ == kDisconnecting) {
ShutdownInLoop();
}
}
} else {
LOG_ERROR << "TcpConnection::handleWrite() failed";
}
} else {
// state_不为写状态
LOG_ERROR << "TcpConnection fd=" << channel_->fd() << " is down, no more writing";
}
}