第 10 课:Acceptor & Connector — 连接的两端

对应源文件:

  • trantor/net/inner/Acceptor.h / Acceptor.cc — 服务端:监听并接受连接
  • trantor/net/inner/Connector.h / Connector.cc — 客户端:主动发起连接(含重连)

一、两个类在架构中的角色

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
                  ┌──────────────────────────┐
                  │        TcpServer         │
                  │  ┌─────────────────────┐ │
                  │  │      Acceptor       │ │
                  │  │  Socket(listenFd)   │ │
                  │  │  Channel            │ │
                  │  └─────────────────────┘ │
                  └──────────────────────────┘
                           ↑ listen
                    客户端发起 connect
                           ↓ accept → 回调 newConnectionCallback_
                  ┌──────────────────────────┐
                  │        TcpClient         │
                  │  ┌─────────────────────┐ │
                  │  │      Connector      │ │
                  │  │  (非阻塞 connect)   │ │
                  │  │  指数退避重连        │ │
                  │  └─────────────────────┘ │
                  └──────────────────────────┘

AcceptorConnector不拥有连接——它们的职责是把一个已就绪的 fd 交给上层(TcpServer/TcpClient),由上层创建 TcpConnection 对象来管理该 fd 的后续生命周期。


二、Acceptor — 服务端连接接收器

2.1 构造:一次性完成 socket 全套配置

1
2
3
4
Acceptor::Acceptor(EventLoop *loop,
                   const InetAddress &addr,
                   bool reUseAddr = true,
                   bool reUsePort = true)

构造函数做了五件事:

1
2
3
4
5
6
7
① idleFd_ = open("/dev/null", O_RDONLY|O_CLOEXEC)   ← 备用 fd(见 EMFILE 处理)
② sock_ = Socket(createNonblockingSocketOrDie(...))   ← 创建非阻塞 listen socket
③ sock_.setReuseAddr(reUseAddr)                       ← 允许端口快速复用
   sock_.setReusePort(reUsePort)
④ sock_.bindAddress(addr_)                            ← 绑定地址
⑤ acceptChannel_(loop, sock_.fd())                   ← 把 listenFd 包装成 Channel
   acceptChannel_.setReadCallback(readCallback)        ← 注册读回调

注意:构造时 listen(),必须单独调用 listen() 方法,原因见下文。

端口 0 的特殊处理

1
2
3
if (addr_.toPort() == 0) {
    addr_ = InetAddress{Socket::getLocalAddr(sock_.fd())};
}

传入端口 0 时操作系统会自动分配可用端口,构造后立刻查询真实分配到的端口,存回 addr_。这样上层可以调用 acceptor.addr().toPort() 得到真实端口号(常用于测试)。

2.2 listen() — 启动监听

1
2
3
4
5
6
7
8
void Acceptor::listen()
{
    loop_->assertInLoopThread();            // 必须在 EventLoop 线程调用
    if (beforeListenSetSockOptCallback_)
        beforeListenSetSockOptCallback_(sock_.fd());  // ① 监听前的 socket 选项钩子
    sock_.listen();                          // ② 系统调用 listen(fd, SOMAXCONN)
    acceptChannel_.enableReading();          // ③ 注册到 epoll,开始接受连接事件
}

为什么 listen 和构造分开?

构造时可能需要先配置回调(setNewConnectionCallback),或在多线程场景下先完成对象初始化再激活监听。分离构造与激活是一种常见的"惰性初始化"模式。

beforeListenSetSockOptCallback_ 是一个钩子,允许用户在 listen() 之前对 socket 做自定义配置(例如设置 TCP_DEFER_ACCEPT),不需要修改框架代码。

2.3 readCallback() — 核心 accept 逻辑

listenFd 上有事件(新连接到达),Channel 调用 readCallback()

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
void Acceptor::readCallback()
{
    InetAddress peer;
    int newsock = sock_.accept(&peer);      // accept4() on Linux

    if (newsock >= 0) {
        if (afterAcceptSetSockOptCallback_)
            afterAcceptSetSockOptCallback_(newsock);  // ① accept 后 socket 选项钩子
        if (newConnectionCallback_)
            newConnectionCallback_(newsock, peer);    // ② 把 fd 和对端地址交给上层
        else {
            close(newsock);  // 没有注册回调 → 直接关掉(防止 fd 泄漏)
        }
    }
    else {
        // accept 失败的特殊处理(见下文 EMFILE 技巧)
        if (errno == EMFILE) { ... }
    }
}

