WebServer(7) 异步日志模块

本文最后更新于:8 个月前

WebServer(7) 异步日志模块

写在前面

1 为什么要实现非阻塞的日志

  1. 如果是同步日志,那么每次产生日志信息时,就需要将这条日志信息完全写入磁盘后才会执行后续程序。而磁盘 IO 是比较耗时的操作,如果有大批量的日志信息需要写入就会阻塞网络库的工作。
  2. 如果是异步日志,那么写日志消息只需要将日志的信息先进行存储,当累计到一定量或者经过一定时间时再将这些日志信息批量写入磁盘。而这个写入过程靠后台线程去执行,不会影响处理事件的其他线程。

经过对比可以得到,异步日志的方式对性能更加友好,而且可以减少磁盘 IO 函数的操作,一次写入更多的数据,提高效率。

2 muduo日志库的设计

muduo 日志消息的组成:时间戳、线程ID、日志级别、日志正文、源文件名、行号(下面的例子省略了线程 TID)。

1
2022/11/30 00:57:016530 INFO  EventLoop start looping - EventLoop.cc:70

muduo 日志库由前端和后端组成。

  1. 前端主要包括:Logger、LogStream、FixedBuffer、SourceFile。
  2. 后端主要包括:AsyncLogging、LogFile、AppendFile。

正文

1 Logger类

Logger 类为用户提供使用日志库的接口,其内部有一个 Impl(实现类)具体实现功能。Logger 内部定义了日志的等级。muduo 的默认日志等级是 INFO。

1
2
3
4
5
6
7
8
9
enum LogLevel {
TRACE,
DEBUG,
INFO,
WARN,
ERROR,
FATAL,
NUM_LOG_LEVELS
};
  1. TRACE:指出比 DEBUG 粒度更细的一些信息事件(开发过程中使用)。
  2. DEBUG:指出细粒度信息事件对调试应用程序是非常有帮助的(开发过程中使用)。
  3. INFO:表明消息在粗粒度级别上突出强调应用程序的运行过程。
  4. WARN:系统能正常运行,但可能会出现潜在错误的情形。
  5. ERROR:指出虽然发生错误事件,但仍然不影响系统的继续运行。
  6. FATAL:指出每个严重的错误事件将会导致应用程序的退出。

调用日志库
针对不同的日志等级,本项目设置了一系列宏函数来调用。 其内部可以判断当前设置的日志等级来拦截日志输出。

1
2
3
4
5
6
7
#define LOG_DEBUG if (LogLevel() <= Logger::DEBUG) \
Logger(__FILE__, __LINE__, Logger::DEBUG, __func__).Stream()
#define LOG_INFO if (LogLevel() <= Logger::INFO) \
Logger(__FILE__, __LINE__).Stream()
#define LOG_WARN Logger(__FILE__, __LINE__, Logger::WARN).Stream()
#define LOG_ERROR Logger(__FILE__, __LINE__, Logger::ERROR).Stream()
#define LOG_FATAL Logger(__FILE__, __LINE__, Logger::FATAL).Stream()

观察宏函数,会发现每个宏定义都会构造出一个 Logger 的临时对象,然后输出相关信息。在 Logger::Impl 的构造函数中会初始化时间戳、线程ID、日志等级这类固定信息,而正文消息靠 LogStream 重载实现。在 Logger 临时对象销毁时,会调用 Logger 的析构函数,其内部会将日志信息输出到指定位置。
而 Logger 的实现文件中定义了两个全局函数指针,其执行的函数会确定日志信息的输出位置。

1
2
3
// 输出函数和刷新缓冲区函数
using OutputFunc = std::function<void(const char* msg, int len)>;
using FlushFunc = std::function<void()>;
1
2
Logger::OutputFunc g_output = DefaultOutput; // 默认输出到终端上
Logger::FlushFunc g_flush = DefaultFlush; // 默认刷新标准输出流

以下是 DefaultOutput 和 DefaultFlush 的实现。

1
2
3
4
5
6
7
static void DefaultOutput(const char* data, int len) {
fwrite(data, len, sizeof(char), stdout);
}

