第17 课:并发工具 — MpscQueue 无锁队列 & ObjectPool 对象池

对应源文件:

  • trantor/utils/LockFreeQueue.hMpscQueue<T>:无锁多生产者单消费者队列
  • trantor/utils/ObjectPool.hObjectPool<T>:共享指针驱动的对象池

一、为什么需要这两个工具?

在高性能网络库中,有两类性能瓶颈反复出现:

瓶颈 1:跨线程任务投递

EventLoop::runInLoop() 每秒可能被调用数十万次(每个连接的读写完成都要回调)。如果用 mutex 保护投递队列,大量线程争锁会造成显著延迟。

解决方案MpscQueue<T> 无锁队列,多线程投递不需要 mutex。

瓶颈 2:高频内存分配

HTTP 框架每个请求都要 new HttpRequestnew HttpResponse,请求完成立刻 delete。频繁的堆分配/释放不仅慢,还会造成内存碎片。

解决方案ObjectPool<T> 对象池,释放时归还而不销毁,下次直接复用。


二、MpscQueue — 无锁多生产者单消费者队列

2.1 完整实现(共 ~115 行)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
template <typename T>
class MpscQueue : public NonCopyable
{
  private:
    struct BufferNode {
        BufferNode() = default;
        BufferNode(const T &data) : dataPtr_(new T(data)) {}
        BufferNode(T &&data) : dataPtr_(new T(std::move(data))) {}
        T *dataPtr_;                           // 堆上的数据
        std::atomic<BufferNode *> next_{nullptr};
    };

    std::atomic<BufferNode *> head_;   // 指向最新插入的节点
    std::atomic<BufferNode *> tail_;   // 指向最旧的哨兵节点(消费端)
};

2.2 初始状态

1
2
3
4
MpscQueue()
    : head_(new BufferNode),          // 创建哨兵节点
      tail_(head_.load(std::memory_order_relaxed))  // tail = head = 哨兵
{}

初始状态:

1
head_ → [哨兵, next=null] ← tail_

队列为空时:tail->next_ == nullptr

2.3 入队(enqueue)— 多线程安全

1
2
3
4
5
6
7
8
void enqueue(T &&input)
{
    BufferNode *node{new BufferNode(std::move(input))};
    //  ① 原子交换 head_,同时把新节点设为新 head_
    BufferNode *prevhead{head_.exchange(node, std::memory_order_acq_rel)};
    //  ② 让旧 head_ 的 next_ 指向新节点(对消费者可见)
    prevhead->next_.store(node, std::memory_order_release);
}

关键操作:head_.exchange()

atomic::exchange 是一个不可分割的"读-改-写"操作:它把 head_ 替换为 node,并返回旧值。多个线程同时调用时,每个线程都能拿到"自己之前的那个节点",不会互相踩踏。

2.4 入队过程图解

设 P1、P2 并发插入 A、B(A 先获得 exchange):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
初始:  head_ → [哨兵, next=null] ← tail_

P1 交换后,P1 先行到步骤②之前被抢占:
  head_ → [节点A, next=null]
  tail_ → [哨兵, next=null]    ← 哨兵的 next_ 还未更新!

P2 交换(P1 未完成时):
  prevhead(P2) = 节点A
  head_ → [节点B, next=null]

P2 完成步骤②:
  节点A->next_ = 节点B

P1 恢复完成步骤②:
  哨兵->next_ = 节点A

最终链表:
  tail_ → [哨兵] → [节点A] → [节点B] ← head_
  (消费顺序:A 先,B 后 —— FIFO 保持!)

巧妙之处:即使 P1 在步骤①和②之间被抢占,P2 仍能完成。消费者看到的链表短暂"不完整"(哨兵 next_ 还是 null),只是认为队列为空,会在下次 dequeue 时看到完整数据。单消费者模式完全可以接受这个暂时性"空"。

2.5 出队(dequeue)— 单线程专用

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
bool dequeue(T &output)
{
    BufferNode *tail = tail_.load(std::memory_order_relaxed);
    BufferNode *next = tail->next_.load(std::memory_order_acquire);  // 配对 release

    if (next == nullptr) return false;   // 队列空

    output = std::move(*(next->dataPtr_));  // 取出数据
    delete next->dataPtr_;
    tail_.store(next, std::memory_order_release);  // next 成为新哨兵
    delete tail;                                    // 删除旧哨兵
    return true;
}

内存序配对关系