2.4 EMFILE 问题与 idleFd_ 技巧

EMFILEToo many open files——进程的 fd 数量已到达上限,accept() 无法为新连接分配 fd。

问题:即使 accept 失败,listenFd 上仍然有事件(新连接在 listen 队列中),epoll 会持续触发,形成空转 busy loop,CPU 飙升到 100%。

trantor 的解决方案

1
2
3
4
5
6
7
8
// 构造时预先占一个 fd
idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC);

// EMFILE 时:
::close(idleFd_);                   // ① 释放备用 fd,让系统有一个空闲 fd
idleFd_ = sock_.accept(&peer);      // ② 用这个空闲 fd accept,把连接从队列里取出
::close(idleFd_);                   // ③ 立刻关闭(优雅拒绝)
idleFd_ = ::open("/dev/null", ...); // ④ 重新占住备用 fd

精妙之处

  • 不是忽略错误(那会导致 busy loop)
  • 而是礼貌地拒绝:accept 后立刻 close,让客户端收到 RST,而不是被挂起
  • idleFd_ 始终是一个打开的 /dev/null fd,作为"保险箱",在 fd 枯竭时提供一次应急额度

为什么 Windows 不需要 idleFd_?

Windows 使用句柄而非 Unix fd,EMFILE 语义不同,wepoll 也有不同的处理机制,所以用 #ifndef _WIN32idleFd_ 编译掉。

2.5 完整时序图

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
[TcpServer::start()]
acceptor_.listen()
  → sock_.listen()
  → acceptChannel_.enableReading()
  → epoll_ctl(ADD, listenFd, EPOLLIN)
   [客户端 connect()]
epoll_wait() 返回 listenFd 可读
acceptChannel_.handleEvent()
  → Acceptor::readCallback()
  → sock_.accept(&peer)           ← accept4(NONBLOCK|CLOEXEC) on Linux
  → afterAcceptSetSockOptCallback_(newsock)  ← 可设 TCP_NODELAY 等
  → newConnectionCallback_(newsock, peer)
TcpServer::newConnection(newsock, peer)
  → 创建 TcpConnectionImpl
  → 分配到某个 I/O EventLoop
  → 连接开始运行

三、Connector — 客户端主动连接器

3.1 核心难题:非阻塞 connect

普通阻塞 connect() 会一直等到连接建立或超时。非阻塞模式下,connect() 会立即返回,需要通过 epoll 的写事件来感知连接结果:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
非阻塞 connect() 返回 EINPROGRESS
注册 EPOLLOUT(写事件)到 epoll
   等待 epoll_wait 返回
写事件就绪(无论成功还是失败都触发)
getsockopt(SO_ERROR) 判断实际结果
     ├─ err == 0 → 连接成功
     └─ err != 0 → 连接失败

为什么用写事件而不是读事件?

connect() 完成(无论成功失败)后,socket 进入"可写"状态。读事件在连接成功且对端发数据时才触发,不能用来检测 connect 结果。

3.2 状态机

1
2
3
4
5
enum class Status {
    Disconnected,   // 初始状态 / 重试中 / 已停止
    Connecting,     // connect() 已调用,等待写事件
    Connected       // 连接成功,fd 已交给上层
};
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
[Disconnected]
     │ start()
  connect() → EINPROGRESS
     │ connecting(fd)
[Connecting]
     │ epoll 写事件触发
  handleWrite()
     ├─ SO_ERROR == 0 && !isSelfConnect → [Connected]
     │                                    newConnectionCallback_(fd)
     ├─ SO_ERROR != 0 && retry_         → retry() → [Disconnected]
     │                                    runAfter(delay, startInLoop)
     └─ 已 stop()                        → [Disconnected]
                                           close(fd)

