1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
| // chat_server.cpp
// 编译: g++ -std=c++20 -fcoroutines chat_server.cpp \
// -lboost_system -lpthread -o chat
// 测试: 多个终端分别 nc localhost 8888,输入的消息会广播给所有人
#include <boost/asio.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <boost/asio/read_until.hpp>
#include <boost/asio/redirect_error.hpp>
#include <boost/asio/streambuf.hpp>
#include <iostream>
#include <set>
#include <memory>
#include <deque>
#include <string>
using boost::asio::ip::tcp;
using boost::asio::awaitable;
using boost::asio::use_awaitable;
// ========================================
// 前置声明
// ========================================
class ChatSession;
// ========================================
// 聊天室:管理所有在线用户
// ========================================
class ChatRoom
{
public:
// 新用户加入
void join(std::shared_ptr<ChatSession> session)
{
sessions_.insert(session);
std::cout << "[系统] 用户加入,当前在线: "
<< sessions_.size() << "\n";
}
// 用户离开
void leave(std::shared_ptr<ChatSession> session)
{
sessions_.erase(session);
std::cout << "[系统] 用户离开,当前在线: "
<< sessions_.size() << "\n";
}
// 广播消息给所有用户(除了发送者)
void broadcast(const std::string& msg,
std::shared_ptr<ChatSession> sender);
private:
std::set<std::shared_ptr<ChatSession>> sessions_;
};
// ========================================
// 聊天会话:处理单个用户的收发
// ========================================
class ChatSession : public std::enable_shared_from_this<ChatSession>
{
public:
ChatSession(tcp::socket socket, ChatRoom& room)
: socket_(std::move(socket))
, room_(room)
, writeTimer_(socket_.get_executor())
{
}
void start()
{
room_.join(shared_from_this());
// 同时启动读协程和写协程
// 它们并发运行:读协程接收用户输入,写协程发送广播消息
boost::asio::co_spawn(
socket_.get_executor(),
[self = shared_from_this()]() { return self->readerLoop(); },
boost::asio::detached);
boost::asio::co_spawn(
socket_.get_executor(),
[self = shared_from_this()]() { return self->writerLoop(); },
boost::asio::detached);
}
// 将消息放入发送队列
void deliver(const std::string& msg)
{
writeQueue_.push_back(msg);
// 唤醒 writer 协程:通过取消 timer 的等待
writeTimer_.cancel_one();
}
private:
// 读协程:持续读取用户输入,按行分割
awaitable<void> readerLoop()
{
try
{
boost::asio::streambuf buf;
for (;;)
{
// async_read_until:读取直到遇到 '\n'
// 返回值 n 包含了分隔符的长度
auto n = co_await boost::asio::async_read_until(
socket_, buf, '\n', use_awaitable);
// 从 streambuf 提取一行内容
std::string line(
boost::asio::buffers_begin(buf.data()),
boost::asio::buffers_begin(buf.data()) + n);
buf.consume(n); // 消耗已读取的数据
// 广播给聊天室中的其他人
room_.broadcast(line, shared_from_this());
}
}
catch (std::exception&)
{
// 连接断开或出错 → 退出聊天室
stop();
}
}
// 写协程:等待并发送队列中的消息
awaitable<void> writerLoop()
{
try
{
// 设置一个"永不到期"的定时器作为通知机制
// 当有新消息时,deliver() 会 cancel_one() 来唤醒这个等待
while (socket_.is_open())
{
if (writeQueue_.empty())
{
// 队列为空,等待通知
writeTimer_.expires_at(
boost::asio::steady_timer::time_point::max());
// redirect_error:将异常转为 error_code
// 这样 cancel 不会抛异常,而是返回 operation_aborted
boost::system::error_code ec;
co_await writeTimer_.async_wait(
boost::asio::redirect_error(use_awaitable, ec));
// ec == operation_aborted 表示被 deliver() 唤醒
}
// 发送队列中的所有消息
while (!writeQueue_.empty())
{
co_await boost::asio::async_write(
socket_,
boost::asio::buffer(writeQueue_.front()),
use_awaitable);
writeQueue_.pop_front();
}
}
}
catch (std::exception&)
{
stop();
}
}
void stop()
{
room_.leave(shared_from_this());
boost::system::error_code ec;
socket_.close(ec);
writeTimer_.cancel();
}
tcp::socket socket_;
ChatRoom& room_;
boost::asio::steady_timer writeTimer_;
std::deque<std::string> writeQueue_;
};
// ChatRoom::broadcast 实现
void ChatRoom::broadcast(const std::string& msg,
std::shared_ptr<ChatSession> sender)
{
for (auto& session : sessions_)
{
if (session != sender) // 不发给发送者自己
{
session->deliver(msg);
}
}
}
// ========================================
// 监听协程:接受新连接
// ========================================
awaitable<void> listener(tcp::acceptor acceptor, ChatRoom& room)
{
for (;;)
{
// 等待新连接
tcp::socket socket = co_await acceptor.async_accept(use_awaitable);
std::cout << "[系统] 新连接: " << socket.remote_endpoint() << "\n";
// 创建会话并启动
std::make_shared<ChatSession>(std::move(socket), room)->start();
}
}
// ========================================
// 主函数
// ========================================
int main()
{
try
{
boost::asio::io_context ioCtx;
ChatRoom room;
tcp::acceptor acceptor(ioCtx, tcp::endpoint(tcp::v4(), 8888));
std::cout << "=== 聊天室服务器启动 ===\n";
std::cout << "监听端口: 8888\n";
std::cout << "使用 nc localhost 8888 或 telnet localhost 8888 连接\n";
std::cout << "========================\n\n";
boost::asio::co_spawn(ioCtx,
listener(std::move(acceptor), room),
boost::asio::detached);
// 优雅关闭:Ctrl+C 停止服务器
boost::asio::signal_set signals(ioCtx, SIGINT, SIGTERM);
signals.async_wait([&ioCtx](auto, auto) {
std::cout << "\n[系统] 收到停止信号,关闭服务器\n";
ioCtx.stop();
});
ioCtx.run();
}
catch (const std::exception& e)
{
std::cerr << "致命错误: " << e.what() << "\n";
}
return 0;
}
|