1
2
3
生产者:prevhead->next_.store(node, release)
消费者:tail->next_.load(acquire)
          acquire 确保看到 release 之前的所有写入(即 node->dataPtr_ 的初始化)

2.6 析构清理

1
2
3
4
5
6
7
~MpscQueue()
{
    T output;
    while (this->dequeue(output)) {}   // 清空所有数据
    BufferNode *front = head_.load(std::memory_order_relaxed);
    delete front;                       // 删除最后一个哨兵
}

析构时先把所有元素 dequeue 掉(触发 T 的析构),再删除剩余的哨兵节点(析构时哨兵 = head_)。

2.7 在 trantor 中的实际应用

1
2
// EventLoop.h (trantor/net/EventLoop.h 中可见)
MpscQueue<Func> funcs_;   // ← EventLoop 的任务投递队列

EventLoop::queueInLoop(f) 把 f 入队到 funcs_EventLoop::doRunInLoopFuncs() 在 Loop 线程单消费者地取出执行。

1
2
3
[I/O 工作线程 1] ─── enqueue(task1) ──┐
[I/O 工作线程 2] ─── enqueue(task2) ──┤─→ MpscQueue ─→ [EventLoop 线程] dequeue
[主线程]         ─── enqueue(task3) ──┘               单消费者顺序执行

多个 I/O 线程同时投递任务到同一个 EventLoop,无需 mutex。


三、与传统 mutex 队列的对比

MpscQueue(无锁)std::queue + mutex
生产者并发atomic::exchange,无等待mutex 争抢,可能阻塞
消费者只能单线程可多线程
内存开销每个元素一次 new BufferNode一个 std::queue
延迟极低(无上下文切换)依赖锁竞争情况
ABA 问题无(不做 CAS 比较操作)不适用
短暂"假空"有(入队两步间隙,单消费者无影响)

适用场景:明确的"多生产者、单消费者"模式,如 EventLoop 的任务队列。

不适用场景:多消费者(如 ConcurrentTaskQueue 的工作线程池),必须用 mutex。


四、ObjectPool — shared_ptr 驱动的对象池

4.1 完整实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
template <typename T>
class ObjectPool : public NonCopyable,
                   public std::enable_shared_from_this<ObjectPool<T>>
{
  public:
    std::shared_ptr<T> getObject()
    {
        static_assert(!std::is_pointer<T>::value, "T 不能是指针类型");

        T *p{nullptr};
        {
            std::lock_guard<std::mutex> lock(mtx_);
            if (!objs_.empty()) {
                p = objs_.back();    // 从池中取最后一个
                objs_.pop_back();
            }
        }

        if (p == nullptr)
            p = new T;   // 池空则新建

        std::weak_ptr<ObjectPool<T>> weakPtr = this->shared_from_this();
        auto obj = std::shared_ptr<T>(p, [weakPtr](T *ptr) {
            auto self = weakPtr.lock();
            if (self) {
                std::lock_guard<std::mutex> lock(self->mtx_);
                self->objs_.push_back(ptr);   // 归还到池(不 delete!)
            } else {
                delete ptr;   // 池已销毁,正常释放
            }
        });
        return obj;
    }

  private:
    std::vector<T *> objs_;   // 空闲对象列表(裸指针,池负责生命周期)
    std::mutex mtx_;
};

4.2 核心机制:自定义 deleter

shared_ptr<T> 构造时可以传入第二个参数——自定义 deleter

1
2
3
4
std::shared_ptr<T>(raw_ptr, [](T* p) {
    // 当 shared_ptr 引用计数归零时,执行这里而不是 delete p
    pool.push_back(p);   // 归还对象
});

这是整个对象池最精妙的地方:利用 shared_ptr 的引用计数机制作为"对象使用中"的检测,计数归零时自动触发"归还"而非"销毁"

4.3 为什么用 weak_ptr 而不是 shared_ptr

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// ❌ 错误:捕获 shared_ptr<ObjectPool>
auto obj = shared_ptr<T>(p, [self = shared_from_this()](T* ptr) {
    // shared_ptr 持有 pool,pool 间接持有所有 obj(通过 objs_)
    // 造成循环引用:pool → obj → pool(永不释放!)
});

// ✅ 正确:捕获 weak_ptr<ObjectPool>
auto obj = shared_ptr<T>(p, [weakPtr](T* ptr) {
    auto self = weakPtr.lock();  // 尝试获取 pool
    if (self) { ... }            // pool 存活,归还
    else      { delete ptr; }    // pool 已死,释放
});