3.3 connect() — errno 分类处理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
void Connector::connect()
{
    fd_ = Socket::createNonblockingSocketOrDie(serverAddr_.family());
    if (sockOptCallback_) sockOptCallback_(fd_);   // 连接前 socket 选项钩子
    errno = 0;
    int ret = Socket::connect(fd_, serverAddr_);
    int savedErrno = (ret == 0) ? 0 : errno;

    switch (savedErrno)
    {
        // ── 正常的"连接进行中"状态 ──
        case 0:           // 极少见:本机连本机瞬间成功
        case EINPROGRESS: // 标准情况:连接正在进行,等写事件
        case EINTR:       // 被信号打断,重新等
        case EISCONN:     // 已经连上了(重入安全)
            connecting(fd_);     // → 注册写事件,等结果
            break;

        // ── 可重试的暂时性错误 ──
        case EAGAIN:        // 本地端口不足
        case EADDRINUSE:    // 目标端口占用
        case EADDRNOTAVAIL: // 地址不可用
        case ECONNREFUSED:  // 对端拒绝(RST)
        case ENETUNREACH:   // 网络不可达
            if (retry_) retry(fd_);
            break;

        // ── 不可恢复的致命错误 ──
        case EACCES:      // 权限不足(如连接特权端口)
        case EPERM:
        case EAFNOSUPPORT:// 地址族不支持
        case EALREADY:    // 已有连接请求未完成
        case EBADF:       // fd 无效
        case EFAULT:      // 地址指针无效
        case ENOTSOCK:    // fd 不是 socket
            close(fd_);
            errorCallback_();  // 通知上层,不重试
            break;
    }
}

错误分类的意义

  • 可重试错误(ECONNREFUSED 等):对端可能暂时不在线,等一会儿重试有意义
  • 致命错误(EBADF 等):程序逻辑问题,重试没有意义,直接报错

3.4 handleWrite() — 连接结果检测

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
void Connector::handleWrite()
{
    if (status_ == Status::Connecting) {
        int sockfd = removeAndResetChannel();     // 取消写事件监听
        int err = Socket::getSocketError(sockfd); // getsockopt(SO_ERROR)

        if (err) {
            // 连接失败:可重试就重试,否则关闭并报错
            if (retry_) retry(sockfd);
            else { close(sockfd); }
            errorCallback_();
        }
        else if (Socket::isSelfConnect(sockfd)) {
            // 自连接检测(见第 9 课)
            if (retry_) retry(sockfd);
            errorCallback_();
        }
        else {
            // 连接成功!
            status_ = Status::Connected;
            if (connect_) {
                newConnectionCallback_(sockfd);  // 把 fd 交给上层
            }
            // 如果已经 stop(),close(sockfd)
        }
    }
}

removeAndResetChannel() 的延迟析构

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
int Connector::removeAndResetChannel()
{
    channelPtr_->disableAll();
    channelPtr_->remove();
    int sockfd = channelPtr_->fd();
    // 关键:不能在这里直接 reset(),因为此时正在 Channel::handleEvent() 调用栈中!
    // 用 queueInLoop 延迟到下一次循环再析构
    loop_->queueInLoop([channelPtr = channelPtr_]() {});
    channelPtr_.reset();
    return sockfd;
}

这是一个精妙的"延迟析构":如果在 Channel 的 handleEvent() 调用链中直接 reset() Channel,会析构正在执行的对象,触发栈上悬垂引用。queueInLoop 捕获一份 shared_ptr,让 Channel 在当前调用链结束后(下一次 loop 迭代)再析构。

3.5 retry() — 指数退避重连

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
void Connector::retry(int sockfd)
{
    ::close(sockfd);               // 先关掉失败的 socket
    status_ = Status::Disconnected;
    if (connect_) {
        LOG_INFO << "Retry in " << retryInterval_ << "ms";
        loop_->runAfter(retryInterval_ / 1000.0,
                        std::bind(&Connector::startInLoop, shared_from_this()));
        retryInterval_ = retryInterval_ * 2;           // 每次翻倍
        if (retryInterval_ > maxRetryInterval_)
            retryInterval_ = maxRetryInterval_;         // 上限 30 秒
    }
}

指数退避策略

1
2
3
4
5
6
第 1 次重试:500ms 后
第 2 次重试:1000ms 后
第 3 次重试:2000ms 后
第 4 次重试:4000ms 后
...
第 n 次重试:30000ms 后(上限,不再增长)

为什么指数退避?

如果服务器宕机,大量客户端同时以固定间隔重连,会形成重连风暴,服务器刚恢复就被打垮。指数退避让重连请求自然分散,对服务器更友好。

shared_from_this() 的必要性

retry()runAfterstartInLoop 延迟执行。在延迟执行之前,TcpClient 可能已经析构了 Connector。用 shared_from_this() 让定时器持有一份引用计数,保证 Connector 存活到定时器触发后。这就是为什么 Connector 继承了 enable_shared_from_this

