1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
| // coroutine_echo.cpp — 协程式 io_uring Echo Server
// 编译:g++ -std=c++20 -O2 coroutine_echo.cpp -luring -o coroutine_echo
#include "IoUringContext.hpp"
#include "Task.hpp"
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>
#include <cstdio>
#include <vector>
#include <memory>
// 处理单个客户端连接的协程
// 注意:协程的生命周期由 IoUringContext 管理
Task<void> handleClient(IoUringContext& ctx, int clientFd)
{
char buf[1024];
printf("[+] 协程启动: fd=%d\n", clientFd);
while (true) {
// 异步接收 —— 协程在此挂起,直到数据到达
int n = co_await ctx.asyncRecv(clientFd, buf, sizeof(buf), 0);
if (n <= 0) {
if (n < 0) {
printf("[!] recv 错误 fd=%d: res=%d\n", clientFd, n);
}
break;
}
printf("[>] fd=%d 收到 %d 字节\n", clientFd, n);
// 异步发送 —— 协程在此挂起,直到发送完成
int sent = co_await ctx.asyncSend(clientFd, buf, n, 0);
if (sent < 0) {
printf("[!] send 错误 fd=%d: res=%d\n", clientFd, sent);
break;
}
printf("[<] fd=%d 回显 %d 字节\n", clientFd, sent);
}
printf("[-] 连接关闭: fd=%d\n", clientFd);
close(clientFd);
co_return;
}
// 接受新连接的协程
Task<void> acceptLoop(IoUringContext& ctx, int listenFd)
{
// 保存所有客户端协程,管理其生命周期
std::vector<std::unique_ptr<Task<void>>> clients;
while (true) {
// 异步 accept —— 协程在此挂起,直到有新连接
int clientFd = co_await ctx.asyncAccept(listenFd, nullptr, nullptr, 0);
if (clientFd < 0) {
printf("[!] accept 错误: res=%d\n", clientFd);
continue;
}
printf("[+] 新连接 fd=%d\n", clientFd);
// 为每个客户端启动一个协程
auto task = std::make_unique<Task<void>>(handleClient(ctx, clientFd));
task->start(); // 启动协程(运行到第一个 co_await)
clients.push_back(std::move(task));
// 清理已完成的协程
clients.erase(
std::remove_if(clients.begin(), clients.end(),
[](const auto& t) { return t->handle().done(); }),
clients.end());
}
}
int main()
{
// 创建监听 socket
int listenFd = socket(AF_INET, SOCK_STREAM, 0);
int opt = 1;
setsockopt(listenFd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
sockaddr_in addr{};
addr.sin_family = AF_INET;
addr.sin_port = htons(9000);
addr.sin_addr.s_addr = INADDR_ANY;
bind(listenFd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
listen(listenFd, 128);
printf("协程 Echo Server 监听端口 9000...\n");
// 创建 io_uring 上下文
IoUringContext ctx(256);
// 启动 accept 协程
auto acceptTask = acceptLoop(ctx, listenFd);
acceptTask.start();
// 运行事件循环(阻塞)
ctx.run();
close(listenFd);
return 0;
}
|