weak_ptr 不增加引用计数,所以 pool 的生命周期由用户控制——当用户丢掉 shared_ptr<ObjectPool>,pool 即销毁,已检出的对象在用完后直接 delete 释放。

4.4 使用要求:必须用 shared_ptr 持有 Pool

1
2
3
4
5
6
7
// ❌ 栈上创建:shared_from_this() 会抛出 std::bad_weak_ptr
ObjectPool<Buffer> pool;
auto buf = pool.getObject();   // 崩溃!

// ✅ 必须用 make_shared
auto pool = std::make_shared<ObjectPool<Buffer>>();
auto buf = pool->getObject();  // 正确

enable_shared_from_this 依赖 weak_ptr 控制块,只有通过 shared_ptr 持有对象才能激活控制块。

4.5 对象池的生命周期图

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
用户持有 shared_ptr<ObjectPool>
├─ getObject() 返回 shared_ptr<T>(带归还 deleter)
│       │
│       ├─ 用户使用 T 对象
│       │
│       └─ shared_ptr<T> 析构
│               │
│               ├─ weakPtr.lock() 成功 → objs_.push_back(raw)   ← 归还!
│               └─ weakPtr.lock() 失败 → delete raw              ← 释放
└─ 用户丢掉 shared_ptr<ObjectPool> → Pool 销毁
        (此后所有还在外部的 shared_ptr<T> 析构时走 delete 分支)

五、两个工具的设计模式对比

MpscQueueObjectPool
核心问题多线程投递无锁化减少 new/delete 开销
关键技术atomic::exchange(无锁)shared_ptr 自定义 deleter
线程安全enqueue 无锁,dequeue 单线程getObject + 归还都有 mutex
内存管理每次 enqueue new,dequeue delete对象被池复用,不频繁 new/delete
设计模式无锁数据结构资源池(Resource Pool)
使用约束消费者必须单线程Pool 必须由 shared_ptr 持有

六、游戏服务器实践

6.1 玩家消息队列(MpscQueue 应用)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// 场景:多个工作线程(AI 计算、物理模拟)产生结果,发送给主逻辑 EventLoop
class GameLoop {
    MpscQueue<GameEvent> eventQueue_;

    // 工作线程调用(无锁)
    void postEvent(GameEvent evt) {
        eventQueue_.enqueue(std::move(evt));
        loop_.wakeup();   // 唤醒主循环处理
    }

    // 主逻辑线程调用(单消费者)
    void processEvents() {
        GameEvent evt;
        while (eventQueue_.dequeue(evt)) {
            handleEvent(evt);   // 不需要加锁处理 evt
        }
    }
};

6.2 网络消息包对象池(ObjectPool 应用)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// 场景:每个客户端连接都要频繁 new/delete 消息包对象
using PacketPool = ObjectPool<GamePacket>;

// 服务器初始化
auto packetPool = std::make_shared<PacketPool>();

// 收到数据时
void onMessage(const TcpConnectionPtr &conn, MsgBuffer *buf) {
    auto packet = packetPool->getObject();  // 从池中取(很快)
    packet->parse(buf);

    // 处理完后 shared_ptr<GamePacket> 析构
    // → 自动归还到 packetPool(不是 delete)
    processPacket(conn, std::move(packet));
}

性能收益:避免每个请求的 malloc/free,减少内存碎片,对 L1/L2 缓存友好(复用已热化的对象内存)。

6.3 结合使用:无锁投递 + 对象复用

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
// 最终形态:工作线程无锁投递消息,消息本身来自对象池
auto pool = std::make_shared<ObjectPool<PlayerAction>>();
MpscQueue<std::shared_ptr<PlayerAction>> actionQueue;

// AI 线程(多个)
void aiThread(int playerId) {
    auto action = pool->getObject();   // 取出复用对象
    action->playerId = playerId;
    action->type = ActionType::Move;
    actionQueue.enqueue(std::move(action));  // 无锁投递
}

// 逻辑主线程
void logicLoop() {
    std::shared_ptr<PlayerAction> action;
    while (actionQueue.dequeue(action)) {
        applyAction(*action);
        // action shared_ptr 析构 → 归还到 pool
    }
}