static void DefaultFlush() {
fflush(stdout);
}

相应的,如果我们将该函数实现为输出到某个磁盘文件上,那么就可以实现后端日志写入磁盘,这也是 muduo 日志接口部分的巧妙设计。我们仿照 muduo 定义了 SetOutput 和 SetFlush 两个函数来设置全局函数指针。

1
2
3
4
5
6
7
void Logger::SetOutput(OutputFunc out) {
g_output = out;
}

void Logger::SetFlush(FlushFunc flush) {
g_flush = flush;
}

2 FixedBuffer类

之前谈到了实现异步日志需要先将前端消息储存起来,然后到达一定数量或者一定时间再将这些信息写入磁盘。而 muduo 使用 FixedBuffer 类来实现这个存放日志信息的缓冲区。FixedBuffer 的实现在 LogStream.h 文件中。
针对不同的缓冲区,设置了两个固定容量。

1
2
const int kSmallBuffer = 4000; 		// 4KB
const int kLargeBuffer = 4000*1000; // 4MB

LogStream 类用于重载正文信息,一次信息大小是有限的,其使用的缓冲区的大小就是 kSmallBuffer。而后端日志 AsyncLogging 需要存储大量日志信息,其会使用的缓冲区大小更大,所以是 kLargeBuffer。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
template <int SIZE>
class FixedBuffer : noncopyable {
public:
FixedBuffer()
: cur_(data_) {
}

void Append(const char* buf, size_t len) {
if (static_cast<size_t>(avail()) > len) {
memcpy(cur_, buf, len);
cur_ += len;
}
}

const char* Data() const { return data_; }
int Length() const { return static_cast<int>(end() - data_); }

char* Current() { return cur_; }
int Avail() const { return static_cast<int>(end() - cur_); }
void Add(size_t len) { cur_ += len; }

void Reset() { cur_ = data_; }
void Bzero() { ::bzero(data_, sizeof(data_)); }

std::string ToString() const { return std::string(data_, length()); }

private:
const char* End() const { return data_ + sizeof(data_); }

char data_[SIZE];
char* cur_;
};

3 LogStream类

LogStream 重载了一系列 operator<< 操作符,将不同格式数据转化为字符串,并存入 LogStream::buffer_。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
{
typedef LogStream self;
public:
...
self& operator<<(bool v)

self& operator<<(short);
self& operator<<(unsigned short);
self& operator<<(int);
self& operator<<(unsigned int);
self& operator<<(long);
self& operator<<(unsigned long);
self& operator<<(long long);
self& operator<<(unsigned long long);
self& operator<<(const void*);
self& operator<<(float v);
self& operator<<(double);
self& operator<<(char v);
self& operator<<(const char* str);
self& operator<<(const unsigned char* str);
self& operator<<(const string& v);
self& operator<<(const StringPiece& v);
self& operator<<(const Buffer& v);
...
}

4 AsyncLogging类

现在开始讲解日志库的后端设计了。前端主要实现异步日志中的日志功能,为用户提供将日志内容转换为字符串,封装为一条完整的 log 消息存放到 FixedBuffer 中;而实现异步,核心是通过专门的后端线程,与前端线程并发运行,将 FixedBuffer 中的大量日志消息写到磁盘上。

  1. AsyncLogging 提供后端线程,定时将 log 缓冲写到磁盘,维护缓冲及缓冲队列。
  2. LogFile 提供日志文件滚动功能,写文件功能。
  3. AppendFile 封装了OS 提供的基础的写文件功能。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
