第 14 课:任务队列 — TaskQueue、SerialTaskQueue、ConcurrentTaskQueue

对应源文件:

  • trantor/utils/TaskQueue.h — 抽象基类
  • trantor/utils/SerialTaskQueue.h / SerialTaskQueue.cc — 串行执行队列
  • trantor/utils/ConcurrentTaskQueue.h / ConcurrentTaskQueue.cc — 并发线程池队列

一、为什么需要 TaskQueue?

EventLoop 是单线程的,其中不能执行任何阻塞操作(数据库查询、文件 I/O、耗时计算),否则整条链路的 I/O 响应都会被拖慢。

TaskQueue 提供了一个"卸载阻塞任务"的机制:

1
2
3
4
5
6
[EventLoop 线程]            [TaskQueue 线程]
  收到玩家请求                       │
  → 投递到 TaskQueue  ────────────►│ 执行 DB 查询(可阻塞)
  → 立即返回,处理下一个事件          │ 查询完成
                                   │ → 回调投递回 EventLoop
  ◄── 收到结果,发送响应 ────────────┘

这是异步编程的基本模式:不阻塞事件循环,把耗时操作委托给专用线程


二、TaskQueue — 抽象基类

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
class TaskQueue : public NonCopyable {
  public:
    // 纯虚:子类实现具体的投递方式
    virtual void runTaskInQueue(const std::function<void()> &task) = 0;
    virtual void runTaskInQueue(std::function<void()> &&task) = 0;
    virtual std::string getName() const { return ""; }

    // 同步执行:投递任务并阻塞等待完成(基类实现,子类免费获得)
    void syncTaskInQueue(const std::function<void()> &task) {
        std::promise<int> prom;
        std::future<int> fut = prom.get_future();
        runTaskInQueue([&]() {
            task();
            prom.set_value(1);   // 任务完成,解锁调用方
        });
        fut.get();               // 阻塞等待
    }
};

syncTaskInQueue 的精妙之处

基类只用 promise/future 就实现了"同步等待"语义,子类无需知道任何细节。任何继承 TaskQueue 的类都自动获得这个能力。

⚠️ 使用 syncTaskInQueue 的危险:如果从 TaskQueue 的执行线程里调用 syncTaskInQueue(向自己投递任务),会死锁——执行线程阻塞等待自己处理完投递的任务,而任务在等自己释放执行权。


三、SerialTaskQueue — 串行任务队列

3.1 实现极简:复用 EventLoopThread

1
2
3
4
5
6
class SerialTaskQueue : public TaskQueue {
  protected:
    std::string queueName_;
    EventLoopThread loopThread_;   // 一个 EventLoopThread = 一个独立线程 + EventLoop
    bool stop_{false};
};

整个 SerialTaskQueue 的核心只有这三行实现:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
SerialTaskQueue::SerialTaskQueue(const std::string &name)
    : queueName_(name.empty() ? "SerialTaskQueue" : name),
      loopThread_(queueName_)          // 启动独立线程,EventLoop 在那里
{
    loopThread_.run();                 // 立刻开始 loop()
}

void SerialTaskQueue::runTaskInQueue(const std::function<void()> &task)
{
    loopThread_.getLoop()->runInLoop(task);  // 投递到 EventLoop
}
void SerialTaskQueue::runTaskInQueue(std::function<void()> &&task)
{
    loopThread_.getLoop()->runInLoop(std::move(task));
}

设计亮点:站在巨人肩膀上

SerialTaskQueue 没有自己实现任何队列、锁、线程管理——全部复用 EventLoopThread