核心收获

  • MpscQueue 两步入队:① head_.exchange(node, acq_rel) 原子交换(多线程安全)② prevhead->next_.store(node, release) 接通链表(对消费者可见)
  • 两步间隙消费者看到"假空"无影响:单消费者模式下短暂认为队列为空只会少执行一次,下轮可见
  • 内存序配对:生产者 release ↔ 消费者 acquire,确保看到节点数据的完整初始化
  • ObjectPool 核心魔法:shared_ptr 自定义 deleter → 引用计数归零时触发归还而非销毁
  • Pool 必须用 make_shared 持有:enable_shared_from_this 依赖控制块,栈上创建会抛 bad_weak_ptr
  • weak_ptr 持有 Pool 防循环引用:若用 shared_ptr 捕获,pool→obj→pool 形成永不释放的循环

七、思考题

  1. MpscQueueenqueue 分两步:① head_.exchange(node)prevhead->next_.store(node)。在步骤①和②之间,如果消费者调用 dequeue,它会看到什么?这对游戏服务器的事件处理有什么影响?能否接受这种行为?

  2. ObjectPoolstd::vector 存储空闲对象,getObject()back(),归还时 push_back()——即后进先出(LIFO)。相比 FIFO,LIFO 对 CPU 缓存有什么好处?有什么潜在风险(如对象饥饿)?

  3. ObjectPool 没有容量上限——如果某时刻产生了 10000 个对象,之后并发量降低,这 10000 个对象永远躺在 objs_ 里不会释放(内存不会回缩)。如何改造 ObjectPool 支持"水位线":超过 N 个空闲对象时直接 delete 而不归还?

  4. MpscQueue 的每次 enqueue 都有一次 new BufferNodedequeue 一次 delete。能否将 MpscQueueObjectPool 结合,让 BufferNode 本身也来自对象池,从而彻底消除这里的动态内存分配?这样做会遇到什么循环依赖或初始化顺序问题?


八、思考题参考答案

1. enqueue 两步之间消费者 dequeue 会看到什么

场景重现

生产者执行 enqueue(data) 的两步操作:

1
2
3
4
5
6
7
8
9
void enqueue(T &&input)
{
    BufferNode *node{new BufferNode(std::move(input))};
    // ① 原子交换 head_
    BufferNode *prevhead{head_.exchange(node, std::memory_order_acq_rel)};
    // ——— 此刻被抢占,消费者调用 dequeue ———
    // ② 接通链表
    prevhead->next_.store(node, std::memory_order_release);
}

步骤 ① 完成后,② 完成前,队列的状态:

1
2
3
4
tail_ → [哨兵, next_=nullptr]    ← 消费者从这里读
         ...(中间可能已有其他已完成的节点)
prevhead → [某节点, next_=nullptr]   ← 还没被设为指向 node!
head_ → [node, next_=nullptr]       ← 已经是新 head_

此时消费者调用 dequeue

1
2
3
4
5
6
7
8
bool dequeue(T &output)
{
    BufferNode *tail = tail_.load(std::memory_order_relaxed);
    BufferNode *next = tail->next_.load(std::memory_order_acquire);

    if (next == nullptr) return false;   // ← 看到 tail->next_ 为 null
    // ...
}

消费者从 tail_(哨兵)开始,读 tail->next_。如果中间链表尚未被接通(即 prevhead->next_ 还没被 store),消费者看到的 nextnullptr认为队列为空,返回 false

对游戏服务器的影响

这是一种短暂的"假空"现象——队列中确实有数据(node 已经存在于链表结构中),但消费者暂时看不到。

在 EventLoop 的使用场景中:

  • EventLoop::doRunInLoopFuncs() 在一次循环中用 while(dequeue) 取出所有任务
  • 如果某次 dequeue 因为"假空"返回 false,本轮循环结束
  • 但生产者在 enqueue 完成后会调用 wakeup()(通过 eventfd 写入),唤醒 EventLoop
  • EventLoop 被唤醒后再次调用 doRunInLoopFuncs(),此时步骤 ② 早已完成,能看到数据

延迟影响:最坏情况下,一个任务会延迟到下一次 epoll_wait 唤醒后才被处理。对于游戏服务器的帧循环(通常 3060fps,每帧 1633ms),一个任务可能延迟一帧执行。对于大多数游戏逻辑(如玩家移动、道具使用),一帧的延迟完全可以接受。

能否接受:完全可以接受。这是无锁队列的标准折衷——用"短暂假空"换取"生产者完全无锁"。与使用 mutex 导致的线程阻塞(可能持续微秒到毫秒级)相比,“假空"只损失一次循环的时间,且不会导致任何线程阻塞。