class AsyncLogging {
public:
AsyncLogging(const std::string& basename,
off_t rollSize,
int flushInterval = 3);
~AsyncLogging() {
if (running_) {
Stop();
}
}

// 前端调用 Append 写入日志
void Append(const char* logling, int len);

void Start() {
running_ = true;
thread_.start();
}

void Stop() {
running_ = false;
cond_.notify_one();
thread_.join();
}

private:
// FixedBuffer缓冲区类型,而这个缓冲区大小由kLargeBuffer指定,大小为4M,因此,Buffer就是大小为4M的缓冲区类型。
using Buffer = FixedBuffer<kLargeBuffer>;
using BufferVector = std::vector<std::unique_ptr<Buffer>>;
using BufferPtr = BufferVector::value_type;

void ThreadFunc();

const int flushInterval_; //前端缓冲区定期向后端写入的时间(冲刷间隔)
std::atomic<bool> running_; //标识线程函数是否正在运行
const std::string basename_;
const off_t rollSize_;
Thread thread_;
std::mutex mutex_;
std::condition_variable cond_; //条件变量,主要用于前端缓冲区队列中没有数据时的休眠和唤醒

BufferPtr currentBuffer_; //当前缓冲区4M大小
/**
* 预备缓冲区,主要是在调用Append向当前缓冲添加日志消息时,
* 如果当前缓冲放不下,当前缓冲就会被移动到前端缓冲队列终端,
* 此时预备缓冲区用来作为新的当前缓冲
*/
BufferPtr nextBuffer_;
BufferVector buffers_; //前端缓冲区队列
};
  1. BufferPtr currentBuffer_:当前使用的缓冲区,用于储存前端日志信息,如果不够则会使用预备缓冲区。
  2. BufferPtr nextBuffer_:预备缓冲区,用于储存前端日志信息,在第一个缓冲区不够时使用
  3. BufferVector buffers_:缓冲区数组,用于储存前端日志信息,过一段时间或者缓冲区已满,就会将 Buffer 加入到 BufferVector 中。后端线程负责将 BufferVector 里的内容写入磁盘。

4.1 AsyncLogging::Append

之前说过 FixedBuffer 有不同的大小,而 4MB 的就被 AsyncLogging 所用。前端在生成一条日志消息的时候会调用AsyncLogging::Append,将日志信息加入到 AsyncLogging::Buffer 中。而前端会有不同的线程调用日志库,因此 Append 操作需要加锁保证互斥。

采用双缓冲区实现异步日志,它设置了两个 FixedBuffer<kLargeBuffer> Buffer 来储存前端的日志信息。如果当前的缓冲区不够放下日志信息,它就会将此缓冲区加入到 Buffer 数组中(为后端使用)。然后将预备缓冲区 nextBuffer 作为新的缓冲区使用。

如果后 nextBuffer 也不够使用了,那么就会新分配一个缓冲区记录日志信息,不过这种情况极少发生。如果日志写的速度很快,但是 IO 函数速度很慢,那么前端日志缓冲区就会积累,就会产生这种情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void AsyncLogging::Append(const char* logline, int len) {
// lock在构造函数中自动绑定它的互斥体并加锁,在析构函数中解锁,大大减少了死锁的风险
std::lock_guard<std::mutex> lock(mutex_);
// 缓冲区剩余空间足够则直接写入
if (currentBuffer_->Avail() > len) {
currentBuffer_->Append(logline, len);
} else {
// 当前缓冲区空间不够,将新信息写入备用缓冲区
buffers_.push_back(std::move(currentBuffer_));
if (nextBuffer_) {
currentBuffer_ = std::move(nextBuffer_);
} else {
// 备用缓冲区也不够时,重新分配缓冲区,这种情况很少见
currentBuffer_.reset(new Buffer);
}
currentBuffer_->Append(logline, len);
// 唤醒写入磁盘的后端线程
cond_.notify_one();
}
}

4.2 后端线程会如何处理前端数组

后端线程负责将 BufferVector 里的每个 Buffer 数据写到磁盘上。后端线程会生成一个 Buffer 数组 buffersToWrite,用于和前端 Buffer 数组进行交换。其每隔一段时间 flushInterval_ 就会将前端数组的内容交换到后端数组中(使用 swap)。这样可以快速归还前端数组,这样新的写满的 Buffer 就可以继续加入到前端数组中。