EventLoop::runInLoop() 已经保证了:

  • 线程安全(跨线程投递用 queueInLoop
  • 串行执行(单线程 EventLoop,任务一个接一个执行)
  • 任务队列管理(MpscQueue<Func> funcs_

SerialTaskQueue 只是给 EventLoopThread 套了一个 TaskQueue 接口,让它符合"任务队列"的使用规范。

3.2 串行保证

由于底层是单线程 EventLoop,所有投递到 SerialTaskQueue 的任务严格按投递顺序执行,且不会并发。这天然解决了数据竞争问题。

1
2
投递顺序:task1 → task2 → task3
执行顺序:task1(完成)→ task2(完成)→ task3(完成)

3.3 waitAllTasksFinished()

1
2
3
4
5
void SerialTaskQueue::waitAllTasksFinished() {
    syncTaskInQueue([]() {
        // 空任务
    });
}

这是一个非常聪明的技巧:投递一个空任务,等它执行完

由于串行执行,空任务执行完意味着它之前所有已投递的任务也都执行完了。syncTaskInQueue 阻塞等待这个空任务,等到它返回,队列就"清空"了。

3.4 isRunningTask()

1
2
3
4
5
bool isRunningTask() {
    return loopThread_.getLoop()
               ? loopThread_.getLoop()->isCallingFunctions()
               : false;
}

isCallingFunctions()EventLoop 的方法,返回当前是否正在执行 doRunInLoopFuncs()(即是否有任务在运行)。可以用来判断队列是否"忙"。

3.5 析构与停止

1
2
3
4
5
6
7
8
9
void SerialTaskQueue::stop() {
    stop_ = true;
    loopThread_.getLoop()->quit();   // 通知 EventLoop 退出
    loopThread_.wait();              // 等线程结束
}

SerialTaskQueue::~SerialTaskQueue() {
    if (!stop_) stop();
}

析构时自动停止,stop_ 标志防止重复调用。


四、ConcurrentTaskQueue — 并发线程池队列

4.1 数据结构

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
class ConcurrentTaskQueue : public TaskQueue {
  private:
    size_t queueCount_;
    std::string queueName_;

    std::queue<std::function<void()>> taskQueue_;  // 共享任务队列
    std::vector<std::thread> threads_;             // 工作线程池

    std::mutex taskMutex_;            // 保护 taskQueue_
    std::condition_variable taskCond_; // 任务到来时唤醒线程
    std::atomic_bool stop_;
};

经典的生产者-消费者模型:

  • 生产者:调用 runTaskInQueue() 的线程(可以是 EventLoop 线程)
  • 消费者:threads_ 中的工作线程

4.2 投递任务

1
2
3
4
5
6
7
void ConcurrentTaskQueue::runTaskInQueue(std::function<void()> &&task) {
    {
        std::lock_guard<std::mutex> lock(taskMutex_);
        taskQueue_.push(std::move(task));   // 入队
    }
    taskCond_.notify_one();   // 唤醒一个等待的工作线程
}

notify_one():只唤醒一个线程,避免"惊群效应"(一个任务来了,所有线程都被唤醒,但只有一个能拿到任务,其余继续睡眠,白白上下文切换)。

4.3 工作线程循环 queueFunc()

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
void ConcurrentTaskQueue::queueFunc(int queueNum)
{
    char tmpName[32];
    snprintf(tmpName, sizeof(tmpName), "%s%d", queueName_.c_str(), queueNum);
#ifdef __linux__
    ::prctl(PR_SET_NAME, tmpName);    // 设置线程名,便于 top/htop 观察
#endif

    while (!stop_)
    {
        std::function<void()> r;
        {
            std::unique_lock<std::mutex> lock(taskMutex_);
            while (!stop_ && taskQueue_.empty())
            {
                taskCond_.wait(lock);   // 释放锁,睡眠等待任务
            }
            if (!taskQueue_.empty()) {
                r = std::move(taskQueue_.front());  // 取出任务(move,不拷贝)
                taskQueue_.pop();
            } else {
                continue;   // stop_ 被设置,队列空,退出循环
            }
        }   // 释放锁

        r();   // 执行任务(在锁外面!)
    }
}

关键细节:任务在锁外执行

1
2
}  // ← 锁在这里释放
r();  // ← 任务在锁外执行

如果在持有 taskMutex_ 时执行 r(),那么:

  1. r() 期间不能再投递新任务(runTaskInQueue 会死锁)
  2. 其他工作线程不能取新任务

所以把任务提取出来(move),释放锁,再执行。这是生产者-消费者模式的标准写法。

while (!stop_ && taskQueue_.empty()) 而不是 if

condition_variable::wait() 可能发生虚假唤醒(spurious wakeup)——没有 notify 也会醒来。用 while 循环而不是 if 确保只有真正有任务时才继续,是使用 condition_variable 的标准做法。

4.4 停止与析构

1
2
3
4
5
6
7
8
9
void ConcurrentTaskQueue::stop() {
    if (!stop_) {
        stop_ = true;
        taskCond_.notify_all();   // 唤醒所有等待线程,让它们检查 stop_ 退出
        for (auto &t : threads_)
            t.join();
    }
}
ConcurrentTaskQueue::~ConcurrentTaskQueue() { stop(); }

notify_all() 在停止时必须唤醒所有线程,否则正在 wait(lock) 的线程永远不会检查 stop_join() 会死锁。


五、两种队列的本质区别

SerialTaskQueueConcurrentTaskQueue
底层实现EventLoopThread(复用 EventLoop)传统线程池 + mutex + condition_variable
并发度严格串行(1 个线程)N 个线程并发
任务顺序保证顺序不保证顺序
共享数据不需要额外保护(单线程)需要自己加锁
定时器支持有(EventLoop::runAfter
唤醒机制eventfd(EventLoop 内建)condition_variable
线程名EventLoopThread 设置prctl 设置(name + 数字

选择建议

SerialTaskQueue

  • 需要保证操作顺序(如同一个玩家的数据库写操作必须串行)
  • 操作共享状态(不加锁的前提是单线程访问)
  • 偶尔需要定时任务

ConcurrentTaskQueue

  • 任务彼此独立,可以并行(如批量图片处理、独立的日志写入)
  • 吞吐量优先,任务顺序不重要
  • 需要充分利用多核 CPU

六、syncTaskInQueue 的使用场景与风险

正确用法

1
2
3
4
5
6
7
8
9
SerialTaskQueue dbQueue("DB-Queue");

// 在主线程中:投递 DB 查询并同步等待结果
std::string result;
dbQueue.syncTaskInQueue([&result]() {
    result = db.query("SELECT name FROM player WHERE id=1");
});
// 等 DB 查询完成后继续(result 已填充)
LOG_INFO << "玩家名: " << result;

危险用法(死锁)

1
2
3
4
5
6
SerialTaskQueue q("test");
q.runTaskInQueue([&q]() {
    // 这里是 q 的执行线程!
    q.syncTaskInQueue([]() { /* ... */ });
    // ↑ 死锁:等自己执行完,但自己在等自己
});

七、与 EventLoop 线程的协作模式

游戏服务器的标准异步模式:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
[I/O EventLoop 线程]
    收到玩家请求
SerialTaskQueue::runTaskInQueue([conn, request]() {
    // DB 线程执行
    auto result = db.query(request);
    // 完成后回调到 I/O 线程
    conn->getLoop()->runInLoop([conn, result]() {
        conn->send(buildResponse(result));
    });
});
    ▼(立即返回,继续处理其他事件)

为什么要回调到 I/O 线程?

conn->send() 不是线程安全的(它操作 writeBufferList_),必须在连接所在的 EventLoop 线程执行。DB 线程里直接调用是线程安全隐患,用 runInLoop 把发送操作切回 I/O 线程。


八、游戏服务器实践

8.1 玩家数据库操作(串行)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// 每个玩家一个 SerialTaskQueue,保证同一玩家的 DB 操作串行
class PlayerSession {
    SerialTaskQueue dbQueue_{"Player-DB"};

    void saveData(const PlayerData &data) {
        dbQueue_.runTaskInQueue([data, conn = conn_]() {
            db.update("player", data);  // 阻塞 DB 写
            conn->getLoop()->runInLoop([conn]() {
                conn->send(buildSaveAckPacket());
            });
        });
    }
};

8.2 批量日志处理(并发)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// 多个线程并行处理日志压缩
ConcurrentTaskQueue logQueue(4, "LogCompress");

for (auto &logFile : logFiles) {
    logQueue.runTaskInQueue([logFile]() {
        compressFile(logFile);   // CPU 密集型,可并发
    });
}
// 等所有压缩完成
// (ConcurrentTaskQueue 没有 waitAllTasksFinished,需要自己用 future/barrier)

8.3 同步获取玩家数据(服务器间调用)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
SerialTaskQueue dbQueue("DB");

// GM 工具调用:同步查询玩家信息(阻塞等待结果)
std::string getPlayerInfo(int playerId) {
    std::string info;
    dbQueue.syncTaskInQueue([&]() {
        info = db.query("SELECT * FROM player WHERE id=" +
                        std::to_string(playerId));
    });
    return info;
}

核心收获

  • syncTaskInQueue 基类实现"同步等待":用 promise/future 包装任务,子类无需了解任何细节即免费获得
  • SerialTaskQueue 只是 EventLoopThread 的薄包装(~50行),串行保证来自单线程 EventLoop,无需额外锁
  • waitAllTasksFinished() 妙招:投递一个空任务并同步等它完成,等同于等待队列清空
  • ConcurrentTaskQueue 生产者-消费者三要素:notify_one 防惊群、while 循环防虚假唤醒、任务在锁外执行
  • 选型:同一玩家 DB 操作 → SerialTaskQueue(保序);独立批量任务 → ConcurrentTaskQueue(并发)

九、思考题

  1. SerialTaskQueue 里的任务实际上是通过 EventLoop::runInLoop 执行的。这意味着 SerialTaskQueue 里的任务也可以用 runAfter / runEvery 注册定时器(loopThread_.getLoop()->runAfter(...))。这在什么游戏场景下有用?有什么风险?

  2. ConcurrentTaskQueuetaskQueue_std::queue(FIFO)。如果有一个高优先级任务(例如玩家断线时的数据保存)需要插队到队首,std::queue 做不到。如何改造 ConcurrentTaskQueue 支持优先级?

  3. ConcurrentTaskQueue::stop() 调用 notify_all() 唤醒所有线程,但此时 taskQueue_ 里可能还有未执行的任务。这些任务会被丢弃吗?如果要实现"排干队列再停止",应该如何修改?

  4. SerialTaskQueueEventLoopThread 的关系非常紧密(SerialTaskQueue 只是 EventLoopThread 的包装)。为什么不直接暴露 EventLoopThread 给用户,而要封装成 TaskQueue 接口?这个设计决策带来了什么好处?


十、思考题参考答案

1. SerialTaskQueue 的定时器能力:游戏场景与风险

可行性分析

SerialTaskQueue 底层是 EventLoopThread,因此可以直接访问 EventLoop 的定时器 API:

1
2
3
4
5
6
SerialTaskQueue dbQueue("DB-Queue");
auto *loop = dbQueue.getLoop();  // 获取内部 EventLoop(SerialTaskQueue 未暴露此方法,需扩展或直接用 EventLoopThread)
loop->runEvery(60.0, []() {
    // 每 60 秒执行一次
    db.execute("DELETE FROM expired_sessions WHERE expire_time < NOW()");
});

有用的游戏场景

  1. 定时存盘:每隔 N 秒自动将玩家数据写入数据库。由于是在 SerialTaskQueue 线程执行,与普通的 DB 写操作天然串行,不需要额外加锁。
  2. 定时清理过期数据:如过期的邮件、临时道具到期删除等。
  3. 心跳检测:定时检查某个外部服务是否存活(如 DB 连接心跳 ping)。
  4. 延迟重试:DB 操作失败后,用 runAfter(5.0, retryTask) 延迟 5 秒重试。

风险

  1. 定时任务阻塞普通任务:定时器任务和 runTaskInQueue 投递的任务共享同一个线程。如果定时任务执行了耗时操作(比如一次大表扫描),期间所有投递到该队列的普通任务(如玩家存档)都会被排在后面等待。
  2. 定时精度受任务执行时间影响:如果前一个任务执行了 3 秒,一个 runEvery(1.0) 的定时器在这 3 秒内不会触发,回来后会连续触发补偿(取决于 EventLoop 的定时器实现),或者直接跳过。对需要精确定时的场景不适合。
  3. 停止顺序问题SerialTaskQueue::stop() 会调用 EventLoop::quit(),这会中断正在等待的定时器。如果定时任务正在执行一半的 DB 事务,可能导致事务未提交。需要在 stop() 前先取消定时器或等待当前任务完成。
  4. 接口泄漏SerialTaskQueue 没有暴露 getLoop() 方法(这是有意为之,见第 4 题),如果绕过封装直接获取 EventLoop 指针来注册定时器,就破坏了 TaskQueue 抽象,代码维护者可能不知道队列里还有定时任务在跑。

2. 如何改造 ConcurrentTaskQueue 支持优先级

方案:将 std::queue 替换为 std::priority_queue

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 定义带优先级的任务包装
struct PriorityTask {
    int priority;                    // 数值越大优先级越高
    uint64_t sequence;               // 投递序号,相同优先级按 FIFO
    std::function<void()> task;

    // priority_queue 是最大堆,priority 大的排前面
    // 相同 priority 时 sequence 小的排前面(先投递的先执行)
    bool operator<(const PriorityTask &rhs) const {
        if (priority != rhs.priority)
            return priority < rhs.priority;      // 优先级低的排后面
        return sequence > rhs.sequence;           // 序号大的排后面
    }
};

class PriorityConcurrentTaskQueue : public TaskQueue {
    std::priority_queue<PriorityTask> taskQueue_;   // 替换 std::queue
    std::atomic<uint64_t> nextSequence_{0};         // 全局递增序号
    std::mutex taskMutex_;
    std::condition_variable taskCond_;
    // ...

    void runTaskInQueue(std::function<void()> &&task, int priority = 0) {
        {
            std::lock_guard<std::mutex> lock(taskMutex_);
            taskQueue_.push({priority,
                             nextSequence_.fetch_add(1),
                             std::move(task)});
        }
        taskCond_.notify_one();
    }
};

关键设计点

  1. sequence 字段:保证相同优先级内 FIFO 顺序,防止后来的同优先级任务插队到前面。
  2. 接口兼容:默认 priority=0 保持与原始 TaskQueue 接口兼容。但 TaskQueue 基类的虚函数签名没有 priority 参数,可能需要增加新的虚方法或用不同方式传递优先级(如 thread_local 变量、任务包装器等)。
  3. 饥饿风险:高优先级任务不断到来时,低优先级任务可能一直得不到执行。可以加入老化机制(aging):任务在队列中每等待一个时间段,优先级自动提升。
  4. 性能影响std::priority_queue 的插入和取出都是 O(log N),相比 std::queue 的 O(1) 有所增加。在队列长度通常较短(几十到几百)的场景下影响不大。

3. stop() 时未执行任务的命运与"排干队列"实现

当前行为分析

看源码 ConcurrentTaskQueue::queueFunc()

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
while (!stop_) {
    std::function<void()> r;
    {
        std::unique_lock<std::mutex> lock(taskMutex_);
        while (!stop_ && taskQueue_.size() == 0)
            taskCond_.wait(lock);
        if (taskQueue_.size() > 0) {
            r = std::move(taskQueue_.front());
            taskQueue_.pop();
        } else {
            continue;   // stop_==true 且队列空 → 退出循环
        }
    }
    r();
}

stop() 设置 stop_=truenotify_all()。此时工作线程被唤醒后:

  • 如果 taskQueue_ 不为空,仍会取出一个任务执行taskQueue_.size() > 0 的分支先于 stop_ 检查)。
  • 执行完后回到 while (!stop_) 检查,此时 stop_==true,退出循环。

所以每个工作线程最多再执行一个已在队列中的任务,剩余未被取出的任务会被丢弃(留在 taskQueue_ 中,随 ConcurrentTaskQueue 析构而销毁,std::function 的析构器释放捕获的资源,但任务本身不会被执行)。

实现"排干队列再停止"

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
void ConcurrentTaskQueue::gracefulStop() {
    // 第一步:标记"不再接受新任务"
    accepting_ = false;

    // 第二步:等待队列清空
    {
        std::unique_lock<std::mutex> lock(taskMutex_);
        drainCond_.wait(lock, [this]() {
            return taskQueue_.empty();
        });
    }

    // 第三步:此时队列已空,安全停止
    stop_ = true;
    taskCond_.notify_all();
    for (auto &t : threads_)
        t.join();
}

同时修改 runTaskInQueue

1
2
3
4
5
6
7
8
9
void runTaskInQueue(std::function<void()> &&task) {
    if (accepting_) {
        std::lock_guard<std::mutex> lock(taskMutex_);
        taskQueue_.push(std::move(task));
        taskCond_.notify_one();
    } else {
        LOG_WARN << "队列已停止接受新任务,任务被拒绝";
    }
}

修改 queueFunc 在每次取出任务后检查是否清空:

1
2
3
// 取出任务后:
if (taskQueue_.empty())
    drainCond_.notify_all();   // 通知 gracefulStop 队列已空

或者更简单的方式:直接复用 SerialTaskQueue 的思路,投递一个"毒丸"(poison pill)空任务作为结束标记:

1
2
3
4
5
6
void gracefulStop() {
    for (size_t i = 0; i < threads_.size(); ++i) {
        runTaskInQueue([this]() { stop_ = true; });  // 每个线程吃一颗毒丸
    }
    for (auto &t : threads_) t.join();
}

但这个方案有微妙问题:毒丸之后可能还有正常任务被其他生产者投递进来,需要配合"不再接受新任务"标志一起使用。

4. 为什么封装 TaskQueue 接口而不直接暴露 EventLoopThread

设计动机——接口隔离原则(ISP)和依赖倒置原则(DIP)

  1. 统一抽象TaskQueue 是一个"投递任务并异步执行"的抽象接口。SerialTaskQueue(串行)和 ConcurrentTaskQueue(并发)实现了相同的接口。用户代码只依赖 TaskQueue*,可以在不修改调用方代码的情况下切换实现:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    
    // 配置决定用串行还是并发
    std::unique_ptr<TaskQueue> queue;
    if (needSerial)
        queue = std::make_unique<SerialTaskQueue>("DB");
    else
        queue = std::make_unique<ConcurrentTaskQueue>(4, "DB");
    
    // 调用方代码完全不变
    queue->runTaskInQueue(myTask);
    
  2. 隐藏实现细节EventLoopThread 暴露了 getLoop(),用户可以调用 runAfterrunEveryenableReading 等任意 EventLoop 方法,而这些对于"纯任务队列"场景是不需要甚至危险的。TaskQueue 接口只暴露 runTaskInQueuesyncTaskInQueue,约束了用户只能做"投递任务"这一件事。

  3. 可替换性:如果未来需要一个基于协程池的 TaskQueue、一个基于 io_uring 的 TaskQueue、甚至一个跨进程的远程任务队列,只要实现 TaskQueue 接口即可。直接用 EventLoopThread 就把实现绑死了。

  4. 防止误用EventLoopThread::getLoop() 返回的 EventLoop* 如果被用户保存下来,在 EventLoopThread 析构后就变成悬空指针。TaskQueue 接口不暴露内部指针,降低了生命周期管理的风险。

  5. syncTaskInQueue 的复用:基类 TaskQueue 已经用 promise/future 实现了同步等待,任何子类都免费获得这个功能。如果直接用 EventLoopThread,每个调用点都得自己写 promise/future 逻辑。


学习日期:2025-03-29 | 上一课:第13课_多线程EventLoop | 下一课:第15课_TLS安全通信