2. ObjectPool LIFO 对 CPU 缓存的好处与风险

LIFO(后进先出)对缓存的好处

ObjectPool 使用 vector::push_back() 归还、vector::back() + pop_back() 取出,形成 LIFO 栈。

1
2
3
pool 内部:[obj_oldest, ..., obj_recent]
                                   ↑ back()
getObject() 取 obj_recent

CPU 缓存的核心原理是时间局部性(temporal locality):最近使用过的数据最可能还在缓存中。

LIFO 取出的是最近归还的对象(obj_recent),这个对象刚被使用过,其内存页大概率还在 L1/L2 cache 中。再次使用时命中率高。

对比 FIFO(先进先出):取出的是最早归还的对象(obj_oldest),这个对象可能已经在池中闲置了很久,其内存页可能已被 cache 替换(evicted),访问时需要从主存加载,延迟高出 10~100 倍。

量化:在高频请求场景中(如 HTTP 服务器每秒处理数万请求),LIFO 可以让对象始终"热"在 cache 中,性能差异可观。

潜在风险——对象饥饿(Object Starvation)

LIFO 的问题是:最早归还的对象可能永远不会被取出。

1
2
3
场景:池中有 1000 个对象,并发量稳定在 10
  → 只有最"新"的 10 个对象被反复取出-归还-取出
  → 剩余 990 个对象永远躺在池底,占用内存但从不使用

这导致:

  1. 内存浪费:990 个闲置对象占用的内存本可以释放给系统。
  2. 对象状态陈旧:如果对象持有外部资源(如数据库连接),底部的对象可能因长期闲置而超时断开,取出时需要重新建立连接。
  3. 资源泄漏假象:看似池中有 1000 个对象"可用”,实际只有顶部 10 个是"活跃"的。

改进方案

  • 定时清理"冷对象":开一个后台定时器,定期检查 objs_ 底部的对象,如果闲置时间超过阈值就 delete
  • 设置池容量上限(见第 3 题)。
  • 如果对象持有外部资源,在 getObject() 返回前检查资源有效性(如 ping 数据库连接)。

3. 如何改造 ObjectPool 支持"水位线"

目标:当空闲对象数超过 maxIdle 时,归还的对象直接 delete 而不归还到池中。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
template <typename T>
class ObjectPool : public NonCopyable,
                   public std::enable_shared_from_this<ObjectPool<T>>
{
  public:
    explicit ObjectPool(size_t maxIdle = std::numeric_limits<size_t>::max())
        : maxIdle_(maxIdle) {}

    std::shared_ptr<T> getObject()
    {
        static_assert(!std::is_pointer<T>::value, "T can't be pointer type");
        T *p{nullptr};
        {
            std::lock_guard<std::mutex> lock(mtx_);
            if (!objs_.empty()) {
                p = objs_.back();
                objs_.pop_back();
            }
        }
        if (p == nullptr)
            p = new T;

        std::weak_ptr<ObjectPool<T>> weakPtr = this->shared_from_this();
        auto maxIdle = maxIdle_;   // 值拷贝到 lambda,避免捕获 this
        auto obj = std::shared_ptr<T>(p, [weakPtr, maxIdle](T *ptr) {
            auto self = weakPtr.lock();
            if (self) {
                std::lock_guard<std::mutex> lock(self->mtx_);
                if (self->objs_.size() < maxIdle) {
                    self->objs_.push_back(ptr);   // 池未满,归还
                    return;
                }
            }
            delete ptr;   // 池已满 或 池已销毁,直接释放
        });
        return obj;
    }

  private:
    std::vector<T *> objs_;
    std::mutex mtx_;
    size_t maxIdle_;
};

进一步优化——带时间衰减的水位线

简单的固定上限可能不够灵活。可以增加缩容策略

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// 定时调用此方法,回收多余的空闲对象
void shrink(size_t targetSize = 0) {
    std::vector<T*> toDelete;
    {
        std::lock_guard<std::mutex> lock(mtx_);
        while (objs_.size() > targetSize) {
            toDelete.push_back(objs_.back());
            objs_.pop_back();
        }
        objs_.shrink_to_fit();  // 释放 vector 多余的内存
    }
    for (auto p : toDelete)
        delete p;
}

配合游戏服务器的定时器使用:

1
2
3
4
auto pool = std::make_shared<ObjectPool<GamePacket>>(1000);
loop->runEvery(300.0, [pool]() {
    pool->shrink(100);   // 每 5 分钟缩容到 100 个
});