3.6 stop() 的线程安全

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
void Connector::stop()
{
    status_ = Status::Disconnected;
    if (loop_->isInLoopThread()) {
        removeAndResetChannel();
    } else {
        // 跨线程调用:投递到 EventLoop 线程执行
        loop_->queueInLoop([thisPtr = shared_from_this()]() {
            thisPtr->removeAndResetChannel();
        });
    }
}

stop() 可以从任意线程调用(如用户线程触发断线),而 Channel 操作必须在 EventLoop 线程执行,因此用 queueInLoop 跨线程投递。


四、两个类的对比

特性AcceptorConnector
角色服务端,被动接受客户端,主动发起
关键系统调用listen() + accept()connect()
epoll 事件读事件(listenFd 可读=有新连接)写事件(连接完成触发)
结果判断accept4() 返回 >= 0getsockopt(SO_ERROR)
错误处理EMFILE 用 idleFd_ 优雅拒绝errno 分类,可重试错误指数退避
生命周期在 TcpServer 整个运行期存在每次连接新建,成功后把 fd 交出
线程安全listen/accept 在同一 EventLoop 线程stop() 跨线程安全,内部 queueInLoop
自连接检测不需要Socket::isSelfConnect()

五、socket 选项钩子机制

两个类都提供了 socket 选项钩子,给用户不改框架源码就能定制 socket 行为的能力:

1
2
3
4
5
6
Acceptor:
  beforeListenSetSockOptCallback_(fd)  ← listen() 之前调用
  afterAcceptSetSockOptCallback_(fd)   ← 每次 accept() 成功后调用

Connector:
  sockOptCallback_(fd)                 ← connect() 之前调用

典型使用场景

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// 服务器:对每个新连接设置 TCP_NODELAY
acceptor.setAfterAcceptSockOptCallback([](int fd) {
    int on = 1;
    setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &on, sizeof(on));
});

// 客户端:绑定特定本地端口(如需要端口白名单穿透防火墙)
connector.setSockOptCallback([](int fd) {
    struct sockaddr_in local;
    local.sin_port = htons(12345);  // 固定本地端口
    bind(fd, (sockaddr*)&local, sizeof(local));
});

六、游戏服务器实践

6.1 服务端场景(Acceptor)

1
2
3
4
5
6
7
8
9
// 典型的游戏网关服配置
TcpServer gatewayServer(&loop, InetAddress(9000), "Gateway");
// 内部:TcpServer 持有 Acceptor,在 start() 时调用 acceptor.listen()

// 对每个新连接:关 Nagle(低延迟),设置接收缓冲区
gatewayServer.setBeforeListenSockOptCallback([](int fd) {
    int size = 4 * 1024 * 1024;  // 4MB 接收缓冲区
    setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size));
});

6.2 客户端场景(Connector)

1
2
3
4
5
6
7
8
9
// 游戏服连接数据库服(内网连接,需要断线重连)
TcpClient dbClient(&loop, InetAddress("10.0.0.100", 3306), "DBClient");
dbClient.enableRetry();  // 启用指数退避重连
// 内部:TcpClient 持有 Connector,断线后 Connector::retry() 自动重连

// 重连时的日志输出:
// INFO  Retry connecting to 10.0.0.100:3306 in 500 milliseconds.
// INFO  Retry connecting to 10.0.0.100:3306 in 1000 milliseconds.
// INFO  Retry connecting to 10.0.0.100:3306 in 2000 milliseconds.

核心收获

  • idleFd_ EMFILE 技巧:预占 /dev/null fd,fd 耗尽时临时借用→接受→立即关闭→重新打开,让客户端收到干净 RST 而非无限等待
  • 非阻塞 connect() 返回 EINPROGRESS 是正常的:注册写事件,触发后用 getsockopt(SO_ERROR) 检查真实结果
  • errno 要分类:EINTR/EAGAIN/EADDRINUSE 等可重试;ECONNREFUSED/ENETUNREACH 等是致命错误
  • 指数退避:500ms→1s→2s→…→30s 封顶,shared_from_this() 在 timer 期间保活 Connector
  • removeAndResetChannel()queueInLoop 延迟销毁 Channel,防止在 Channel 自身的回调中析构