除此之外,后端线程还会先预备两个 Buffer,用于跟前端的 currentBuffer 和 nextBuffer 进行交换。因为为了避免等待过久,currentBuffer 还没有被填满,后端线程会定时的打断这个过程。如果到达触发时间了,当前的 Buffer 还没有写完,那么仍然会将其加入到前端的 Buffer 数组中。因此,需要将 currentBuffer 与空的 Buffer 进行交换操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
void AsyncLogging::ThreadFunc() {
// output有写入磁盘的接口
LogFile output(basename_, rollSize_, false);
// 后端缓冲区,用于归还前端的缓冲区,currentBuffer nextBuffer
BufferPtr newBuffer1(new Buffer);
BufferPtr newBuffer2(new Buffer);
newBuffer1->Bzero();
newBuffer2->Bzero();
// 缓冲区数组置为16个,用于和前端缓冲区数组进行交换
BufferVector buffersToWrite;
buffersToWrite.reserve(16);
while (running_) {
{
// 互斥锁保护,这样别的线程在这段时间就无法向前端Buffer数组写入数据
std::unique_lock<std::mutex> lock(mutex_);
if (buffers_.empty()) {
// 等待三秒也会接触阻塞
cond_.wait_for(lock, std::chrono::seconds(3));
}

// 此时正使用的buffer也放入buffer数组中(没写完也放进去,避免等待太久才刷新一次)
buffers_.push_back(std::move(currentBuffer_));
// 归还正使用缓冲区
currentBuffer_ = std::move(newBuffer1);
// 后端缓冲区和前端缓冲区交换
buffersToWrite.swap(buffers_);
if (!nextBuffer_) {
nextBuffer_ = std::move(newBuffer2);
}
}

4.3 后端线程如何处理 buffersToWrite

首先会先判断交换得到的 buffersToWrite 数组的大小,如果其超过了 25 个,那么我们只会保留前两个 Buffer。一个 Buffer 是 4MB,25 个说明前端已经堆积了 100 MB 大小的日志信息。
假设磁盘的写速度 100MB/S,要堆积 100MB 有 2 种极端情况:

  1. 1S内产生200MB数据;
  2. 25秒内,平均每秒产生104MB数据;

不论哪种情况,都是要超过磁盘的处理速度。而实际应用中,只有产生数据速度不到磁盘写速度的 1/10,应用程序性能才不会受到明显影响。

紧接着,后端线程会遍历各个 Buffer,然后调用 output 将数据输出到指定位置。

1
2
3
4
// 遍历所有 buffer,将其写入文件
for (const auto& buffer : buffersToWrite) {
output.append(buffer->Data(), buffer->Length());
}

写入完成后,将 buffersToWrite 的大小 resize 成两个,并归还 nextBuffer。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
    // 只保留两个缓冲区
if (buffersToWrite.size() > 2) {
buffersToWrite.resize(2);
}

// 归还newBuffer1缓冲区
if (!newBuffer1) {
newBuffer1 = std::move(buffersToWrite.back());
buffersToWrite.pop_back();
newBuffer1->Reset();
}

// 归还newBuffer2缓冲区
if (!newBuffer2) {
newBuffer2 = std::move(buffersToWrite.back());
buffersToWrite.pop_back();
newBuffer2->Reset();
}

buffersToWrite.clear(); // 清空后端缓冲区队列
output.Flush(); //清空文件缓冲区
}
output.Flush();
}

5 LogFile类

LogFile 主要职责:提供对日志文件的操作,包括滚动日志文件、将 log 数据写到当前 log 文件、flush log数据到当前 log 文件。

5.1 写日志文件操作

LogFile提供了2个接口,用于向当前日志文件file_写入数据。Append本质上是通过AppendInLock完成对日志文件写操作,但多了线程安全。用户只需调用第一个接口即可,append会根据线程安全需求,自行判断是否需要加上;第二个是private接口。

1
2
void Append(const char* data, int len);
void AppendInLock(const char* data, int len);

AppendInLock 会先将log消息写入file_文件,之后再判断是否需要滚动日志文件;如果不滚动,就根据Append_unlocked的调用次数和时间,确保:

  1. 一个log文件超时(默认1天),就创建一个新的;
  2. flush文件操作,不会频繁执行(默认间隔3秒)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void LogFile::AppendInLock(const char* data, int len) {
file_->Append(data, len);

if (file_->WrittenBytes() > rollSize_) {
RollFile();
} else {
++count_;
if (count_ >= checkEveryN_) {
count_ = 0;
time_t now = ::time(NULL);
time_t thisPeriod = now / kRollPerSeconds_ * kRollPerSeconds_;
if (thisPeriod != startOfPeriod_) {
RollFile();
} else if (now - lastFlush_ > flushInterval_) {
lastFlush_ = now;
file_->Flush();
}
}
}
}

