第 13 课:多线程 EventLoop — EventLoopThread & EventLoopThreadPool
对应源文件:
trantor/net/EventLoopThread.h/EventLoopThread.cc— 在独立线程运行一个 EventLooptrantor/net/EventLoopThreadPool.h/EventLoopThreadPool.cc— EventLoop 线程池
一、为什么需要这两个类?
在第 12 课里,我们看到 TcpServer::setIoLoopNum(4) 内部创建了一个 EventLoopThreadPool。它们解决的核心问题是:
如何安全地在新线程里创建 EventLoop,并确保 EventLoop 真正开始运行后再返回给调用者?
这看起来简单,实际上有一个微妙的同步问题:
EventLoop对象必须在它将要运行的线程里创建(t_loopInThisThread线程局部变量)- 调用者拿到
EventLoop *之前,要保证该指针有效(对象已创建) run()返回之前,要保证 EventLoop 确实进入了loop()主循环(否则第一个runInLoop可能无法立刻执行)
trantor 用三个 std::promise 精确解决了这个三阶段同步问题。
二、EventLoopThread 的三阶段启动协议
2.1 成员变量一览
| |
2.2 三阶段时序图
| |
三个 promise 的职责:
| Promise | 从哪里 set | 在哪里 get | 含义 |
|---|---|---|---|
promiseForLoopPointer_ | 新线程 loopFuncs() | 主线程构造函数 | EventLoop 对象已创建,指针有效 |
promiseForRun_ | 主线程 run() | 新线程 loopFuncs() | “请开始执行 loop()” |
promiseForLoop_ | 新线程 loop 内的 queueInLoop 回调 | 主线程 run() | EventLoop 真正进入了循环 |
2.3 为什么构造和运行分离?
| |
使用场景:EventLoopThreadPool 利用这个分离,先把所有 EventLoopThread 全部构造好(线程都启动了,EventLoop 都创建了),再统一调用 start() 让所有线程同时开始循环。这样所有线程几乎同时进入工作状态,避免先启动的线程抢先处理任务而后启动的线程还没准备好的问题。
2.4 thread_local static EventLoop
| |
thread_local static 的含义:
thread_local:每个线程各有一份独立的变量static:在函数内部,生命周期延续到线程结束(不是函数调用结束)
为什么用 shared_ptr 而不是直接 EventLoop?
因为 promiseForLoopPointer_ 需要把指针传回主线程(loop_ 是 shared_ptr<EventLoop>),用 shared_ptr 可以安全共享所有权。线程结束后,thread_local 变量析构,shared_ptr 引用计数减 1;如果主线程的 loop_ 也 reset 了,EventLoop 就被销毁。
2.5 run() 的 call_once 保护
| |
std::call_once 保证多次调用 run() 只执行一次。EventLoopThreadPool::start() 和 ~EventLoopThread() 都会调用 run(),不用担心重复触发。
2.6 析构:优雅退出
| |
为什么先 run() 再 quit()?
如果 EventLoopThread 构造后从未调用过 run()(异常路径或忘记调用),新线程会一直阻塞在 promiseForRun_.get_future().get(),永远不会结束。析构时先调 run(),让线程进入 loop,再 quit() 让它退出,保证 join() 不会死锁。
三、EventLoopThreadPool
3.1 结构
| |
极简设计:一个线程向量 + 一个原子计数器,仅此而已。
3.2 构造 vs start()
| |
这个两阶段设计的价值:调用 start() 之前,可以往每个 loop 里投递初始化任务(通过 getLoop(id)->queueInLoop())。这些任务会在 loop 开始时第一批被执行,实现"启动时的初始化工作"。
3.3 getNextLoop() — 无锁 Round-Robin
| |
memory_order_relaxed 为什么安全?
loopIndex_ 只是一个计数器,我们只需要保证"每次加 1"是原子的(不会丢失更新),不需要同步其他内存操作。relaxed 提供最弱的原子保证(仅原子性),是最高效的选择。
计数器溢出(size_t 回绕到 0)也是安全的,因为取模操作依然正确。
3.4 完整接口
| |
四、EventLoopThread 启动时序的精密性
用一张图展示整个启动过程为什么是"精密"的:
| |
“快要进入” vs “已经进入"的区别:
如果 run() 只等 promiseForRun_.set_value(即只等"新线程收到信号”),主线程可能比新线程快,导致 runInLoop 投递的第一个任务在 loop->loop() 调用之前就被队列,要等到第一次 poll 返回后才执行。用 promiseForLoop_(在 loop 内部 queueInLoop 设置)保证:run() 返回时,EventLoop 至少已经完成了一次完整的循环迭代,任何紧接着 run() 之后的 runInLoop 都能被及时处理。
五、与 TcpServer 的协作
| |
六、任务队列(TaskQueue)简介
课程表中的第 14/15 课(SerialTaskQueue、ConcurrentTaskQueue)是另一类线程模型,与 EventLoopThreadPool 的区别:
| EventLoopThreadPool | SerialTaskQueue | ConcurrentTaskQueue | |
|---|---|---|---|
| 执行模型 | 每线程一个 EventLoop,I/O 驱动 | 单线程顺序执行任务 | 线程池并行执行任务 |
| 任务类型 | I/O 事件 + 定时器 + 普通任务 | 纯计算/阻塞任务 | 纯计算/阻塞任务 |
| 是否阻塞 I/O | 不能(会影响 I/O 响应延迟) | 独立线程,不影响 I/O | 独立线程池 |
| 典型用途 | 网络收发、定时器 | 串行数据库操作 | 并行文件处理 |
游戏服务器中,数据库操作(查询玩家数据)绝不能放在 EventLoop 线程里(会阻塞 I/O),应该用 SerialTaskQueue 保证操作顺序,或 ConcurrentTaskQueue 并行执行。
七、游戏服务器实践
7.1 标准的多线程服务器配置
| |
7.2 访问特定 I/O 线程
| |
7.3 在 start() 之前做初始化
| |
核心收获
- 三阶段
promise/future启动:① Loop 指针就绪 ② 主线程放行 loop() ③ queueInLoop 确认已在运行,解决"返回指针但 loop 还没跑"的竞态 - 分离"构造 EventLoop"与"开始 loop()":允许在 loop 启动前安全注册定时器、Channel 等
thread_local static shared_ptr<EventLoop>:在 Loop 线程上可用getEventLoopOfCurrentThread()获取自身EventLoopThreadPool用atomic<size_t>+fetch_add(relaxed)实现无锁 Round-Robinstd::call_once保证run()幂等,析构时的run()调用不会造成问题
八、思考题
EventLoopThread构造函数里立刻启动了线程并阻塞等待promiseForLoopPointer_。如果EventLoop构造函数内部抛出异常(例如 eventfd 创建失败),会发生什么?promise的析构行为是怎样的?EventLoopThreadPool::getNextLoop()使用memory_order_relaxed。如果有两个线程同时调用getNextLoop(),会不会拿到同一个 loop?这是 bug 吗?(提示:分析 Round-Robin 分配的目的)EventLoopThread的loopFuncs()里用thread_local static shared_ptr<EventLoop>,而不是普通的EventLoop loop。除了传回指针的需要,还有什么生命周期上的考虑?run()用std::call_once保证只执行一次。~EventLoopThread()里也调用了run()。如果用户在~EventLoopThread()析构之前已经手动调用过run(),析构时的run()会做什么?这是正确行为吗?
九、思考题参考答案
1. EventLoop 构造函数抛异常时会发生什么?promise 的析构行为是怎样的?
分析异常传播路径:
EventLoopThread 构造函数中:
| |
新线程的 loopFuncs() 中:
| |
如果 EventLoop 构造函数抛异常(例如 Linux 上 eventfd() 失败,或 Windows 上 IOCP 创建失败),make_shared<EventLoop>() 会抛出异常。此时:
新线程的行为:异常在
loopFuncs()中未被捕获,传播到std::thread的入口函数。根据 C++ 标准,如果线程函数抛出未捕获的异常,std::terminate()会被调用,整个进程终止。如果假设不会
terminate(比如有全局异常处理器):promiseForLoopPointer_从未调用set_value()或set_exception(),新线程结束后promise对象在新线程栈上析构。根据 C++ 标准:- 如果
promise被析构时既没有set_value也没有set_exception,析构函数会存储一个std::future_error(错误码broken_promise)到共享状态中 - 主线程阻塞在
f.get()上,此时get()会抛出std::future_error异常
- 如果
主线程的行为:
f.get()抛出std::future_error,EventLoopThread构造函数异常退出。由于thread_成员已经构造(线程已启动),但构造函数异常意味着对象未成功创建,析构函数不会被调用。thread_作为成员变量会被自动析构——如果线程还在运行且joinable(),std::thread析构函数会调用std::terminate()。
总结:无论哪条路径,EventLoop 构造失败都会导致进程终止。这在实际中是合理的——如果连事件循环都创建不了(系统资源耗尽),服务器已经无法正常工作,快速失败(fail-fast)是正确策略。
promise 析构的关键规则:
- 正常析构(已设置值):释放共享状态
- 未设置值就析构:自动设置
broken_promise异常到共享状态 future::get()会收到这个异常(类型为std::future_error)
2. 两个线程同时调用 getNextLoop() 会不会拿到同一个 loop?这是 bug 吗?
不会拿到同一个 loop,原因在于 fetch_add 的原子性保证。
| |
std::atomic::fetch_add(1, memory_order_relaxed) 保证原子性:即使两个线程同时调用,每个线程都会拿到不同的 index 值。假设 loopIndex_ 当前为 5:
- 线程 A 的
fetch_add返回 5,loopIndex_变为 6 - 线程 B 的
fetch_add返回 6,loopIndex_变为 7 - (或者 B 先得到 5,A 得到 6,取决于谁先执行,但不会重复)
memory_order_relaxed 只放宽了内存可见性顺序(不保证其他变量的写入对另一个线程可见的顺序),但不影响 loopIndex_ 自身的原子性。“原子性"意味着 fetch_add 是一个不可分割的 RMW(read-modify-write)操作,不会有两个线程读到相同的值。
但是,假设有一种极端情况需要讨论:
如果 loopIndex_ 溢出 size_t 的最大值回绕到 0 呢?答案是依然安全。size_t 的无符号溢出在 C++ 中是定义明确的行为(模 2^64 算术),回绕后取模仍然正确,只是从 loopThreadVector_.size()-1 跳回到 0,继续轮询。
这不是 bug。 Round-Robin 的目的是近似均匀分配,不需要严格保证"连续两次调用一定分配到不同的 loop”。即使假设两个线程偶然拿到映射到同一个 loop 的不同 index(例如 index=0 和 index=4 在 4 线程池中都映射到 loop[0]),这也只是正常的轮询行为,不影响正确性。
3. thread_local static shared_ptr<EventLoop> 除了传回指针,还有什么生命周期考虑?
这个设计有两个关键的生命周期考量:
考虑一:EventLoop 的生命周期超越 loopFuncs() 函数作用域
如果用普通局部变量:
| |
这看起来没问题——loop() 返回后 EventLoop 析构。但有一个微妙问题:EventLoop 析构时,可能还有 pending 的 queueInLoop 任务引用了 loop 内部的数据结构。thread_local static 保证变量在线程退出时才析构(而不是函数返回时),这给了一个更长的生命周期窗口。
更重要的是,loop_ 是 shared_ptr<EventLoop> 类型,promiseForLoopPointer_ 把同一个 shared_ptr 传给了主线程的 loop_ 成员。当 loopFuncs() 返回后:
| |
如果用普通的 shared_ptr<EventLoop> loop = ...(不带 thread_local static),函数返回时 loop 析构,loop_ 已被置为 nullptr,引用计数归零,EventLoop 立刻析构——但此时线程可能还没完全退出,某些全局的 thread_local 变量(如 t_loopInThisThread)可能还在引用这个 EventLoop。
考虑二:t_loopInThisThread 的线程局部指针
EventLoop 构造时会设置 t_loopInThisThread = this(线程局部变量,用于 EventLoop::getEventLoopOfCurrentThread())。如果 EventLoop 对象比线程先析构,t_loopInThisThread 就变成悬垂指针。thread_local static shared_ptr 保证 EventLoop 的生命周期至少和线程一样长,避免了这个问题。
考虑三:确保 shared_from_this() 安全
EventLoop 虽然本身不继承 enable_shared_from_this,但它内部管理的对象(如 Timer、Channel)可能通过 queueInLoop 回调间接持有 shared_ptr。用 thread_local static shared_ptr 确保在所有 pending 回调执行完毕后,EventLoop 才被销毁。
总结:thread_local static shared_ptr<EventLoop> 的设计确保了三件事:(1) EventLoop 的生命周期与线程绑定;(2) 通过 shared_ptr 可以安全地跨线程共享所有权;(3) 析构顺序正确——线程退出时才释放 EventLoop。
4. run() 用 call_once 保证只执行一次,析构时的 run() 会做什么?
如果用户已经手动调用过 run(),析构时的 run() 什么也不做。这是正确的行为。
来看完整代码:
| |
std::call_once(once_, lambda) 的语义:
once_是std::once_flag类型- 第一次调用时:执行 lambda,标记
once_为"已执行" - 后续所有调用:检测到
once_已标记,直接返回,不执行 lambda
场景一:用户手动调用过 run()
| |
这是最正常的路径,析构时的 run() 是一个安全的 no-op。
场景二:用户忘记调用 run()
| |
这个路径保证了即使忘记 run(),析构也不会死锁。原因在之前课程中已分析过:如果不先 run(),新线程一直阻塞在 promiseForRun_.get_future().get(),永远不会结束,thread_.join() 会死锁。
场景三:多线程同时调用 run()
std::call_once 本身是线程安全的。如果多个线程同时调用 run(),只有一个线程会执行 lambda,其他线程会阻塞等待该 lambda 执行完成后再返回(不会跳过)。所以不存在竞态问题。
这个设计的精妙之处:析构函数中的 run() 调用不是"执行一次循环"的意思,而是"确保新线程被放行"的安全网。call_once 让 run() 成为幂等操作(多次调用和一次调用效果相同),从而在析构路径和正常路径之间提供了优雅的统一处理。
学习日期:2025-03-28 | 上一课:第12课_TcpServer与TcpClient | 下一课:第14课_任务队列