本文最后更新于:8 个月前
WebServer(5) Buffer类
写在前面-为什么要有缓冲区的设计
TcpConnection 类负责处理一个新连接的事件,包括从客户端读取数据和向客户端写数据。但是在这之前,需要先设计好缓冲区。
- 非阻塞网络编程中应用层buffer是必须的:非阻塞IO的核心思想是避免阻塞在
read()
或write()
或其他I/O
系统调用上,这样可以最大限度复用thread-of-control
,让一个线程能服务于多个socket
连接。I/O
线程只能阻塞在IO-multiplexing
函数上,如select()/poll()/epoll_wait()
。这样一来,应用层的缓冲是必须的,每个TCP socket
都要有inputBuffer
和outputBuffer
。
- TcpConnection必须有output buffer:使程序在
write()
操作上不会产生阻塞,当write()
操作后,操作系统一次性没有接受完时,网络库把剩余数据则放入outputBuffer
中,然后注册POLLOUT
事件,一旦socket
变得可写,则立刻调用write()
进行写入数据。——应用层buffer
到操作系统buffer
- TcpConnection必须有input buffer:当发送方
send
数据后,接收方收到数据不一定是整个的数据,网络库在处理socket
可读事件的时候,必须一次性把socket
里的数据读完,否则会反复触发POLLIN
事件,造成busy-loop
。所以网路库为了应对数据不完整的情况,收到的数据先放到inputBuffer
里。——操作系统buffer
到应用层buffer
。
正文
1 Buffer缓冲区设计
muduo 的 Buffer 类作为网络通信的缓冲区,像是 TcpConnection 就拥有 inputBuffer 和 outputBuffer 两个缓冲区成员。而缓冲区的设计特点:
- 其内部使用
std::vector<char>
保存数据,并提供许多访问方法。并且std::vector
拥有扩容空间的操作,可以适应数据的不断添加。
std::vector<char>
内部分为三块,头部预留空间,可读空间,可写空间。内部使用索引标注每个空间的起始位置。每次往里面写入数据,就移动writeIndex
;从里面读取数据,就移动readIndex
。
2 Buffer基本成员
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| class Buffer : public muduo::copyable { public: static const size_t kCheapPrepend = 8; static const size_t kInitialSize = 1024;
explicit Buffer(size_t initialSize = kInitialSize) : buffer_(kCheapPrepend + initialSize), // buffer分配大小 8 + 1KB readerIndex_(kCheapPrepend), // 可读索引和可写索引最开始位置都在预留字节后 writerIndex_(kCheapPrepend) { assert(readableBytes() == 0); assert(writableBytes() == initialSize); assert(prependableBytes() == kCheapPrepend); }
size_t readableBytes() const { return writerIndex_ - readerIndex_; } size_t writableBytes() const { return buffer_.size() - writerIndex_; } size_t prependableBytes() const { return readerIndex_; } const char* peek() const { return begin() + readerIndex_; }
private: std::vector<char> buffer_; size_t readerIndex_; size_t writerIndex_; };
|
3 读写数据时对Buffer的操作
3.1 向Buffer写入数据:ReadFd()
ssize_t Buffer::ReadFd(int fd, int* savedErrno)
:表示从 fd 中读取数据到 buffer_ 中。对于 buffer 来说这是写入数据的操作,会改变writeIndex
。
- 考虑到 buffer_ 的 writableBytes 空间大小,不能够一次性读完数据,于是内部还在栈上创建了一个临时缓冲区
char extrabuf[65536];
。如果有多余的数据,就将其读入到临时缓冲区中。
- 因为可能要写入两个缓冲区,所以使用了更加高效
readv
函数,可以向多个地址写入数据。刚开始会判断需要写入的大小。
- 如果一个缓冲区足够,就不必再往临时缓冲区
extrabuf
写入数据了。写入后需要更新writeIndex
位置,writerIndex_ += n;
。
- 如果一个缓冲区不够,则还需往临时缓冲区
extrabuf
写入数据。原缓冲区直接写满,writeIndex_ = buffer_.size()
。然后往临时缓冲区写入数据,Append(extrabuf, n - writable);
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| ssize_t Buffer::ReadFd(int fd, int *saveErrno) { char extrabuf[65536] = {0};
struct iovec vec[2]; const size_t writable = WritableBytes();
vec[0].iov_base = Begin() + writerIndex_; vec[0].iov_len = writable;
vec[1].iov_base = extrabuf; vec[1].iov_len = sizeof(extrabuf);
const int iovcnt = (writable < sizeof(extrabuf)) ? 2 : 1; const ssize_t n = ::readv(fd, vec, iovcnt);
if (n < 0) { *saveErrno = errno; } else if (n <= writable) { writerIndex_ += n; } else { writerIndex_ = buffer_.size(); Append(extrabuf, n - writable); } return n; }
|
其中的 Append 函数真正向 buffer_ 内部添加数据。调用方将数据的首地址和长度给出,其内部将数据拷贝到指定位置。
1 2 3 4 5 6 7 8
| void Append(const char *data, size_t len) { EnsureWritableBytes(len); std::copy(data, data+len, BeginWrite()); writerIndex_ += len; }
|
3.2 空间不够怎么办?
如果写入空间不够,Buffer 内部会有两个方案来应付
- 将数据往前移动:因为每次读取数据,
readIndex
索引都会往后移动,从而导致前面预留的空间逐渐增大。我们需要将后面的元素重新移动到前面。
- 如果第一种方案的空间仍然不够,那么我们就直接对 buffer_ 进行扩容(
buffer_.resize(len)
)操作。
如图所示:现在的写入空间不够,但是前面的预留空间加上现在的写空间是足够的。因此,我们需要将后面的数据拷贝到前面,腾出足够的写入空间。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| void EnsureWritableBytes(size_t len) { if (WritableBytes() < len) { MakeSpace(len); } }
void MakeSpace(int len) {
if (WritableBytes() + PrependableBytes() < len + kCheapPrepend) { buffer_.resize(writerIndex_ + len); } else { size_t readable = ReadableBytes(); std::copy(Begin() + readerIndex_, Begin() + writerIndex_, Begin() + kCheapPrepend); readerIndex_ = kCheapPrepend; writerIndex_ = readerIndex_ + readable; } }
|
3.3 从Buffer中读取数据
就如回声服务器的例子一样:
1 2 3 4 5 6 7 8
| void OnMessage(const TcpConnectionPtr &conn, Buffer *buf, Timestamp time) { std::string msg = buf->RetrieveAllAsString(); LOG_INFO << conn->name() << " echo " << msg.size() << " bytes, " << "data received at " << time.toFormattedString(); conn->send(msg); }
|
读取数据会调用void Retrieve(size_t len)
函数,在这之前会判断读取长度是否大于可读取空间
- 如果小于,则直接后移
readIndex
即可,readerIndex_ += len;
。
- 如果大于等于,说明全部数据都读取出来。此时会将buffer置为初始状态:
readerIndex_ = kCheapPrepend;
writerIndex_ = kCheapPrepend;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| std::string RetrieveAllAsString() { return RetrieveAsString(ReadableBytes()); }
std::string RetrieveAsString(size_t len) { std::string result(Peek(), len); Retrieve(len); return result; }
void Retrieve(size_t len) { if (len < ReadableBytes()) { readerIndex_ += len; } else { RetrieveAll(); } }
void RetrieveAll() { readerIndex_ = kCheapPrepend; writerIndex_ = kCheapPrepend; }
|
4 TcpConnection使用Buffer
TcpConnection 拥有 inputBuffer 和 outputBuffer 两个缓冲区成员。
- 当服务端接收客户端数据,EventLoop 返回活跃的 Channel,并调用对应的读事件处理函数,即 TcpConnection 调用 handleRead 方法从相应的 fd 中读取数据到 inputBuffer 中。在 Buffer 内部 inputBuffer 中的 writeIndex 向后移动。
- 当服务端向客户端发送数据,TcpConnection 调用 handleWrite 方法将 outputBuffer 的数据写入到 TCP 发送缓冲区。outputBuffer 内部调用
Retrieve
方法移动 readIndex 索引。
调用inputBuffer_.ReadFd(channel_->fd(), &savedErrno);
将对端fd
数据读取到inputBuffer
中。
- 如果读取成功,调用「可读事件发生回调函数」
- 如果读取数据长度为
0
,说明对端关闭连接。调用HandleCose()
- 出错,则保存
errno
,调用HandleError()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| void TcpConnection::HandleRead(Timestamp receiveTime) { int savedErrno = 0; ssize_t n = inputBuffer_.ReadFd(channel_->fd(), &savedErrno); if (n > 0) { messageCallback_(shared_from_this(), &inputBuffer_, receiveTime); } else if (n == 0) { HandleClose(); } else { errno = savedErrno; LOG_ERROR << "TcpConnection::handleRead() failed"; HandleError(); } }
|
4.2 TcpConnection向客户端发送数据(将ouputBuffer数据输出到socket中)
1 2
| if (channel_->IsWriting())
|
- 要在
channel_
确实关注写事件的前提下正常发送数据:因为一般有一个send
函数发送数据,如果TCP接收缓冲区不够接收ouputBuffer的数据,就需要多次写入。需要重新注册写事件,因此是在注册了写事件的情况下调用的HandleWrite
。
- 向
channel->fd()
发送outputBuffer中的可读取数据。成功发送数据则移动readIndex
,并且如果一次性成功写完数据,就不再让此channel
关注写事件了,并调用写事件完成回调函数没写完则继续关注!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| void TcpConnection::HandleWrite() { if (channel_->IsWriting()) { int saveErrno = 0; ssize_t n = outputBuffer_.WriteFd(channel_->fd(), &saveErrno); if (n > 0) { outputBuffer_.Retrieve(n); if (outputBuffer_.ReadableBytes() == 0) { channel_->DisableWriting(); if (writeCompleteCallback_) { loop_->QueueInLoop(std::bind(writeCompleteCallback_, shared_from_this())); } if (state_ == kDisconnecting) { ShutdownInLoop(); } } } else { LOG_ERROR << "TcpConnection::handleWrite() failed"; } } else { LOG_ERROR << "TcpConnection fd=" << channel_->fd() << " is down, no more writing"; } }
|