4. MpscQueue + ObjectPool 结合:消除 BufferNode 的动态分配

思路

MpscQueue 每次 enqueuenew BufferNodedequeuedelete BufferNode。如果用 ObjectPool<BufferNode> 来管理节点内存,可以避免频繁的堆分配:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
template <typename T>
class MpscQueuePooled : public NonCopyable {
    struct BufferNode {
        T *dataPtr_;
        std::atomic<BufferNode *> next_{nullptr};
    };

    std::shared_ptr<ObjectPool<BufferNode>> nodePool_;
    // ...

    void enqueue(T &&input) {
        auto nodeShared = nodePool_->getObject();
        BufferNode *node = nodeShared.get();
        node->dataPtr_ = new T(std::move(input));
        node->next_.store(nullptr, std::memory_order_relaxed);

        // 问题:nodeShared 是 shared_ptr,超出作用域就会归还到池!
        // 但 node 还在链表中被使用!
        // 必须"延长"nodeShared 的生命周期到 dequeue 时
    }
};

核心难题:生命周期管理冲突

ObjectPool 返回 shared_ptr<BufferNode>,当 shared_ptr 析构时触发归还。但 MpscQueue 的链表结构要求节点在 enqueue 后继续存活,直到 dequeue 时才释放。

解决方案与问题

方案 A:在 BufferNode 中保存 shared_ptr 自身

1
2
3
4
5
struct BufferNode {
    T *dataPtr_;
    std::atomic<BufferNode *> next_{nullptr};
    std::shared_ptr<BufferNode> selfRef_;  // 保持自己不被归还
};

问题:std::shared_ptr 不是 trivially copyable,atomic 操作不能直接作用在含 shared_ptr 的结构体上。更重要的是,shared_ptr 内部有非原子的控制块操作,在无锁队列的内存序语义下不安全。

方案 B:用 raw pointer + 手动管理的 FreeList 代替 ObjectPool

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
template <typename T>
class MpscQueueWithPool : public NonCopyable {
    struct BufferNode {
        T *dataPtr_;
        std::atomic<BufferNode *> next_{nullptr};
    };

    // 用另一个 MpscQueue 作为 FreeList(无锁回收)
    MpscQueue<BufferNode*> freeList_;

    void enqueue(T &&input) {
        BufferNode *node;
        if (!freeList_.dequeue(node)) {
            node = new BufferNode();   // FreeList 空,新建
        }
        node->dataPtr_ = new T(std::move(input));
        node->next_.store(nullptr, std::memory_order_relaxed);
        // ... 正常的入队操作 ...
    }

    bool dequeue(T &output) {
        // ... 正常的出队操作 ...
        // 取出 node 后:
        freeList_.enqueue(node);   // 归还到 FreeList
        // 问题:这又引入了 MpscQueue 的 new/delete!
    }
};

循环依赖:用 MpscQueue 来做 FreeList,FreeList 自身的 enqueue 又需要 new BufferNode——陷入了鸡生蛋蛋生鸡的循环。

方案 C:预分配的环形缓冲区(最实际的方案)

如果真的需要消除所有动态分配,应该放弃链表结构,改用有界无锁环形缓冲区(bounded lock-free ring buffer):

1
2
3
4
5
6
7
template <typename T, size_t Capacity>
class BoundedMpscQueue {
    std::array<std::aligned_storage_t<sizeof(T)>, Capacity> buffer_;
    std::atomic<size_t> head_{0};
    std::atomic<size_t> tail_{0};
    // ...
};

但这改变了 MpscQueue 的语义——变成有界队列,满时 enqueue 会失败。对 EventLoop 的 funcs_ 队列来说,有界限制可能导致任务丢失,不可接受。

结论:在当前的 MpscQueue 链表设计下,很难干净地引入 ObjectPool,因为两者的生命周期管理模型不兼容(shared_ptr 的自动归还 vs 链表节点的手动管理)。实际中,现代内存分配器(如 jemalloc、tcmalloc)的 malloc/free 已经非常快(通过线程本地缓存),BufferNode 的分配开销在大多数场景下不是瓶颈。如果确实是瓶颈,更好的方案是使用自定义的 thread_local FreeList(简单的单链表栈,无需原子操作),在每个线程维护一小批预分配的 BufferNode


学习日期:2025-04-10 | 上一课:第16课_DNS解析 | 下一课:第18课_密码学工具