5.2 滚动日志

当文件写入量超过一个大小,就会创建一个新的日志文件,向里面写入数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
bool LogFile::RollFile() {
time_t now = 0;
std::string filename = GetLogFileName(basename_, &now);
// 计算现在是第几天 now/kRollPerSeconds求出现在是第几天,再乘以秒数相当于是当前天数0点对应的秒数
time_t start = now / kRollPerSeconds_ * kRollPerSeconds_;

if (now > lastRoll_) {
lastRoll_ = now;
lastFlush_ = now;
startOfPeriod_ = start;
// 让file_指向一个名为filename的文件,相当于新建了一个文件
file_.reset(new FileUtil(filename));
return true;
}
return false;
}

5.3 flush 日志文件

flush操作往往与write文件操作配套。LogFile::Flush实际上是通过AppendFile::Flush(),完成对日志文件的冲刷。与LogFile::Append()类似,flush也能通过mutex_指针是否为空,自动选择线程安全版本,还是非线程安全版本。

1
2
3
4
void LogFile::Flush() {
// std::lock_guard<std::mutex> lock(*mutex_);
file_->Flush();
}

6 AppendFile类

AppendFile 在 FileUtil.cc 文件中被实现,其封装了 FILE 对文件操作的方法。以组合的形式被 LogFile 使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class FileUtil {
public:
explicit FileUtil(std::string& fileName);
~FileUtil();

void Append(const char* data, size_t len); // 添加log消息到文件末尾

void Flush(); // 冲刷文件到磁盘

off_t WrittenBytes() const { return writtenBytes_; } // 返回已写字节数

private:
size_t Write(const char* data, size_t len); // 写数据到文件

FILE* fp_; // 文件指针
char buffer_[64 * 1024]; // fp_的缓冲区
off_t writtenBytes_; // off_t用于指示文件的偏移量
};

使用 RALL 手法,初始化即构造对象,析构则释放文件资源。setBuffer 可以设置系统默认的 IO 缓冲区为自己指定的用户缓冲区。大小为 64 KB。

1
2
3
4
5
6
7
8
9
10
FileUtil::FileUtil(std::string& fileName)
: fp_(::fopen(fileName.c_str(), "ae")),
writtenBytes_(0) {
// 将fd_缓冲区设置为本地的buffer_
::setbuffer(fp_, buffer_, sizeof(buffer_));
}

FileUtil::~FileUtil() {
::fclose(fp_);
}

AppendFile 有个两个接口:Append 和 Write。其中,Append() 是供用户调用的 public 接口,确保将指定数据附加到文件末尾,实际的写文件操作是通过 Write() 来完成的;Write 通过非线程安全的 glibc 库函数fwrite_unlocked() 来完成写文件操作,而没有选择线程安全的 fwrite(),主要是出于性能考虑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void FileUtil::Append(const char* data, size_t len) {
// 记录已经写入的数据大小
size_t written = 0;

while (written != len) {
// 还需写入的数据大小
size_t remain = len - written;
size_t n = Write(data + written, remain);
if (n != remain) {
int err = ferror(fp_);
if (err) {
fprintf(stderr, "FileUtil::append() failed %s\n", GetErrnoMsg(err));
}
}
// 更新写入的数据大小
written += n;
}
// 记录目前为止写入的数据大小,超过限制会滚动日志
writtenBytes_ += written;
}
1
2
3
4
5
6
7
8
9
10
size_t FileUtil::Write(const char* data, size_t len) {
/**
* size_t fwrite(const void* buffer, size_t size, size_t count, FILE* stream);
* -- buffer:指向数据块的指针
* -- size:每个数据的大小,单位为Byte(例如:sizeof(int)就是4)
* -- count:数据个数
* -- stream:文件指针
*/
return ::fwrite_unlocked(data, 1, len, fp_);
}