七、思考题

  1. AcceptoridleFd_ 在 EMFILE 时先 close 再 accept 再 close。这个操作窗口内(第一次 close 到最后一次 open 之间),如果又有新连接到来,readCallback 还会被再次调用吗?会不会出现问题?

  2. Connector::handleWrite()getsockopt(SO_ERROR) 判断 connect 结果,而不是直接用 connect() 的返回值。为什么 connect() 返回 EINPROGRESS 就判断成功是错误的?有什么场景下连接会在写事件触发时才真正失败?

  3. Connector 继承了 enable_shared_from_this,而 Acceptor 没有。为什么 Connector 需要、Acceptor 不需要?(提示:分析两者的 retry 行为和 TcpServer/TcpClient 对它们的持有方式)

  4. 指数退避的上限是 30 秒,但 retryInterval_ 在重连成功后不会重置为初始值(500ms)。如果连接建立 → 短暂断线 → 重连时使用的是 30 秒间隔,这合理吗?如何改进?


八、思考题参考答案

1. idleFd_ 操作窗口内新连接到来的行为分析

源码位置Acceptor.cc 第 92-98 行

1
2
3
4
5
6
7
if (errno == EMFILE)
{
    ::close(idleFd_);                        // ① 释放备用 fd
    idleFd_ = sock_.accept(&peer);           // ② accept 取出一个连接
    ::close(idleFd_);                        // ③ 立刻关闭(拒绝)
    idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC);  // ④ 重新占住
}

窗口期分析

整个操作发生在单个 EventLoop 线程中,而 Acceptor 的 readCallback() 是由 Channel 的事件处理函数调用的。在 ① 到 ④ 的执行期间,不会被另一次 readCallback() 中断(EventLoop 是单线程事件驱动模型,不存在同一 Channel 的 readCallback 重入)。

但问题是:在 ① close(idleFd_) 之后、④ open("/dev/null", ...) 之前,进程有了 1-2 个空闲 fd 槽位。如果操作系统恰好在这个极短窗口内完成了一个新的 TCP 三次握手(这在内核层面是异步的),新连接会被放入 listen socket 的已完成连接队列(accept queue)中。

回答

  1. readCallback 不会在窗口期内被再次调用:因为 EventLoop 是单线程的,当前正在执行 readCallback,epoll_wait 不会在此时被调用,所以不会触发新的 readCallback

  2. 新连接会在 accept queue 中等待:窗口期内到达的新连接在内核的 accept queue 中排队,不受影响

  3. readCallback 返回后,EventLoop 重新 epoll_wait

    • 如果 accept queue 中还有未取走的连接,listenFd 仍然会被报告为可读
    • 下一次 readCallback 调用 sock_.accept() 时,如果 fd 配额仍然不够(EMFILE),又会重复 idleFd_ 技巧
    • 如果中间有其他连接关闭释放了 fd,accept 就能成功
  4. 潜在问题:这个方案每次只能拒绝一个等待中的连接。如果 accept queue 中积压了大量连接(比如突发的连接风暴),每次 readCallback 只拒绝一个,其他连接继续在 queue 中等待。但由于 epoll 是 LT(水平触发)模式,只要 queue 不为空就会持续触发 readCallback,所以最终所有积压连接都会被逐一拒绝,不会永久堆积

  5. 还有一个微妙的竞态:在 ② accept 和 ③ close 之间,进程的 fd 数量短暂回到了上限。如果此时有其他线程尝试 open/socket/accept,也会得到 EMFILE。但 Acceptor 是在 EventLoop 线程中运行的,通常 fd 操作也在同一线程,所以这个竞态的实际影响很小

2. 为什么用 getsockopt(SO_ERROR) 而不是 connect() 返回值

源码位置Connector.cc 第 172-245 行

1
2
3
4
5
6
7
void Connector::handleWrite()
{
    // ...
    int err = Socket::getSocketError(sockfd);  // getsockopt(SO_ERROR)
    if (err) { /* 失败 */ }
    else { /* 成功 */ }
}

根本原因:非阻塞 connect() 的返回值和最终连接结果是两码事

时间线拆解

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
t=0   connect() 调用
      → 返回 -1, errno=EINPROGRESS
      含义:TCP SYN 已发出,三次握手正在进行中
      此时你只知道"请求已发出",不知道结果

t=0~N  [等待三次握手完成]
       可能发生:
       a) SYN-ACK 收到 → 回 ACK → 连接建立
       b) RST 收到 → 连接被拒绝
       c) ICMP 不可达收到 → 网络不可达
       d) 超时 → 无响应

