第 10 课:Acceptor & Connector — 连接的两端#
对应源文件:
trantor/net/inner/Acceptor.h / Acceptor.cc — 服务端:监听并接受连接trantor/net/inner/Connector.h / Connector.cc — 客户端:主动发起连接(含重连)
一、两个类在架构中的角色#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| ┌──────────────────────────┐
│ TcpServer │
│ ┌─────────────────────┐ │
│ │ Acceptor │ │
│ │ Socket(listenFd) │ │
│ │ Channel │ │
│ └─────────────────────┘ │
└──────────────────────────┘
↑ listen
客户端发起 connect
↓ accept → 回调 newConnectionCallback_
┌──────────────────────────┐
│ TcpClient │
│ ┌─────────────────────┐ │
│ │ Connector │ │
│ │ (非阻塞 connect) │ │
│ │ 指数退避重连 │ │
│ └─────────────────────┘ │
└──────────────────────────┘
|
Acceptor 和 Connector 都不拥有连接——它们的职责是把一个已就绪的 fd 交给上层(TcpServer/TcpClient),由上层创建 TcpConnection 对象来管理该 fd 的后续生命周期。
二、Acceptor — 服务端连接接收器#
2.1 构造:一次性完成 socket 全套配置#
1
2
3
4
| Acceptor::Acceptor(EventLoop *loop,
const InetAddress &addr,
bool reUseAddr = true,
bool reUsePort = true)
|
构造函数做了五件事:
1
2
3
4
5
6
7
| ① idleFd_ = open("/dev/null", O_RDONLY|O_CLOEXEC) ← 备用 fd(见 EMFILE 处理)
② sock_ = Socket(createNonblockingSocketOrDie(...)) ← 创建非阻塞 listen socket
③ sock_.setReuseAddr(reUseAddr) ← 允许端口快速复用
sock_.setReusePort(reUsePort)
④ sock_.bindAddress(addr_) ← 绑定地址
⑤ acceptChannel_(loop, sock_.fd()) ← 把 listenFd 包装成 Channel
acceptChannel_.setReadCallback(readCallback) ← 注册读回调
|
注意:构造时不 listen(),必须单独调用 listen() 方法,原因见下文。
端口 0 的特殊处理#
1
2
3
| if (addr_.toPort() == 0) {
addr_ = InetAddress{Socket::getLocalAddr(sock_.fd())};
}
|
传入端口 0 时操作系统会自动分配可用端口,构造后立刻查询真实分配到的端口,存回 addr_。这样上层可以调用 acceptor.addr().toPort() 得到真实端口号(常用于测试)。
2.2 listen() — 启动监听#
1
2
3
4
5
6
7
8
| void Acceptor::listen()
{
loop_->assertInLoopThread(); // 必须在 EventLoop 线程调用
if (beforeListenSetSockOptCallback_)
beforeListenSetSockOptCallback_(sock_.fd()); // ① 监听前的 socket 选项钩子
sock_.listen(); // ② 系统调用 listen(fd, SOMAXCONN)
acceptChannel_.enableReading(); // ③ 注册到 epoll,开始接受连接事件
}
|
为什么 listen 和构造分开?
构造时可能需要先配置回调(setNewConnectionCallback),或在多线程场景下先完成对象初始化再激活监听。分离构造与激活是一种常见的"惰性初始化"模式。
beforeListenSetSockOptCallback_ 是一个钩子,允许用户在 listen() 之前对 socket 做自定义配置(例如设置 TCP_DEFER_ACCEPT),不需要修改框架代码。
2.3 readCallback() — 核心 accept 逻辑#
当 listenFd 上有事件(新连接到达),Channel 调用 readCallback():
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| void Acceptor::readCallback()
{
InetAddress peer;
int newsock = sock_.accept(&peer); // accept4() on Linux
if (newsock >= 0) {
if (afterAcceptSetSockOptCallback_)
afterAcceptSetSockOptCallback_(newsock); // ① accept 后 socket 选项钩子
if (newConnectionCallback_)
newConnectionCallback_(newsock, peer); // ② 把 fd 和对端地址交给上层
else {
close(newsock); // 没有注册回调 → 直接关掉(防止 fd 泄漏)
}
}
else {
// accept 失败的特殊处理(见下文 EMFILE 技巧)
if (errno == EMFILE) { ... }
}
}
|
2.4 EMFILE 问题与 idleFd_ 技巧#
EMFILE:Too many open files——进程的 fd 数量已到达上限,accept() 无法为新连接分配 fd。
问题:即使 accept 失败,listenFd 上仍然有事件(新连接在 listen 队列中),epoll 会持续触发,形成空转 busy loop,CPU 飙升到 100%。
trantor 的解决方案:
1
2
3
4
5
6
7
8
| // 构造时预先占一个 fd
idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC);
// EMFILE 时:
::close(idleFd_); // ① 释放备用 fd,让系统有一个空闲 fd
idleFd_ = sock_.accept(&peer); // ② 用这个空闲 fd accept,把连接从队列里取出
::close(idleFd_); // ③ 立刻关闭(优雅拒绝)
idleFd_ = ::open("/dev/null", ...); // ④ 重新占住备用 fd
|
精妙之处:
- 不是忽略错误(那会导致 busy loop)
- 而是礼貌地拒绝:accept 后立刻 close,让客户端收到 RST,而不是被挂起
idleFd_ 始终是一个打开的 /dev/null fd,作为"保险箱",在 fd 枯竭时提供一次应急额度
为什么 Windows 不需要 idleFd_?
Windows 使用句柄而非 Unix fd,EMFILE 语义不同,wepoll 也有不同的处理机制,所以用 #ifndef _WIN32 把 idleFd_ 编译掉。
2.5 完整时序图#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
| [TcpServer::start()]
│
▼
acceptor_.listen()
→ sock_.listen()
→ acceptChannel_.enableReading()
→ epoll_ctl(ADD, listenFd, EPOLLIN)
│
[客户端 connect()]
│
▼
epoll_wait() 返回 listenFd 可读
│
▼
acceptChannel_.handleEvent()
→ Acceptor::readCallback()
→ sock_.accept(&peer) ← accept4(NONBLOCK|CLOEXEC) on Linux
→ afterAcceptSetSockOptCallback_(newsock) ← 可设 TCP_NODELAY 等
→ newConnectionCallback_(newsock, peer)
│
▼
TcpServer::newConnection(newsock, peer)
→ 创建 TcpConnectionImpl
→ 分配到某个 I/O EventLoop
→ 连接开始运行
|
三、Connector — 客户端主动连接器#
3.1 核心难题:非阻塞 connect#
普通阻塞 connect() 会一直等到连接建立或超时。非阻塞模式下,connect() 会立即返回,需要通过 epoll 的写事件来感知连接结果:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| 非阻塞 connect() 返回 EINPROGRESS
│
▼
注册 EPOLLOUT(写事件)到 epoll
│
等待 epoll_wait 返回
│
▼
写事件就绪(无论成功还是失败都触发)
│
▼
getsockopt(SO_ERROR) 判断实际结果
├─ err == 0 → 连接成功
└─ err != 0 → 连接失败
|
为什么用写事件而不是读事件?
connect() 完成(无论成功失败)后,socket 进入"可写"状态。读事件在连接成功且对端发数据时才触发,不能用来检测 connect 结果。
3.2 状态机#
1
2
3
4
5
| enum class Status {
Disconnected, // 初始状态 / 重试中 / 已停止
Connecting, // connect() 已调用,等待写事件
Connected // 连接成功,fd 已交给上层
};
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| [Disconnected]
│ start()
▼
connect() → EINPROGRESS
│ connecting(fd)
▼
[Connecting]
│ epoll 写事件触发
▼
handleWrite()
├─ SO_ERROR == 0 && !isSelfConnect → [Connected]
│ newConnectionCallback_(fd)
├─ SO_ERROR != 0 && retry_ → retry() → [Disconnected]
│ runAfter(delay, startInLoop)
└─ 已 stop() → [Disconnected]
close(fd)
|
3.3 connect() — errno 分类处理#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
| void Connector::connect()
{
fd_ = Socket::createNonblockingSocketOrDie(serverAddr_.family());
if (sockOptCallback_) sockOptCallback_(fd_); // 连接前 socket 选项钩子
errno = 0;
int ret = Socket::connect(fd_, serverAddr_);
int savedErrno = (ret == 0) ? 0 : errno;
switch (savedErrno)
{
// ── 正常的"连接进行中"状态 ──
case 0: // 极少见:本机连本机瞬间成功
case EINPROGRESS: // 标准情况:连接正在进行,等写事件
case EINTR: // 被信号打断,重新等
case EISCONN: // 已经连上了(重入安全)
connecting(fd_); // → 注册写事件,等结果
break;
// ── 可重试的暂时性错误 ──
case EAGAIN: // 本地端口不足
case EADDRINUSE: // 目标端口占用
case EADDRNOTAVAIL: // 地址不可用
case ECONNREFUSED: // 对端拒绝(RST)
case ENETUNREACH: // 网络不可达
if (retry_) retry(fd_);
break;
// ── 不可恢复的致命错误 ──
case EACCES: // 权限不足(如连接特权端口)
case EPERM:
case EAFNOSUPPORT:// 地址族不支持
case EALREADY: // 已有连接请求未完成
case EBADF: // fd 无效
case EFAULT: // 地址指针无效
case ENOTSOCK: // fd 不是 socket
close(fd_);
errorCallback_(); // 通知上层,不重试
break;
}
}
|
错误分类的意义:
- 可重试错误(ECONNREFUSED 等):对端可能暂时不在线,等一会儿重试有意义
- 致命错误(EBADF 等):程序逻辑问题,重试没有意义,直接报错
3.4 handleWrite() — 连接结果检测#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
| void Connector::handleWrite()
{
if (status_ == Status::Connecting) {
int sockfd = removeAndResetChannel(); // 取消写事件监听
int err = Socket::getSocketError(sockfd); // getsockopt(SO_ERROR)
if (err) {
// 连接失败:可重试就重试,否则关闭并报错
if (retry_) retry(sockfd);
else { close(sockfd); }
errorCallback_();
}
else if (Socket::isSelfConnect(sockfd)) {
// 自连接检测(见第 9 课)
if (retry_) retry(sockfd);
errorCallback_();
}
else {
// 连接成功!
status_ = Status::Connected;
if (connect_) {
newConnectionCallback_(sockfd); // 把 fd 交给上层
}
// 如果已经 stop(),close(sockfd)
}
}
}
|
removeAndResetChannel() 的延迟析构:
1
2
3
4
5
6
7
8
9
10
11
| int Connector::removeAndResetChannel()
{
channelPtr_->disableAll();
channelPtr_->remove();
int sockfd = channelPtr_->fd();
// 关键:不能在这里直接 reset(),因为此时正在 Channel::handleEvent() 调用栈中!
// 用 queueInLoop 延迟到下一次循环再析构
loop_->queueInLoop([channelPtr = channelPtr_]() {});
channelPtr_.reset();
return sockfd;
}
|
这是一个精妙的"延迟析构":如果在 Channel 的 handleEvent() 调用链中直接 reset() Channel,会析构正在执行的对象,触发栈上悬垂引用。queueInLoop 捕获一份 shared_ptr,让 Channel 在当前调用链结束后(下一次 loop 迭代)再析构。
3.5 retry() — 指数退避重连#
1
2
3
4
5
6
7
8
9
10
11
12
13
| void Connector::retry(int sockfd)
{
::close(sockfd); // 先关掉失败的 socket
status_ = Status::Disconnected;
if (connect_) {
LOG_INFO << "Retry in " << retryInterval_ << "ms";
loop_->runAfter(retryInterval_ / 1000.0,
std::bind(&Connector::startInLoop, shared_from_this()));
retryInterval_ = retryInterval_ * 2; // 每次翻倍
if (retryInterval_ > maxRetryInterval_)
retryInterval_ = maxRetryInterval_; // 上限 30 秒
}
}
|
指数退避策略:
1
2
3
4
5
6
| 第 1 次重试:500ms 后
第 2 次重试:1000ms 后
第 3 次重试:2000ms 后
第 4 次重试:4000ms 后
...
第 n 次重试:30000ms 后(上限,不再增长)
|
为什么指数退避?
如果服务器宕机,大量客户端同时以固定间隔重连,会形成重连风暴,服务器刚恢复就被打垮。指数退避让重连请求自然分散,对服务器更友好。
shared_from_this() 的必要性:
retry() 用 runAfter 把 startInLoop 延迟执行。在延迟执行之前,TcpClient 可能已经析构了 Connector。用 shared_from_this() 让定时器持有一份引用计数,保证 Connector 存活到定时器触发后。这就是为什么 Connector 继承了 enable_shared_from_this。
3.6 stop() 的线程安全#
1
2
3
4
5
6
7
8
9
10
11
12
| void Connector::stop()
{
status_ = Status::Disconnected;
if (loop_->isInLoopThread()) {
removeAndResetChannel();
} else {
// 跨线程调用:投递到 EventLoop 线程执行
loop_->queueInLoop([thisPtr = shared_from_this()]() {
thisPtr->removeAndResetChannel();
});
}
}
|
stop() 可以从任意线程调用(如用户线程触发断线),而 Channel 操作必须在 EventLoop 线程执行,因此用 queueInLoop 跨线程投递。
四、两个类的对比#
| 特性 | Acceptor | Connector |
|---|
| 角色 | 服务端,被动接受 | 客户端,主动发起 |
| 关键系统调用 | listen() + accept() | connect() |
| epoll 事件 | 读事件(listenFd 可读=有新连接) | 写事件(连接完成触发) |
| 结果判断 | accept4() 返回 >= 0 | getsockopt(SO_ERROR) |
| 错误处理 | EMFILE 用 idleFd_ 优雅拒绝 | errno 分类,可重试错误指数退避 |
| 生命周期 | 在 TcpServer 整个运行期存在 | 每次连接新建,成功后把 fd 交出 |
| 线程安全 | listen/accept 在同一 EventLoop 线程 | stop() 跨线程安全,内部 queueInLoop |
| 自连接检测 | 不需要 | Socket::isSelfConnect() |
五、socket 选项钩子机制#
两个类都提供了 socket 选项钩子,给用户不改框架源码就能定制 socket 行为的能力:
1
2
3
4
5
6
| Acceptor:
beforeListenSetSockOptCallback_(fd) ← listen() 之前调用
afterAcceptSetSockOptCallback_(fd) ← 每次 accept() 成功后调用
Connector:
sockOptCallback_(fd) ← connect() 之前调用
|
典型使用场景:
1
2
3
4
5
6
7
8
9
10
11
12
| // 服务器:对每个新连接设置 TCP_NODELAY
acceptor.setAfterAcceptSockOptCallback([](int fd) {
int on = 1;
setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &on, sizeof(on));
});
// 客户端:绑定特定本地端口(如需要端口白名单穿透防火墙)
connector.setSockOptCallback([](int fd) {
struct sockaddr_in local;
local.sin_port = htons(12345); // 固定本地端口
bind(fd, (sockaddr*)&local, sizeof(local));
});
|
六、游戏服务器实践#
6.1 服务端场景(Acceptor)#
1
2
3
4
5
6
7
8
9
| // 典型的游戏网关服配置
TcpServer gatewayServer(&loop, InetAddress(9000), "Gateway");
// 内部:TcpServer 持有 Acceptor,在 start() 时调用 acceptor.listen()
// 对每个新连接:关 Nagle(低延迟),设置接收缓冲区
gatewayServer.setBeforeListenSockOptCallback([](int fd) {
int size = 4 * 1024 * 1024; // 4MB 接收缓冲区
setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size));
});
|
6.2 客户端场景(Connector)#
1
2
3
4
5
6
7
8
9
| // 游戏服连接数据库服(内网连接,需要断线重连)
TcpClient dbClient(&loop, InetAddress("10.0.0.100", 3306), "DBClient");
dbClient.enableRetry(); // 启用指数退避重连
// 内部:TcpClient 持有 Connector,断线后 Connector::retry() 自动重连
// 重连时的日志输出:
// INFO Retry connecting to 10.0.0.100:3306 in 500 milliseconds.
// INFO Retry connecting to 10.0.0.100:3306 in 1000 milliseconds.
// INFO Retry connecting to 10.0.0.100:3306 in 2000 milliseconds.
|
核心收获#
idleFd_ EMFILE 技巧:预占 /dev/null fd,fd 耗尽时临时借用→接受→立即关闭→重新打开,让客户端收到干净 RST 而非无限等待- 非阻塞
connect() 返回 EINPROGRESS 是正常的:注册写事件,触发后用 getsockopt(SO_ERROR) 检查真实结果 - errno 要分类:
EINTR/EAGAIN/EADDRINUSE 等可重试;ECONNREFUSED/ENETUNREACH 等是致命错误 - 指数退避:500ms→1s→2s→…→30s 封顶,
shared_from_this() 在 timer 期间保活 Connector removeAndResetChannel() 用 queueInLoop 延迟销毁 Channel,防止在 Channel 自身的回调中析构
七、思考题#
Acceptor 的 idleFd_ 在 EMFILE 时先 close 再 accept 再 close。这个操作窗口内(第一次 close 到最后一次 open 之间),如果又有新连接到来,readCallback 还会被再次调用吗?会不会出现问题?
Connector::handleWrite() 用 getsockopt(SO_ERROR) 判断 connect 结果,而不是直接用 connect() 的返回值。为什么 connect() 返回 EINPROGRESS 就判断成功是错误的?有什么场景下连接会在写事件触发时才真正失败?
Connector 继承了 enable_shared_from_this,而 Acceptor 没有。为什么 Connector 需要、Acceptor 不需要?(提示:分析两者的 retry 行为和 TcpServer/TcpClient 对它们的持有方式)
指数退避的上限是 30 秒,但 retryInterval_ 在重连成功后不会重置为初始值(500ms)。如果连接建立 → 短暂断线 → 重连时使用的是 30 秒间隔,这合理吗?如何改进?
八、思考题参考答案#
1. idleFd_ 操作窗口内新连接到来的行为分析#
源码位置:Acceptor.cc 第 92-98 行
1
2
3
4
5
6
7
| if (errno == EMFILE)
{
::close(idleFd_); // ① 释放备用 fd
idleFd_ = sock_.accept(&peer); // ② accept 取出一个连接
::close(idleFd_); // ③ 立刻关闭(拒绝)
idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC); // ④ 重新占住
}
|
窗口期分析:
整个操作发生在单个 EventLoop 线程中,而 Acceptor 的 readCallback() 是由 Channel 的事件处理函数调用的。在 ① 到 ④ 的执行期间,不会被另一次 readCallback() 中断(EventLoop 是单线程事件驱动模型,不存在同一 Channel 的 readCallback 重入)。
但问题是:在 ① close(idleFd_) 之后、④ open("/dev/null", ...) 之前,进程有了 1-2 个空闲 fd 槽位。如果操作系统恰好在这个极短窗口内完成了一个新的 TCP 三次握手(这在内核层面是异步的),新连接会被放入 listen socket 的已完成连接队列(accept queue)中。
回答:
readCallback 不会在窗口期内被再次调用:因为 EventLoop 是单线程的,当前正在执行 readCallback,epoll_wait 不会在此时被调用,所以不会触发新的 readCallback
新连接会在 accept queue 中等待:窗口期内到达的新连接在内核的 accept queue 中排队,不受影响
当 readCallback 返回后,EventLoop 重新 epoll_wait:
- 如果 accept queue 中还有未取走的连接,listenFd 仍然会被报告为可读
- 下一次
readCallback 调用 sock_.accept() 时,如果 fd 配额仍然不够(EMFILE),又会重复 idleFd_ 技巧 - 如果中间有其他连接关闭释放了 fd,accept 就能成功
潜在问题:这个方案每次只能拒绝一个等待中的连接。如果 accept queue 中积压了大量连接(比如突发的连接风暴),每次 readCallback 只拒绝一个,其他连接继续在 queue 中等待。但由于 epoll 是 LT(水平触发)模式,只要 queue 不为空就会持续触发 readCallback,所以最终所有积压连接都会被逐一拒绝,不会永久堆积
还有一个微妙的竞态:在 ② accept 和 ③ close 之间,进程的 fd 数量短暂回到了上限。如果此时有其他线程尝试 open/socket/accept,也会得到 EMFILE。但 Acceptor 是在 EventLoop 线程中运行的,通常 fd 操作也在同一线程,所以这个竞态的实际影响很小
2. 为什么用 getsockopt(SO_ERROR) 而不是 connect() 返回值#
源码位置:Connector.cc 第 172-245 行
1
2
3
4
5
6
7
| void Connector::handleWrite()
{
// ...
int err = Socket::getSocketError(sockfd); // getsockopt(SO_ERROR)
if (err) { /* 失败 */ }
else { /* 成功 */ }
}
|
根本原因:非阻塞 connect() 的返回值和最终连接结果是两码事。
时间线拆解:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| t=0 connect() 调用
→ 返回 -1, errno=EINPROGRESS
含义:TCP SYN 已发出,三次握手正在进行中
此时你只知道"请求已发出",不知道结果
t=0~N [等待三次握手完成]
可能发生:
a) SYN-ACK 收到 → 回 ACK → 连接建立
b) RST 收到 → 连接被拒绝
c) ICMP 不可达收到 → 网络不可达
d) 超时 → 无响应
t=N epoll 报告 EPOLLOUT(写事件就绪)
→ handleWrite() 被调用
|
为什么不能用 connect() 的返回值?
connect() 返回 EINPROGRESS 只表示"SYN 已发送",不是成功也不是失败connect() 在返回后就已经"完成"了(从系统调用的角度),后续的 TCP 握手结果不会再通过 connect() 返回- 在某些系统上对同一个 socket 再次调用
connect() 可能返回 EISCONN(已连接)或 EALREADY(操作已在进行中),但行为不可移植
为什么 EPOLLOUT 不能直接当成功?
写事件就绪只表示"connect 过程结束了",不区分成功还是失败:
- 连接成功:socket 变为可写(有发送缓冲区可用)
- 连接失败:socket 也会变为可写(kernel 标记为可写 + error),同时 EPOLLERR 也会被设置
必须通过 getsockopt(SOL_SOCKET, SO_ERROR, ...) 读取 socket 的待处理错误码来判断真实结果:
err == 0:连接成功err == ECONNREFUSED:对端拒绝(RST)err == ETIMEDOUT:连接超时err == ENETUNREACH:网络不可达
实际失败场景举例:
场景 1 - 对端端口未监听:
1
2
3
4
5
| client → SYN → server:9999(无人监听)
server → RST → client
client 的 socket SO_ERROR = ECONNREFUSED
epoll 报告 EPOLLOUT | EPOLLERR
handleWrite() 中 getsockopt 读到 ECONNREFUSED → 重试
|
场景 2 - 网络不可达:
1
2
3
4
| client → SYN → 路由器 → 目标网络不存在
路由器 → ICMP Destination Unreachable → client
client 的 socket SO_ERROR = ENETUNREACH
epoll 报告 EPOLLOUT | EPOLLERR
|
场景 3 - 防火墙静默丢包:
1
2
3
4
| client → SYN → 防火墙(丢弃,不回复)
...等待很久...
client 内核 TCP 重传 SYN 多次后放弃
SO_ERROR = ETIMEDOUT
|
3. Connector 需要 enable_shared_from_this 而 Acceptor 不需要的原因#
源码位置:
Connector.h 第 25-26 行:class Connector : public std::enable_shared_from_this<Connector>Acceptor.h 第 28 行:class Acceptor : NonCopyable(不继承 enable_shared_from_this)
Connector 需要的原因——延迟回调的生命周期保障:
Connector 在两个关键位置使用了 shared_from_this():
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| // Connector.cc 第 289-290 行 — retry() 中
loop_->runAfter(retryInterval_ / 1000.0,
std::bind(&Connector::startInLoop, shared_from_this()));
// Connector.cc 第 59 行 — stop() 中
loop_->queueInLoop([thisPtr = shared_from_this()]() {
thisPtr->removeAndResetChannel();
});
// Connector.cc 第 147-151 行 — connecting() 中
channelPtr_->setWriteCallback(
std::bind(&Connector::handleWrite, shared_from_this()));
channelPtr_->setErrorCallback(
std::bind(&Connector::handleError, shared_from_this()));
|
核心问题是:Connector 会注册延迟执行的回调(定时器回调、Channel 回调),这些回调在未来某个时刻执行时,Connector 对象必须仍然存活。
具体场景:
TcpClient 持有 shared_ptr<Connector>- Connector 调用
retry(),注册一个 500ms 后的定时器回调 - 在这 500ms 内,用户调用
TcpClient::disconnect(),TcpClient 析构,释放了 shared_ptr<Connector> - 如果
retry() 中用的是裸 this,Connector 已被销毁,500ms 后定时器触发时访问的是野指针 → 段错误 - 用
shared_from_this() 让定时器的 lambda 也持有一份 shared_ptr,引用计数不归零,Connector 安全存活到回调执行完毕
Acceptor 不需要的原因——生命周期与 TcpServer 完全绑定:
- Acceptor 没有 retry 机制:Acceptor 是服务端监听,一旦创建就持续运行到 TcpServer 析构。它不需要延迟重连,也没有定时器回调
- Acceptor 由 TcpServer 直接持有(作为成员变量或
unique_ptr),生命周期明确:TcpServer 构造时创建,析构时销毁 - Acceptor 的 Channel 回调(readCallback)是
std::bind(&Acceptor::readCallback, this),直接用裸 this。这是安全的,因为 Acceptor 析构时会调用 acceptChannel_.disableAll() 和 acceptChannel_.remove()(见 Acceptor.cc 第 47-49 行),确保析构后不会有任何回调触发 - Acceptor 没有跨生命周期的异步操作:它的所有操作都是同步的(在 EventLoop 线程中立即执行),不存在"回调到来时对象已销毁"的风险
对比总结:
| 特性 | Acceptor | Connector |
|---|
| 持有方式 | TcpServer 直接持有(唯一所有者) | shared_ptr<Connector>(可能被多处引用) |
| 是否有定时器回调 | 无 | 有(retry 指数退避) |
| Channel 回调中用的 this | 裸 this(安全,因为析构时取消了 Channel) | shared_from_this()(因为 Channel 可能比 TcpClient 活得久) |
| 析构保护 | disableAll() + remove() 在析构函数中 | shared_ptr 引用计数保护 |
4. retryInterval_ 不重置的合理性分析与改进#
源码位置:Connector.h 第 86-87 行 和 Connector.cc 第 275-299 行
1
2
3
4
5
6
7
8
| // Connector.h
int retryInterval_{kInitRetryDelayMs}; // 初始 500ms
int maxRetryInterval_{kMaxRetryDelayMs}; // 上限 30000ms
// Connector.cc — retry()
retryInterval_ = retryInterval_ * 2;
if (retryInterval_ > maxRetryInterval_)
retryInterval_ = maxRetryInterval_;
|
查看完整的 Connector 代码可以确认:没有任何地方在连接成功后将 retryInterval_ 重置为 kInitRetryDelayMs(500ms)。handleWrite() 中连接成功时只是设置 status_ = Connected 并调用 newConnectionCallback_,没有重置 retryInterval_。
这不太合理,原因如下:
问题场景:
1
2
3
4
5
6
7
8
9
10
11
| t=0 首次连接失败,retryInterval_=500ms
t=0.5 重试,失败,retryInterval_=1000ms
t=1.5 重试,失败,retryInterval_=2000ms
...经过多次重试...
t=60 重试,成功!retryInterval_ 已经增长到 30000ms(上限)
连接运行正常...
t=120 服务器短暂重启(2秒就恢复)
t=120 连接断开,TcpClient 调用 Connector::start() 发起重连
t=120 connect() 失败(服务器还没起来),retry(fd)
→ 使用 retryInterval_=30000ms → 等 30 秒后才重试!
但服务器 2 秒后就恢复了,白白等了 28 秒
|
这对游戏服务器来说是不可接受的——数据库连接断了 2 秒后就恢复,但游戏服需要 30 秒才能重连上。
为什么 trantor 没有重置?可能的考量:
- 简单性:当前 Connector 的实现比较简洁,每次
retry() 只管递增间隔,没有"成功后重置"的逻辑。TcpClient 在连接断开后会创建新的 Connector 对象,新对象的 retryInterval_ 是初始值 500ms。所以这个问题可能在实际使用中不太明显——取决于 TcpClient 是复用 Connector 还是新建 - 防抖动:如果连接频繁断开/重连(flapping),不重置间隔可以起到"惩罚"效果,避免频繁重连给服务器造成压力
改进方案:
方案 A — 连接成功后重置(最直观):
1
2
3
4
5
6
| // 在 handleWrite() 连接成功的分支中添加
status_ = Status::Connected;
retryInterval_ = kInitRetryDelayMs; // 重置为 500ms
if (connect_) {
newConnectionCallback_(sockfd);
}
|
方案 B — 部分重置(带保护):
1
2
| // 连接成功后不完全重置为 500ms,而是减半(但不低于初始值)
retryInterval_ = std::max(kInitRetryDelayMs, retryInterval_ / 2);
|
这样如果连接不稳定(频繁断重连),间隔不会每次都从 500ms 开始爬升,有一定的"记忆"效果。
方案 C — 由 TcpClient 层面处理:
每次断线后创建全新的 Connector 对象(retryInterval_ 自然是初始值),旧 Connector 对象在没有活跃定时器后自然析构。这是 muduo/trantor 的一些使用方式中实际采用的方案,但代价是每次断线重连都要重新创建对象。
推荐方案 A,在 handleWrite() 的成功分支中简单添加一行 retryInterval_ = kInitRetryDelayMs 即可,语义清晰,对游戏服务器的快速重连场景最友好。
学习日期:2025-03-21 | 上一课:第09课_网络地址与Socket封装 | 下一课:第11课_TcpConnection连接生命周期