第 14 课:任务队列 — TaskQueue、SerialTaskQueue、ConcurrentTaskQueue#
对应源文件:
trantor/utils/TaskQueue.h — 抽象基类trantor/utils/SerialTaskQueue.h / SerialTaskQueue.cc — 串行执行队列trantor/utils/ConcurrentTaskQueue.h / ConcurrentTaskQueue.cc — 并发线程池队列
一、为什么需要 TaskQueue?#
EventLoop 是单线程的,其中不能执行任何阻塞操作(数据库查询、文件 I/O、耗时计算),否则整条链路的 I/O 响应都会被拖慢。
TaskQueue 提供了一个"卸载阻塞任务"的机制:
1
2
3
4
5
6
| [EventLoop 线程] [TaskQueue 线程]
收到玩家请求 │
→ 投递到 TaskQueue ────────────►│ 执行 DB 查询(可阻塞)
→ 立即返回,处理下一个事件 │ 查询完成
│ → 回调投递回 EventLoop
◄── 收到结果,发送响应 ────────────┘
|
这是异步编程的基本模式:不阻塞事件循环,把耗时操作委托给专用线程。
二、TaskQueue — 抽象基类#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| class TaskQueue : public NonCopyable {
public:
// 纯虚:子类实现具体的投递方式
virtual void runTaskInQueue(const std::function<void()> &task) = 0;
virtual void runTaskInQueue(std::function<void()> &&task) = 0;
virtual std::string getName() const { return ""; }
// 同步执行:投递任务并阻塞等待完成(基类实现,子类免费获得)
void syncTaskInQueue(const std::function<void()> &task) {
std::promise<int> prom;
std::future<int> fut = prom.get_future();
runTaskInQueue([&]() {
task();
prom.set_value(1); // 任务完成,解锁调用方
});
fut.get(); // 阻塞等待
}
};
|
syncTaskInQueue 的精妙之处:
基类只用 promise/future 就实现了"同步等待"语义,子类无需知道任何细节。任何继承 TaskQueue 的类都自动获得这个能力。
⚠️ 使用 syncTaskInQueue 的危险:如果从 TaskQueue 的执行线程里调用 syncTaskInQueue(向自己投递任务),会死锁——执行线程阻塞等待自己处理完投递的任务,而任务在等自己释放执行权。
三、SerialTaskQueue — 串行任务队列#
3.1 实现极简:复用 EventLoopThread#
1
2
3
4
5
6
| class SerialTaskQueue : public TaskQueue {
protected:
std::string queueName_;
EventLoopThread loopThread_; // 一个 EventLoopThread = 一个独立线程 + EventLoop
bool stop_{false};
};
|
整个 SerialTaskQueue 的核心只有这三行实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| SerialTaskQueue::SerialTaskQueue(const std::string &name)
: queueName_(name.empty() ? "SerialTaskQueue" : name),
loopThread_(queueName_) // 启动独立线程,EventLoop 在那里
{
loopThread_.run(); // 立刻开始 loop()
}
void SerialTaskQueue::runTaskInQueue(const std::function<void()> &task)
{
loopThread_.getLoop()->runInLoop(task); // 投递到 EventLoop
}
void SerialTaskQueue::runTaskInQueue(std::function<void()> &&task)
{
loopThread_.getLoop()->runInLoop(std::move(task));
}
|
设计亮点:站在巨人肩膀上
SerialTaskQueue 没有自己实现任何队列、锁、线程管理——全部复用 EventLoopThread。
EventLoop::runInLoop() 已经保证了:
- 线程安全(跨线程投递用
queueInLoop) - 串行执行(单线程 EventLoop,任务一个接一个执行)
- 任务队列管理(
MpscQueue<Func> funcs_)
SerialTaskQueue 只是给 EventLoopThread 套了一个 TaskQueue 接口,让它符合"任务队列"的使用规范。
3.2 串行保证#
由于底层是单线程 EventLoop,所有投递到 SerialTaskQueue 的任务严格按投递顺序执行,且不会并发。这天然解决了数据竞争问题。
1
2
| 投递顺序:task1 → task2 → task3
执行顺序:task1(完成)→ task2(完成)→ task3(完成)
|
3.3 waitAllTasksFinished()#
1
2
3
4
5
| void SerialTaskQueue::waitAllTasksFinished() {
syncTaskInQueue([]() {
// 空任务
});
}
|
这是一个非常聪明的技巧:投递一个空任务,等它执行完。
由于串行执行,空任务执行完意味着它之前所有已投递的任务也都执行完了。syncTaskInQueue 阻塞等待这个空任务,等到它返回,队列就"清空"了。
3.4 isRunningTask()#
1
2
3
4
5
| bool isRunningTask() {
return loopThread_.getLoop()
? loopThread_.getLoop()->isCallingFunctions()
: false;
}
|
isCallingFunctions() 是 EventLoop 的方法,返回当前是否正在执行 doRunInLoopFuncs()(即是否有任务在运行)。可以用来判断队列是否"忙"。
3.5 析构与停止#
1
2
3
4
5
6
7
8
9
| void SerialTaskQueue::stop() {
stop_ = true;
loopThread_.getLoop()->quit(); // 通知 EventLoop 退出
loopThread_.wait(); // 等线程结束
}
SerialTaskQueue::~SerialTaskQueue() {
if (!stop_) stop();
}
|
析构时自动停止,stop_ 标志防止重复调用。
四、ConcurrentTaskQueue — 并发线程池队列#
4.1 数据结构#
1
2
3
4
5
6
7
8
9
10
11
12
| class ConcurrentTaskQueue : public TaskQueue {
private:
size_t queueCount_;
std::string queueName_;
std::queue<std::function<void()>> taskQueue_; // 共享任务队列
std::vector<std::thread> threads_; // 工作线程池
std::mutex taskMutex_; // 保护 taskQueue_
std::condition_variable taskCond_; // 任务到来时唤醒线程
std::atomic_bool stop_;
};
|
经典的生产者-消费者模型:
- 生产者:调用
runTaskInQueue() 的线程(可以是 EventLoop 线程) - 消费者:
threads_ 中的工作线程
4.2 投递任务#
1
2
3
4
5
6
7
| void ConcurrentTaskQueue::runTaskInQueue(std::function<void()> &&task) {
{
std::lock_guard<std::mutex> lock(taskMutex_);
taskQueue_.push(std::move(task)); // 入队
}
taskCond_.notify_one(); // 唤醒一个等待的工作线程
}
|
notify_one():只唤醒一个线程,避免"惊群效应"(一个任务来了,所有线程都被唤醒,但只有一个能拿到任务,其余继续睡眠,白白上下文切换)。
4.3 工作线程循环 queueFunc()#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
| void ConcurrentTaskQueue::queueFunc(int queueNum)
{
char tmpName[32];
snprintf(tmpName, sizeof(tmpName), "%s%d", queueName_.c_str(), queueNum);
#ifdef __linux__
::prctl(PR_SET_NAME, tmpName); // 设置线程名,便于 top/htop 观察
#endif
while (!stop_)
{
std::function<void()> r;
{
std::unique_lock<std::mutex> lock(taskMutex_);
while (!stop_ && taskQueue_.empty())
{
taskCond_.wait(lock); // 释放锁,睡眠等待任务
}
if (!taskQueue_.empty()) {
r = std::move(taskQueue_.front()); // 取出任务(move,不拷贝)
taskQueue_.pop();
} else {
continue; // stop_ 被设置,队列空,退出循环
}
} // 释放锁
r(); // 执行任务(在锁外面!)
}
}
|
关键细节:任务在锁外执行
1
2
| } // ← 锁在这里释放
r(); // ← 任务在锁外执行
|
如果在持有 taskMutex_ 时执行 r(),那么:
r() 期间不能再投递新任务(runTaskInQueue 会死锁)- 其他工作线程不能取新任务
所以把任务提取出来(move),释放锁,再执行。这是生产者-消费者模式的标准写法。
while (!stop_ && taskQueue_.empty()) 而不是 if
condition_variable::wait() 可能发生虚假唤醒(spurious wakeup)——没有 notify 也会醒来。用 while 循环而不是 if 确保只有真正有任务时才继续,是使用 condition_variable 的标准做法。
4.4 停止与析构#
1
2
3
4
5
6
7
8
9
| void ConcurrentTaskQueue::stop() {
if (!stop_) {
stop_ = true;
taskCond_.notify_all(); // 唤醒所有等待线程,让它们检查 stop_ 退出
for (auto &t : threads_)
t.join();
}
}
ConcurrentTaskQueue::~ConcurrentTaskQueue() { stop(); }
|
notify_all() 在停止时必须唤醒所有线程,否则正在 wait(lock) 的线程永远不会检查 stop_,join() 会死锁。
五、两种队列的本质区别#
| SerialTaskQueue | ConcurrentTaskQueue |
|---|
| 底层实现 | EventLoopThread(复用 EventLoop) | 传统线程池 + mutex + condition_variable |
| 并发度 | 严格串行(1 个线程) | N 个线程并发 |
| 任务顺序 | 保证顺序 | 不保证顺序 |
| 共享数据 | 不需要额外保护(单线程) | 需要自己加锁 |
| 定时器支持 | 有(EventLoop::runAfter) | 无 |
| 唤醒机制 | eventfd(EventLoop 内建) | condition_variable |
| 线程名 | 由 EventLoopThread 设置 | prctl 设置(name + 数字) |
选择建议#
用 SerialTaskQueue 当:
- 需要保证操作顺序(如同一个玩家的数据库写操作必须串行)
- 操作共享状态(不加锁的前提是单线程访问)
- 偶尔需要定时任务
用 ConcurrentTaskQueue 当:
- 任务彼此独立,可以并行(如批量图片处理、独立的日志写入)
- 吞吐量优先,任务顺序不重要
- 需要充分利用多核 CPU
六、syncTaskInQueue 的使用场景与风险#
正确用法#
1
2
3
4
5
6
7
8
9
| SerialTaskQueue dbQueue("DB-Queue");
// 在主线程中:投递 DB 查询并同步等待结果
std::string result;
dbQueue.syncTaskInQueue([&result]() {
result = db.query("SELECT name FROM player WHERE id=1");
});
// 等 DB 查询完成后继续(result 已填充)
LOG_INFO << "玩家名: " << result;
|
危险用法(死锁)#
1
2
3
4
5
6
| SerialTaskQueue q("test");
q.runTaskInQueue([&q]() {
// 这里是 q 的执行线程!
q.syncTaskInQueue([]() { /* ... */ });
// ↑ 死锁:等自己执行完,但自己在等自己
});
|
七、与 EventLoop 线程的协作模式#
游戏服务器的标准异步模式:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| [I/O EventLoop 线程]
收到玩家请求
│
▼
SerialTaskQueue::runTaskInQueue([conn, request]() {
// DB 线程执行
auto result = db.query(request);
// 完成后回调到 I/O 线程
conn->getLoop()->runInLoop([conn, result]() {
conn->send(buildResponse(result));
});
});
│
▼(立即返回,继续处理其他事件)
|
为什么要回调到 I/O 线程?
conn->send() 不是线程安全的(它操作 writeBufferList_),必须在连接所在的 EventLoop 线程执行。DB 线程里直接调用是线程安全隐患,用 runInLoop 把发送操作切回 I/O 线程。
八、游戏服务器实践#
8.1 玩家数据库操作(串行)#
1
2
3
4
5
6
7
8
9
10
11
12
13
| // 每个玩家一个 SerialTaskQueue,保证同一玩家的 DB 操作串行
class PlayerSession {
SerialTaskQueue dbQueue_{"Player-DB"};
void saveData(const PlayerData &data) {
dbQueue_.runTaskInQueue([data, conn = conn_]() {
db.update("player", data); // 阻塞 DB 写
conn->getLoop()->runInLoop([conn]() {
conn->send(buildSaveAckPacket());
});
});
}
};
|
8.2 批量日志处理(并发)#
1
2
3
4
5
6
7
8
9
10
| // 多个线程并行处理日志压缩
ConcurrentTaskQueue logQueue(4, "LogCompress");
for (auto &logFile : logFiles) {
logQueue.runTaskInQueue([logFile]() {
compressFile(logFile); // CPU 密集型,可并发
});
}
// 等所有压缩完成
// (ConcurrentTaskQueue 没有 waitAllTasksFinished,需要自己用 future/barrier)
|
8.3 同步获取玩家数据(服务器间调用)#
1
2
3
4
5
6
7
8
9
10
11
| SerialTaskQueue dbQueue("DB");
// GM 工具调用:同步查询玩家信息(阻塞等待结果)
std::string getPlayerInfo(int playerId) {
std::string info;
dbQueue.syncTaskInQueue([&]() {
info = db.query("SELECT * FROM player WHERE id=" +
std::to_string(playerId));
});
return info;
}
|
核心收获#
syncTaskInQueue 基类实现"同步等待":用 promise/future 包装任务,子类无需了解任何细节即免费获得SerialTaskQueue 只是 EventLoopThread 的薄包装(~50行),串行保证来自单线程 EventLoop,无需额外锁waitAllTasksFinished() 妙招:投递一个空任务并同步等它完成,等同于等待队列清空ConcurrentTaskQueue 生产者-消费者三要素:notify_one 防惊群、while 循环防虚假唤醒、任务在锁外执行- 选型:同一玩家 DB 操作 → SerialTaskQueue(保序);独立批量任务 → ConcurrentTaskQueue(并发)
九、思考题#
SerialTaskQueue 里的任务实际上是通过 EventLoop::runInLoop 执行的。这意味着 SerialTaskQueue 里的任务也可以用 runAfter / runEvery 注册定时器(loopThread_.getLoop()->runAfter(...))。这在什么游戏场景下有用?有什么风险?
ConcurrentTaskQueue 的 taskQueue_ 是 std::queue(FIFO)。如果有一个高优先级任务(例如玩家断线时的数据保存)需要插队到队首,std::queue 做不到。如何改造 ConcurrentTaskQueue 支持优先级?
ConcurrentTaskQueue::stop() 调用 notify_all() 唤醒所有线程,但此时 taskQueue_ 里可能还有未执行的任务。这些任务会被丢弃吗?如果要实现"排干队列再停止",应该如何修改?
SerialTaskQueue 和 EventLoopThread 的关系非常紧密(SerialTaskQueue 只是 EventLoopThread 的包装)。为什么不直接暴露 EventLoopThread 给用户,而要封装成 TaskQueue 接口?这个设计决策带来了什么好处?
十、思考题参考答案#
1. SerialTaskQueue 的定时器能力:游戏场景与风险#
可行性分析:
SerialTaskQueue 底层是 EventLoopThread,因此可以直接访问 EventLoop 的定时器 API:
1
2
3
4
5
6
| SerialTaskQueue dbQueue("DB-Queue");
auto *loop = dbQueue.getLoop(); // 获取内部 EventLoop(SerialTaskQueue 未暴露此方法,需扩展或直接用 EventLoopThread)
loop->runEvery(60.0, []() {
// 每 60 秒执行一次
db.execute("DELETE FROM expired_sessions WHERE expire_time < NOW()");
});
|
有用的游戏场景:
- 定时存盘:每隔 N 秒自动将玩家数据写入数据库。由于是在
SerialTaskQueue 线程执行,与普通的 DB 写操作天然串行,不需要额外加锁。 - 定时清理过期数据:如过期的邮件、临时道具到期删除等。
- 心跳检测:定时检查某个外部服务是否存活(如 DB 连接心跳 ping)。
- 延迟重试:DB 操作失败后,用
runAfter(5.0, retryTask) 延迟 5 秒重试。
风险:
- 定时任务阻塞普通任务:定时器任务和
runTaskInQueue 投递的任务共享同一个线程。如果定时任务执行了耗时操作(比如一次大表扫描),期间所有投递到该队列的普通任务(如玩家存档)都会被排在后面等待。 - 定时精度受任务执行时间影响:如果前一个任务执行了 3 秒,一个
runEvery(1.0) 的定时器在这 3 秒内不会触发,回来后会连续触发补偿(取决于 EventLoop 的定时器实现),或者直接跳过。对需要精确定时的场景不适合。 - 停止顺序问题:
SerialTaskQueue::stop() 会调用 EventLoop::quit(),这会中断正在等待的定时器。如果定时任务正在执行一半的 DB 事务,可能导致事务未提交。需要在 stop() 前先取消定时器或等待当前任务完成。 - 接口泄漏:
SerialTaskQueue 没有暴露 getLoop() 方法(这是有意为之,见第 4 题),如果绕过封装直接获取 EventLoop 指针来注册定时器,就破坏了 TaskQueue 抽象,代码维护者可能不知道队列里还有定时任务在跑。
2. 如何改造 ConcurrentTaskQueue 支持优先级#
方案:将 std::queue 替换为 std::priority_queue
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
| // 定义带优先级的任务包装
struct PriorityTask {
int priority; // 数值越大优先级越高
uint64_t sequence; // 投递序号,相同优先级按 FIFO
std::function<void()> task;
// priority_queue 是最大堆,priority 大的排前面
// 相同 priority 时 sequence 小的排前面(先投递的先执行)
bool operator<(const PriorityTask &rhs) const {
if (priority != rhs.priority)
return priority < rhs.priority; // 优先级低的排后面
return sequence > rhs.sequence; // 序号大的排后面
}
};
class PriorityConcurrentTaskQueue : public TaskQueue {
std::priority_queue<PriorityTask> taskQueue_; // 替换 std::queue
std::atomic<uint64_t> nextSequence_{0}; // 全局递增序号
std::mutex taskMutex_;
std::condition_variable taskCond_;
// ...
void runTaskInQueue(std::function<void()> &&task, int priority = 0) {
{
std::lock_guard<std::mutex> lock(taskMutex_);
taskQueue_.push({priority,
nextSequence_.fetch_add(1),
std::move(task)});
}
taskCond_.notify_one();
}
};
|
关键设计点:
sequence 字段:保证相同优先级内 FIFO 顺序,防止后来的同优先级任务插队到前面。- 接口兼容:默认
priority=0 保持与原始 TaskQueue 接口兼容。但 TaskQueue 基类的虚函数签名没有 priority 参数,可能需要增加新的虚方法或用不同方式传递优先级(如 thread_local 变量、任务包装器等)。 - 饥饿风险:高优先级任务不断到来时,低优先级任务可能一直得不到执行。可以加入老化机制(aging):任务在队列中每等待一个时间段,优先级自动提升。
- 性能影响:
std::priority_queue 的插入和取出都是 O(log N),相比 std::queue 的 O(1) 有所增加。在队列长度通常较短(几十到几百)的场景下影响不大。
3. stop() 时未执行任务的命运与"排干队列"实现#
当前行为分析:
看源码 ConcurrentTaskQueue::queueFunc():
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| while (!stop_) {
std::function<void()> r;
{
std::unique_lock<std::mutex> lock(taskMutex_);
while (!stop_ && taskQueue_.size() == 0)
taskCond_.wait(lock);
if (taskQueue_.size() > 0) {
r = std::move(taskQueue_.front());
taskQueue_.pop();
} else {
continue; // stop_==true 且队列空 → 退出循环
}
}
r();
}
|
stop() 设置 stop_=true 并 notify_all()。此时工作线程被唤醒后:
- 如果
taskQueue_ 不为空,仍会取出一个任务执行(taskQueue_.size() > 0 的分支先于 stop_ 检查)。 - 执行完后回到
while (!stop_) 检查,此时 stop_==true,退出循环。
所以每个工作线程最多再执行一个已在队列中的任务,剩余未被取出的任务会被丢弃(留在 taskQueue_ 中,随 ConcurrentTaskQueue 析构而销毁,std::function 的析构器释放捕获的资源,但任务本身不会被执行)。
实现"排干队列再停止":
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| void ConcurrentTaskQueue::gracefulStop() {
// 第一步:标记"不再接受新任务"
accepting_ = false;
// 第二步:等待队列清空
{
std::unique_lock<std::mutex> lock(taskMutex_);
drainCond_.wait(lock, [this]() {
return taskQueue_.empty();
});
}
// 第三步:此时队列已空,安全停止
stop_ = true;
taskCond_.notify_all();
for (auto &t : threads_)
t.join();
}
|
同时修改 runTaskInQueue:
1
2
3
4
5
6
7
8
9
| void runTaskInQueue(std::function<void()> &&task) {
if (accepting_) {
std::lock_guard<std::mutex> lock(taskMutex_);
taskQueue_.push(std::move(task));
taskCond_.notify_one();
} else {
LOG_WARN << "队列已停止接受新任务,任务被拒绝";
}
}
|
修改 queueFunc 在每次取出任务后检查是否清空:
1
2
3
| // 取出任务后:
if (taskQueue_.empty())
drainCond_.notify_all(); // 通知 gracefulStop 队列已空
|
或者更简单的方式:直接复用 SerialTaskQueue 的思路,投递一个"毒丸"(poison pill)空任务作为结束标记:
1
2
3
4
5
6
| void gracefulStop() {
for (size_t i = 0; i < threads_.size(); ++i) {
runTaskInQueue([this]() { stop_ = true; }); // 每个线程吃一颗毒丸
}
for (auto &t : threads_) t.join();
}
|
但这个方案有微妙问题:毒丸之后可能还有正常任务被其他生产者投递进来,需要配合"不再接受新任务"标志一起使用。
4. 为什么封装 TaskQueue 接口而不直接暴露 EventLoopThread#
设计动机——接口隔离原则(ISP)和依赖倒置原则(DIP):
统一抽象:TaskQueue 是一个"投递任务并异步执行"的抽象接口。SerialTaskQueue(串行)和 ConcurrentTaskQueue(并发)实现了相同的接口。用户代码只依赖 TaskQueue*,可以在不修改调用方代码的情况下切换实现:
1
2
3
4
5
6
7
8
9
| // 配置决定用串行还是并发
std::unique_ptr<TaskQueue> queue;
if (needSerial)
queue = std::make_unique<SerialTaskQueue>("DB");
else
queue = std::make_unique<ConcurrentTaskQueue>(4, "DB");
// 调用方代码完全不变
queue->runTaskInQueue(myTask);
|
隐藏实现细节:EventLoopThread 暴露了 getLoop(),用户可以调用 runAfter、runEvery、enableReading 等任意 EventLoop 方法,而这些对于"纯任务队列"场景是不需要甚至危险的。TaskQueue 接口只暴露 runTaskInQueue 和 syncTaskInQueue,约束了用户只能做"投递任务"这一件事。
可替换性:如果未来需要一个基于协程池的 TaskQueue、一个基于 io_uring 的 TaskQueue、甚至一个跨进程的远程任务队列,只要实现 TaskQueue 接口即可。直接用 EventLoopThread 就把实现绑死了。
防止误用:EventLoopThread::getLoop() 返回的 EventLoop* 如果被用户保存下来,在 EventLoopThread 析构后就变成悬空指针。TaskQueue 接口不暴露内部指针,降低了生命周期管理的风险。
syncTaskInQueue 的复用:基类 TaskQueue 已经用 promise/future 实现了同步等待,任何子类都免费获得这个功能。如果直接用 EventLoopThread,每个调用点都得自己写 promise/future 逻辑。
学习日期:2025-03-29 | 上一课:第13课_多线程EventLoop | 下一课:第15课_TLS安全通信