t=N   epoll 报告 EPOLLOUT(写事件就绪)
      → handleWrite() 被调用

为什么不能用 connect() 的返回值?

  1. connect() 返回 EINPROGRESS 只表示"SYN 已发送",不是成功也不是失败
  2. connect() 在返回后就已经"完成"了(从系统调用的角度),后续的 TCP 握手结果不会再通过 connect() 返回
  3. 在某些系统上对同一个 socket 再次调用 connect() 可能返回 EISCONN(已连接)或 EALREADY(操作已在进行中),但行为不可移植

为什么 EPOLLOUT 不能直接当成功?

写事件就绪只表示"connect 过程结束了",不区分成功还是失败:

  • 连接成功:socket 变为可写(有发送缓冲区可用)
  • 连接失败:socket 也会变为可写(kernel 标记为可写 + error),同时 EPOLLERR 也会被设置

必须通过 getsockopt(SOL_SOCKET, SO_ERROR, ...) 读取 socket 的待处理错误码来判断真实结果:

  • err == 0:连接成功
  • err == ECONNREFUSED:对端拒绝(RST)
  • err == ETIMEDOUT:连接超时
  • err == ENETUNREACH:网络不可达

实际失败场景举例

场景 1 - 对端端口未监听:

1
2
3
4
5
client → SYN → server:9999(无人监听)
server → RST → client
client 的 socket SO_ERROR = ECONNREFUSED
epoll 报告 EPOLLOUT | EPOLLERR
handleWrite() 中 getsockopt 读到 ECONNREFUSED → 重试

场景 2 - 网络不可达:

1
2
3
4
client → SYN → 路由器 → 目标网络不存在
路由器 → ICMP Destination Unreachable → client
client 的 socket SO_ERROR = ENETUNREACH
epoll 报告 EPOLLOUT | EPOLLERR

场景 3 - 防火墙静默丢包:

1
2
3
4
client → SYN → 防火墙(丢弃,不回复)
...等待很久...
client 内核 TCP 重传 SYN 多次后放弃
SO_ERROR = ETIMEDOUT

3. Connector 需要 enable_shared_from_this 而 Acceptor 不需要的原因

