第17 课:并发工具 — MpscQueue 无锁队列 & ObjectPool 对象池#
对应源文件:
trantor/utils/LockFreeQueue.h — MpscQueue<T>:无锁多生产者单消费者队列trantor/utils/ObjectPool.h — ObjectPool<T>:共享指针驱动的对象池
一、为什么需要这两个工具?#
在高性能网络库中,有两类性能瓶颈反复出现:
瓶颈 1:跨线程任务投递#
EventLoop::runInLoop() 每秒可能被调用数十万次(每个连接的读写完成都要回调)。如果用 mutex 保护投递队列,大量线程争锁会造成显著延迟。
→ 解决方案:MpscQueue<T> 无锁队列,多线程投递不需要 mutex。
瓶颈 2:高频内存分配#
HTTP 框架每个请求都要 new HttpRequest、new HttpResponse,请求完成立刻 delete。频繁的堆分配/释放不仅慢,还会造成内存碎片。
→ 解决方案:ObjectPool<T> 对象池,释放时归还而不销毁,下次直接复用。
二、MpscQueue — 无锁多生产者单消费者队列#
2.1 完整实现(共 ~115 行)#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| template <typename T>
class MpscQueue : public NonCopyable
{
private:
struct BufferNode {
BufferNode() = default;
BufferNode(const T &data) : dataPtr_(new T(data)) {}
BufferNode(T &&data) : dataPtr_(new T(std::move(data))) {}
T *dataPtr_; // 堆上的数据
std::atomic<BufferNode *> next_{nullptr};
};
std::atomic<BufferNode *> head_; // 指向最新插入的节点
std::atomic<BufferNode *> tail_; // 指向最旧的哨兵节点(消费端)
};
|
2.2 初始状态#
1
2
3
4
| MpscQueue()
: head_(new BufferNode), // 创建哨兵节点
tail_(head_.load(std::memory_order_relaxed)) // tail = head = 哨兵
{}
|
初始状态:
1
| head_ → [哨兵, next=null] ← tail_
|
队列为空时:tail->next_ == nullptr。
2.3 入队(enqueue)— 多线程安全#
1
2
3
4
5
6
7
8
| void enqueue(T &&input)
{
BufferNode *node{new BufferNode(std::move(input))};
// ① 原子交换 head_,同时把新节点设为新 head_
BufferNode *prevhead{head_.exchange(node, std::memory_order_acq_rel)};
// ② 让旧 head_ 的 next_ 指向新节点(对消费者可见)
prevhead->next_.store(node, std::memory_order_release);
}
|
关键操作:head_.exchange()
atomic::exchange 是一个不可分割的"读-改-写"操作:它把 head_ 替换为 node,并返回旧值。多个线程同时调用时,每个线程都能拿到"自己之前的那个节点",不会互相踩踏。
2.4 入队过程图解#
设 P1、P2 并发插入 A、B(A 先获得 exchange):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| 初始: head_ → [哨兵, next=null] ← tail_
P1 交换后,P1 先行到步骤②之前被抢占:
head_ → [节点A, next=null]
tail_ → [哨兵, next=null] ← 哨兵的 next_ 还未更新!
P2 交换(P1 未完成时):
prevhead(P2) = 节点A
head_ → [节点B, next=null]
P2 完成步骤②:
节点A->next_ = 节点B
P1 恢复完成步骤②:
哨兵->next_ = 节点A
最终链表:
tail_ → [哨兵] → [节点A] → [节点B] ← head_
(消费顺序:A 先,B 后 —— FIFO 保持!)
|
巧妙之处:即使 P1 在步骤①和②之间被抢占,P2 仍能完成。消费者看到的链表短暂"不完整"(哨兵 next_ 还是 null),只是认为队列为空,会在下次 dequeue 时看到完整数据。单消费者模式完全可以接受这个暂时性"空"。
2.5 出队(dequeue)— 单线程专用#
1
2
3
4
5
6
7
8
9
10
11
12
13
| bool dequeue(T &output)
{
BufferNode *tail = tail_.load(std::memory_order_relaxed);
BufferNode *next = tail->next_.load(std::memory_order_acquire); // 配对 release
if (next == nullptr) return false; // 队列空
output = std::move(*(next->dataPtr_)); // 取出数据
delete next->dataPtr_;
tail_.store(next, std::memory_order_release); // next 成为新哨兵
delete tail; // 删除旧哨兵
return true;
}
|
内存序配对关系:
1
2
3
| 生产者:prevhead->next_.store(node, release)
消费者:tail->next_.load(acquire)
↑ acquire 确保看到 release 之前的所有写入(即 node->dataPtr_ 的初始化)
|
2.6 析构清理#
1
2
3
4
5
6
7
| ~MpscQueue()
{
T output;
while (this->dequeue(output)) {} // 清空所有数据
BufferNode *front = head_.load(std::memory_order_relaxed);
delete front; // 删除最后一个哨兵
}
|
析构时先把所有元素 dequeue 掉(触发 T 的析构),再删除剩余的哨兵节点(析构时哨兵 = head_)。
2.7 在 trantor 中的实际应用#
1
2
| // EventLoop.h (trantor/net/EventLoop.h 中可见)
MpscQueue<Func> funcs_; // ← EventLoop 的任务投递队列
|
EventLoop::queueInLoop(f) 把 f 入队到 funcs_,EventLoop::doRunInLoopFuncs() 在 Loop 线程单消费者地取出执行。
1
2
3
| [I/O 工作线程 1] ─── enqueue(task1) ──┐
[I/O 工作线程 2] ─── enqueue(task2) ──┤─→ MpscQueue ─→ [EventLoop 线程] dequeue
[主线程] ─── enqueue(task3) ──┘ 单消费者顺序执行
|
多个 I/O 线程同时投递任务到同一个 EventLoop,无需 mutex。
三、与传统 mutex 队列的对比#
| MpscQueue(无锁) | std::queue + mutex |
|---|
| 生产者并发 | atomic::exchange,无等待 | mutex 争抢,可能阻塞 |
| 消费者 | 只能单线程 | 可多线程 |
| 内存开销 | 每个元素一次 new BufferNode | 一个 std::queue |
| 延迟 | 极低(无上下文切换) | 依赖锁竞争情况 |
| ABA 问题 | 无(不做 CAS 比较操作) | 不适用 |
| 短暂"假空" | 有(入队两步间隙,单消费者无影响) | 无 |
适用场景:明确的"多生产者、单消费者"模式,如 EventLoop 的任务队列。
不适用场景:多消费者(如 ConcurrentTaskQueue 的工作线程池),必须用 mutex。
四、ObjectPool — shared_ptr 驱动的对象池#
4.1 完整实现#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
| template <typename T>
class ObjectPool : public NonCopyable,
public std::enable_shared_from_this<ObjectPool<T>>
{
public:
std::shared_ptr<T> getObject()
{
static_assert(!std::is_pointer<T>::value, "T 不能是指针类型");
T *p{nullptr};
{
std::lock_guard<std::mutex> lock(mtx_);
if (!objs_.empty()) {
p = objs_.back(); // 从池中取最后一个
objs_.pop_back();
}
}
if (p == nullptr)
p = new T; // 池空则新建
std::weak_ptr<ObjectPool<T>> weakPtr = this->shared_from_this();
auto obj = std::shared_ptr<T>(p, [weakPtr](T *ptr) {
auto self = weakPtr.lock();
if (self) {
std::lock_guard<std::mutex> lock(self->mtx_);
self->objs_.push_back(ptr); // 归还到池(不 delete!)
} else {
delete ptr; // 池已销毁,正常释放
}
});
return obj;
}
private:
std::vector<T *> objs_; // 空闲对象列表(裸指针,池负责生命周期)
std::mutex mtx_;
};
|
4.2 核心机制:自定义 deleter#
shared_ptr<T> 构造时可以传入第二个参数——自定义 deleter:
1
2
3
4
| std::shared_ptr<T>(raw_ptr, [](T* p) {
// 当 shared_ptr 引用计数归零时,执行这里而不是 delete p
pool.push_back(p); // 归还对象
});
|
这是整个对象池最精妙的地方:利用 shared_ptr 的引用计数机制作为"对象使用中"的检测,计数归零时自动触发"归还"而非"销毁"。
4.3 为什么用 weak_ptr 而不是 shared_ptr?#
1
2
3
4
5
6
7
8
9
10
11
12
| // ❌ 错误:捕获 shared_ptr<ObjectPool>
auto obj = shared_ptr<T>(p, [self = shared_from_this()](T* ptr) {
// shared_ptr 持有 pool,pool 间接持有所有 obj(通过 objs_)
// 造成循环引用:pool → obj → pool(永不释放!)
});
// ✅ 正确:捕获 weak_ptr<ObjectPool>
auto obj = shared_ptr<T>(p, [weakPtr](T* ptr) {
auto self = weakPtr.lock(); // 尝试获取 pool
if (self) { ... } // pool 存活,归还
else { delete ptr; } // pool 已死,释放
});
|
weak_ptr 不增加引用计数,所以 pool 的生命周期由用户控制——当用户丢掉 shared_ptr<ObjectPool>,pool 即销毁,已检出的对象在用完后直接 delete 释放。
4.4 使用要求:必须用 shared_ptr 持有 Pool#
1
2
3
4
5
6
7
| // ❌ 栈上创建:shared_from_this() 会抛出 std::bad_weak_ptr
ObjectPool<Buffer> pool;
auto buf = pool.getObject(); // 崩溃!
// ✅ 必须用 make_shared
auto pool = std::make_shared<ObjectPool<Buffer>>();
auto buf = pool->getObject(); // 正确
|
enable_shared_from_this 依赖 weak_ptr 控制块,只有通过 shared_ptr 持有对象才能激活控制块。
4.5 对象池的生命周期图#
1
2
3
4
5
6
7
8
9
10
11
12
13
| 用户持有 shared_ptr<ObjectPool>
│
├─ getObject() 返回 shared_ptr<T>(带归还 deleter)
│ │
│ ├─ 用户使用 T 对象
│ │
│ └─ shared_ptr<T> 析构
│ │
│ ├─ weakPtr.lock() 成功 → objs_.push_back(raw) ← 归还!
│ └─ weakPtr.lock() 失败 → delete raw ← 释放
│
└─ 用户丢掉 shared_ptr<ObjectPool> → Pool 销毁
(此后所有还在外部的 shared_ptr<T> 析构时走 delete 分支)
|
五、两个工具的设计模式对比#
| MpscQueue | ObjectPool |
|---|
| 核心问题 | 多线程投递无锁化 | 减少 new/delete 开销 |
| 关键技术 | atomic::exchange(无锁) | shared_ptr 自定义 deleter |
| 线程安全 | enqueue 无锁,dequeue 单线程 | getObject + 归还都有 mutex |
| 内存管理 | 每次 enqueue new,dequeue delete | 对象被池复用,不频繁 new/delete |
| 设计模式 | 无锁数据结构 | 资源池(Resource Pool) |
| 使用约束 | 消费者必须单线程 | Pool 必须由 shared_ptr 持有 |
六、游戏服务器实践#
6.1 玩家消息队列(MpscQueue 应用)#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| // 场景:多个工作线程(AI 计算、物理模拟)产生结果,发送给主逻辑 EventLoop
class GameLoop {
MpscQueue<GameEvent> eventQueue_;
// 工作线程调用(无锁)
void postEvent(GameEvent evt) {
eventQueue_.enqueue(std::move(evt));
loop_.wakeup(); // 唤醒主循环处理
}
// 主逻辑线程调用(单消费者)
void processEvents() {
GameEvent evt;
while (eventQueue_.dequeue(evt)) {
handleEvent(evt); // 不需要加锁处理 evt
}
}
};
|
6.2 网络消息包对象池(ObjectPool 应用)#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| // 场景:每个客户端连接都要频繁 new/delete 消息包对象
using PacketPool = ObjectPool<GamePacket>;
// 服务器初始化
auto packetPool = std::make_shared<PacketPool>();
// 收到数据时
void onMessage(const TcpConnectionPtr &conn, MsgBuffer *buf) {
auto packet = packetPool->getObject(); // 从池中取(很快)
packet->parse(buf);
// 处理完后 shared_ptr<GamePacket> 析构
// → 自动归还到 packetPool(不是 delete)
processPacket(conn, std::move(packet));
}
|
性能收益:避免每个请求的 malloc/free,减少内存碎片,对 L1/L2 缓存友好(复用已热化的对象内存)。
6.3 结合使用:无锁投递 + 对象复用#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| // 最终形态:工作线程无锁投递消息,消息本身来自对象池
auto pool = std::make_shared<ObjectPool<PlayerAction>>();
MpscQueue<std::shared_ptr<PlayerAction>> actionQueue;
// AI 线程(多个)
void aiThread(int playerId) {
auto action = pool->getObject(); // 取出复用对象
action->playerId = playerId;
action->type = ActionType::Move;
actionQueue.enqueue(std::move(action)); // 无锁投递
}
// 逻辑主线程
void logicLoop() {
std::shared_ptr<PlayerAction> action;
while (actionQueue.dequeue(action)) {
applyAction(*action);
// action shared_ptr 析构 → 归还到 pool
}
}
|
核心收获#
MpscQueue 两步入队:① head_.exchange(node, acq_rel) 原子交换(多线程安全)② prevhead->next_.store(node, release) 接通链表(对消费者可见)- 两步间隙消费者看到"假空"无影响:单消费者模式下短暂认为队列为空只会少执行一次,下轮可见
- 内存序配对:生产者
release ↔ 消费者 acquire,确保看到节点数据的完整初始化 ObjectPool 核心魔法:shared_ptr 自定义 deleter → 引用计数归零时触发归还而非销毁- Pool 必须用
make_shared 持有:enable_shared_from_this 依赖控制块,栈上创建会抛 bad_weak_ptr weak_ptr 持有 Pool 防循环引用:若用 shared_ptr 捕获,pool→obj→pool 形成永不释放的循环
七、思考题#
MpscQueue 的 enqueue 分两步:① head_.exchange(node) ② prevhead->next_.store(node)。在步骤①和②之间,如果消费者调用 dequeue,它会看到什么?这对游戏服务器的事件处理有什么影响?能否接受这种行为?
ObjectPool 用 std::vector 存储空闲对象,getObject() 取 back(),归还时 push_back()——即后进先出(LIFO)。相比 FIFO,LIFO 对 CPU 缓存有什么好处?有什么潜在风险(如对象饥饿)?
ObjectPool 没有容量上限——如果某时刻产生了 10000 个对象,之后并发量降低,这 10000 个对象永远躺在 objs_ 里不会释放(内存不会回缩)。如何改造 ObjectPool 支持"水位线":超过 N 个空闲对象时直接 delete 而不归还?
MpscQueue 的每次 enqueue 都有一次 new BufferNode,dequeue 一次 delete。能否将 MpscQueue 和 ObjectPool 结合,让 BufferNode 本身也来自对象池,从而彻底消除这里的动态内存分配?这样做会遇到什么循环依赖或初始化顺序问题?
八、思考题参考答案#
1. enqueue 两步之间消费者 dequeue 会看到什么#
场景重现:
生产者执行 enqueue(data) 的两步操作:
1
2
3
4
5
6
7
8
9
| void enqueue(T &&input)
{
BufferNode *node{new BufferNode(std::move(input))};
// ① 原子交换 head_
BufferNode *prevhead{head_.exchange(node, std::memory_order_acq_rel)};
// ——— 此刻被抢占,消费者调用 dequeue ———
// ② 接通链表
prevhead->next_.store(node, std::memory_order_release);
}
|
步骤 ① 完成后,② 完成前,队列的状态:
1
2
3
4
| tail_ → [哨兵, next_=nullptr] ← 消费者从这里读
...(中间可能已有其他已完成的节点)
prevhead → [某节点, next_=nullptr] ← 还没被设为指向 node!
head_ → [node, next_=nullptr] ← 已经是新 head_
|
此时消费者调用 dequeue:
1
2
3
4
5
6
7
8
| bool dequeue(T &output)
{
BufferNode *tail = tail_.load(std::memory_order_relaxed);
BufferNode *next = tail->next_.load(std::memory_order_acquire);
if (next == nullptr) return false; // ← 看到 tail->next_ 为 null
// ...
}
|
消费者从 tail_(哨兵)开始,读 tail->next_。如果中间链表尚未被接通(即 prevhead->next_ 还没被 store),消费者看到的 next 是 nullptr,认为队列为空,返回 false。
对游戏服务器的影响:
这是一种短暂的"假空"现象——队列中确实有数据(node 已经存在于链表结构中),但消费者暂时看不到。
在 EventLoop 的使用场景中:
EventLoop::doRunInLoopFuncs() 在一次循环中用 while(dequeue) 取出所有任务- 如果某次 dequeue 因为"假空"返回 false,本轮循环结束
- 但生产者在
enqueue 完成后会调用 wakeup()(通过 eventfd 写入),唤醒 EventLoop - EventLoop 被唤醒后再次调用
doRunInLoopFuncs(),此时步骤 ② 早已完成,能看到数据
延迟影响:最坏情况下,一个任务会延迟到下一次 epoll_wait 唤醒后才被处理。对于游戏服务器的帧循环(通常 3060fps,每帧 1633ms),一个任务可能延迟一帧执行。对于大多数游戏逻辑(如玩家移动、道具使用),一帧的延迟完全可以接受。
能否接受:完全可以接受。这是无锁队列的标准折衷——用"短暂假空"换取"生产者完全无锁"。与使用 mutex 导致的线程阻塞(可能持续微秒到毫秒级)相比,“假空"只损失一次循环的时间,且不会导致任何线程阻塞。
2. ObjectPool LIFO 对 CPU 缓存的好处与风险#
LIFO(后进先出)对缓存的好处:
ObjectPool 使用 vector::push_back() 归还、vector::back() + pop_back() 取出,形成 LIFO 栈。
1
2
3
| pool 内部:[obj_oldest, ..., obj_recent]
↑ back()
getObject() 取 obj_recent
|
CPU 缓存的核心原理是时间局部性(temporal locality):最近使用过的数据最可能还在缓存中。
LIFO 取出的是最近归还的对象(obj_recent),这个对象刚被使用过,其内存页大概率还在 L1/L2 cache 中。再次使用时命中率高。
对比 FIFO(先进先出):取出的是最早归还的对象(obj_oldest),这个对象可能已经在池中闲置了很久,其内存页可能已被 cache 替换(evicted),访问时需要从主存加载,延迟高出 10~100 倍。
量化:在高频请求场景中(如 HTTP 服务器每秒处理数万请求),LIFO 可以让对象始终"热"在 cache 中,性能差异可观。
潜在风险——对象饥饿(Object Starvation):
LIFO 的问题是:最早归还的对象可能永远不会被取出。
1
2
3
| 场景:池中有 1000 个对象,并发量稳定在 10
→ 只有最"新"的 10 个对象被反复取出-归还-取出
→ 剩余 990 个对象永远躺在池底,占用内存但从不使用
|
这导致:
- 内存浪费:990 个闲置对象占用的内存本可以释放给系统。
- 对象状态陈旧:如果对象持有外部资源(如数据库连接),底部的对象可能因长期闲置而超时断开,取出时需要重新建立连接。
- 资源泄漏假象:看似池中有 1000 个对象"可用”,实际只有顶部 10 个是"活跃"的。
改进方案:
- 定时清理"冷对象":开一个后台定时器,定期检查
objs_ 底部的对象,如果闲置时间超过阈值就 delete。 - 设置池容量上限(见第 3 题)。
- 如果对象持有外部资源,在
getObject() 返回前检查资源有效性(如 ping 数据库连接)。
3. 如何改造 ObjectPool 支持"水位线"#
目标:当空闲对象数超过 maxIdle 时,归还的对象直接 delete 而不归还到池中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
| template <typename T>
class ObjectPool : public NonCopyable,
public std::enable_shared_from_this<ObjectPool<T>>
{
public:
explicit ObjectPool(size_t maxIdle = std::numeric_limits<size_t>::max())
: maxIdle_(maxIdle) {}
std::shared_ptr<T> getObject()
{
static_assert(!std::is_pointer<T>::value, "T can't be pointer type");
T *p{nullptr};
{
std::lock_guard<std::mutex> lock(mtx_);
if (!objs_.empty()) {
p = objs_.back();
objs_.pop_back();
}
}
if (p == nullptr)
p = new T;
std::weak_ptr<ObjectPool<T>> weakPtr = this->shared_from_this();
auto maxIdle = maxIdle_; // 值拷贝到 lambda,避免捕获 this
auto obj = std::shared_ptr<T>(p, [weakPtr, maxIdle](T *ptr) {
auto self = weakPtr.lock();
if (self) {
std::lock_guard<std::mutex> lock(self->mtx_);
if (self->objs_.size() < maxIdle) {
self->objs_.push_back(ptr); // 池未满,归还
return;
}
}
delete ptr; // 池已满 或 池已销毁,直接释放
});
return obj;
}
private:
std::vector<T *> objs_;
std::mutex mtx_;
size_t maxIdle_;
};
|
进一步优化——带时间衰减的水位线:
简单的固定上限可能不够灵活。可以增加缩容策略:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| // 定时调用此方法,回收多余的空闲对象
void shrink(size_t targetSize = 0) {
std::vector<T*> toDelete;
{
std::lock_guard<std::mutex> lock(mtx_);
while (objs_.size() > targetSize) {
toDelete.push_back(objs_.back());
objs_.pop_back();
}
objs_.shrink_to_fit(); // 释放 vector 多余的内存
}
for (auto p : toDelete)
delete p;
}
|
配合游戏服务器的定时器使用:
1
2
3
4
| auto pool = std::make_shared<ObjectPool<GamePacket>>(1000);
loop->runEvery(300.0, [pool]() {
pool->shrink(100); // 每 5 分钟缩容到 100 个
});
|
4. MpscQueue + ObjectPool 结合:消除 BufferNode 的动态分配#
思路:
MpscQueue 每次 enqueue 都 new BufferNode,dequeue 都 delete BufferNode。如果用 ObjectPool<BufferNode> 来管理节点内存,可以避免频繁的堆分配:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| template <typename T>
class MpscQueuePooled : public NonCopyable {
struct BufferNode {
T *dataPtr_;
std::atomic<BufferNode *> next_{nullptr};
};
std::shared_ptr<ObjectPool<BufferNode>> nodePool_;
// ...
void enqueue(T &&input) {
auto nodeShared = nodePool_->getObject();
BufferNode *node = nodeShared.get();
node->dataPtr_ = new T(std::move(input));
node->next_.store(nullptr, std::memory_order_relaxed);
// 问题:nodeShared 是 shared_ptr,超出作用域就会归还到池!
// 但 node 还在链表中被使用!
// 必须"延长"nodeShared 的生命周期到 dequeue 时
}
};
|
核心难题:生命周期管理冲突
ObjectPool 返回 shared_ptr<BufferNode>,当 shared_ptr 析构时触发归还。但 MpscQueue 的链表结构要求节点在 enqueue 后继续存活,直到 dequeue 时才释放。
解决方案与问题:
方案 A:在 BufferNode 中保存 shared_ptr 自身
1
2
3
4
5
| struct BufferNode {
T *dataPtr_;
std::atomic<BufferNode *> next_{nullptr};
std::shared_ptr<BufferNode> selfRef_; // 保持自己不被归还
};
|
问题:std::shared_ptr 不是 trivially copyable,atomic 操作不能直接作用在含 shared_ptr 的结构体上。更重要的是,shared_ptr 内部有非原子的控制块操作,在无锁队列的内存序语义下不安全。
方案 B:用 raw pointer + 手动管理的 FreeList 代替 ObjectPool
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
| template <typename T>
class MpscQueueWithPool : public NonCopyable {
struct BufferNode {
T *dataPtr_;
std::atomic<BufferNode *> next_{nullptr};
};
// 用另一个 MpscQueue 作为 FreeList(无锁回收)
MpscQueue<BufferNode*> freeList_;
void enqueue(T &&input) {
BufferNode *node;
if (!freeList_.dequeue(node)) {
node = new BufferNode(); // FreeList 空,新建
}
node->dataPtr_ = new T(std::move(input));
node->next_.store(nullptr, std::memory_order_relaxed);
// ... 正常的入队操作 ...
}
bool dequeue(T &output) {
// ... 正常的出队操作 ...
// 取出 node 后:
freeList_.enqueue(node); // 归还到 FreeList
// 问题:这又引入了 MpscQueue 的 new/delete!
}
};
|
循环依赖:用 MpscQueue 来做 FreeList,FreeList 自身的 enqueue 又需要 new BufferNode——陷入了鸡生蛋蛋生鸡的循环。
方案 C:预分配的环形缓冲区(最实际的方案)
如果真的需要消除所有动态分配,应该放弃链表结构,改用有界无锁环形缓冲区(bounded lock-free ring buffer):
1
2
3
4
5
6
7
| template <typename T, size_t Capacity>
class BoundedMpscQueue {
std::array<std::aligned_storage_t<sizeof(T)>, Capacity> buffer_;
std::atomic<size_t> head_{0};
std::atomic<size_t> tail_{0};
// ...
};
|
但这改变了 MpscQueue 的语义——变成有界队列,满时 enqueue 会失败。对 EventLoop 的 funcs_ 队列来说,有界限制可能导致任务丢失,不可接受。
结论:在当前的 MpscQueue 链表设计下,很难干净地引入 ObjectPool,因为两者的生命周期管理模型不兼容(shared_ptr 的自动归还 vs 链表节点的手动管理)。实际中,现代内存分配器(如 jemalloc、tcmalloc)的 malloc/free 已经非常快(通过线程本地缓存),BufferNode 的分配开销在大多数场景下不是瓶颈。如果确实是瓶颈,更好的方案是使用自定义的 thread_local FreeList(简单的单链表栈,无需原子操作),在每个线程维护一小批预分配的 BufferNode。
学习日期:2025-04-10 | 上一课:第16课_DNS解析 | 下一课:第18课_密码学工具