源码位置

  • Connector.h 第 25-26 行:class Connector : public std::enable_shared_from_this<Connector>
  • Acceptor.h 第 28 行:class Acceptor : NonCopyable(不继承 enable_shared_from_this

Connector 需要的原因——延迟回调的生命周期保障

Connector 在两个关键位置使用了 shared_from_this()

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// Connector.cc 第 289-290 行 — retry() 中
loop_->runAfter(retryInterval_ / 1000.0,
                std::bind(&Connector::startInLoop, shared_from_this()));

// Connector.cc 第 59 行 — stop() 中
loop_->queueInLoop([thisPtr = shared_from_this()]() {
    thisPtr->removeAndResetChannel();
});

// Connector.cc 第 147-151 行 — connecting() 中
channelPtr_->setWriteCallback(
    std::bind(&Connector::handleWrite, shared_from_this()));
channelPtr_->setErrorCallback(
    std::bind(&Connector::handleError, shared_from_this()));

核心问题是:Connector 会注册延迟执行的回调(定时器回调、Channel 回调),这些回调在未来某个时刻执行时,Connector 对象必须仍然存活。

具体场景:

  1. TcpClient 持有 shared_ptr<Connector>
  2. Connector 调用 retry(),注册一个 500ms 后的定时器回调
  3. 在这 500ms 内,用户调用 TcpClient::disconnect(),TcpClient 析构,释放了 shared_ptr<Connector>
  4. 如果 retry() 中用的是裸 this,Connector 已被销毁,500ms 后定时器触发时访问的是野指针 → 段错误
  5. shared_from_this() 让定时器的 lambda 也持有一份 shared_ptr,引用计数不归零,Connector 安全存活到回调执行完毕

Acceptor 不需要的原因——生命周期与 TcpServer 完全绑定

  1. Acceptor 没有 retry 机制:Acceptor 是服务端监听,一旦创建就持续运行到 TcpServer 析构。它不需要延迟重连,也没有定时器回调
  2. Acceptor 由 TcpServer 直接持有(作为成员变量或 unique_ptr),生命周期明确:TcpServer 构造时创建,析构时销毁
  3. Acceptor 的 Channel 回调(readCallback)是 std::bind(&Acceptor::readCallback, this),直接用裸 this。这是安全的,因为 Acceptor 析构时会调用 acceptChannel_.disableAll()acceptChannel_.remove()(见 Acceptor.cc 第 47-49 行),确保析构后不会有任何回调触发
  4. Acceptor 没有跨生命周期的异步操作:它的所有操作都是同步的(在 EventLoop 线程中立即执行),不存在"回调到来时对象已销毁"的风险

对比总结

特性AcceptorConnector
持有方式TcpServer 直接持有(唯一所有者)shared_ptr<Connector>(可能被多处引用)
是否有定时器回调有(retry 指数退避)
Channel 回调中用的 thisthis(安全,因为析构时取消了 Channel)shared_from_this()(因为 Channel 可能比 TcpClient 活得久)
析构保护disableAll() + remove() 在析构函数中shared_ptr 引用计数保护

4. retryInterval_ 不重置的合理性分析与改进

源码位置Connector.h 第 86-87 行 和 Connector.cc 第 275-299 行

1
2
3
4
5
6
7
8
// Connector.h
int retryInterval_{kInitRetryDelayMs};     // 初始 500ms
int maxRetryInterval_{kMaxRetryDelayMs};    // 上限 30000ms

// Connector.cc — retry()
retryInterval_ = retryInterval_ * 2;
if (retryInterval_ > maxRetryInterval_)
    retryInterval_ = maxRetryInterval_;

查看完整的 Connector 代码可以确认:没有任何地方在连接成功后将 retryInterval_ 重置为 kInitRetryDelayMs(500ms)handleWrite() 中连接成功时只是设置 status_ = Connected 并调用 newConnectionCallback_,没有重置 retryInterval_

这不太合理,原因如下:

问题场景

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
t=0     首次连接失败,retryInterval_=500ms
t=0.5   重试,失败,retryInterval_=1000ms
t=1.5   重试,失败,retryInterval_=2000ms
...经过多次重试...
t=60    重试,成功!retryInterval_ 已经增长到 30000ms(上限)
        连接运行正常...
t=120   服务器短暂重启(2秒就恢复)
t=120   连接断开,TcpClient 调用 Connector::start() 发起重连
t=120   connect() 失败(服务器还没起来),retry(fd)
        → 使用 retryInterval_=30000ms → 等 30 秒后才重试!
        但服务器 2 秒后就恢复了,白白等了 28 秒

这对游戏服务器来说是不可接受的——数据库连接断了 2 秒后就恢复,但游戏服需要 30 秒才能重连上。

为什么 trantor 没有重置?可能的考量

  1. 简单性:当前 Connector 的实现比较简洁,每次 retry() 只管递增间隔,没有"成功后重置"的逻辑。TcpClient 在连接断开后会创建新的 Connector 对象,新对象的 retryInterval_ 是初始值 500ms。所以这个问题可能在实际使用中不太明显——取决于 TcpClient 是复用 Connector 还是新建
  2. 防抖动:如果连接频繁断开/重连(flapping),不重置间隔可以起到"惩罚"效果,避免频繁重连给服务器造成压力

改进方案

方案 A — 连接成功后重置(最直观):

1
2
3
4
5
6
// 在 handleWrite() 连接成功的分支中添加
status_ = Status::Connected;
retryInterval_ = kInitRetryDelayMs;  // 重置为 500ms
if (connect_) {
    newConnectionCallback_(sockfd);
}

方案 B — 部分重置(带保护):

1
2
// 连接成功后不完全重置为 500ms,而是减半(但不低于初始值)
retryInterval_ = std::max(kInitRetryDelayMs, retryInterval_ / 2);

这样如果连接不稳定(频繁断重连),间隔不会每次都从 500ms 开始爬升,有一定的"记忆"效果。

方案 C — 由 TcpClient 层面处理: 每次断线后创建全新的 Connector 对象(retryInterval_ 自然是初始值),旧 Connector 对象在没有活跃定时器后自然析构。这是 muduo/trantor 的一些使用方式中实际采用的方案,但代价是每次断线重连都要重新创建对象。

推荐方案 A,在 handleWrite() 的成功分支中简单添加一行 retryInterval_ = kInitRetryDelayMs 即可,语义清晰,对游戏服务器的快速重连场景最友好。


学习日期:2025-03-21 | 上一课:第09课_网络地址与Socket封装 | 下一课:第11课_TcpConnection连接生命周期