06. 事件循环的实现
# 06. 事件循环的实现
这一章带大家瞧瞧一个回显服务器(echo server)的真实C++代码。先看看Conn
结构体的定义:
enum {
STATE_REQ = 0,
STATE_RES = 1,
STATE_END = 2, // 标记这个连接,准备删除它
};
struct Conn {
int fd = -1;
uint32_t state = 0; // 取值为STATE_REQ 或 STATE_RES
// 读缓冲区
size_t rbuf_size = 0;
uint8_t rbuf[4 + k_max_msg];
// 写缓冲区
size_t wbuf_size = 0;
size_t wbuf_sent = 0;
uint8_t wbuf[4 + k_max_msg];
};
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
我们需要读写缓冲区,毕竟在非阻塞模式下,输入输出(IO)操作经常得往后推一推。
这个state
(状态)是用来决定怎么处理连接的。一个进行中的连接有两种状态,STATE_REQ
是用来读取请求的,STATE_RES
则是用来发送响应的。
下面是事件循环的代码:
int main() {
int fd = socket(AF_INET, SOCK_STREAM, 0);
if (fd < 0) {
die("socket()");
}
// 绑定、监听等等操作
// 代码省略...
// 一个存放所有客户端连接的映射,用文件描述符(fd)作为键
std::vector<Conn *> fd2conn;
// 将监听的文件描述符设置为非阻塞模式
fd_set_nb(fd);
// 事件循环
std::vector<struct pollfd> poll_args;
while (true) {
// 准备poll()的参数
poll_args.clear();
// 为了方便,把监听的文件描述符放在第一个位置
struct pollfd pfd = {fd, POLLIN, 0};
poll_args.push_back(pfd);
// 连接的文件描述符
for (Conn *conn : fd2conn) {
if (!conn) {
continue;
}
struct pollfd pfd = {};
pfd.fd = conn->fd;
pfd.events = (conn->state == STATE_REQ)? POLLIN : POLLOUT;
pfd.events = pfd.events | POLLERR;
poll_args.push_back(pfd);
}
// 轮询(poll)活跃的文件描述符
// 这里的超时参数没啥用,随便设个大点儿的数就行
int rv = poll(poll_args.data(), (nfds_t)poll_args.size(), 1000);
if ( rv < 0) {
die("poll");
}
// 处理活跃的连接
for (size_t i = 1; i < poll_args.size(); ++i) {
if (poll_args[i].revents) {
Conn *conn = fd2conn[poll_args[i].fd];
connection_io(conn);
if (conn->state == STATE_END) {
// 客户端正常关闭,或者出了啥问题
// 销毁这个连接
fd2conn[conn->fd] = NULL;
(void)close(conn->fd);
free(conn);
}
}
}
// 如果监听的文件描述符活跃,尝试接受一个新连接
if (poll_args[0].revents) {
(void)accept_new_conn(fd2conn, fd);
}
}
return 0;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
事件循环里,咱们首先要做的就是设置poll
的参数。监听的文件描述符用POLLIN
标志来轮询。对于连接的文件描述符,Conn
结构体的状态决定了轮询标志。在这个例子里,轮询标志要么是读(POLLIN
),要么是写(POLLOUT
),不会同时出现。要是用epoll
的话,事件循环里通常第一件事就是用epoll_ctl
更新文件描述符集合。
poll
还有个超时参数,这个参数可以用来实现定时器。在咱们这儿,这参数不重要,随便设个大数就行。poll
返回之后,我们就能知道哪些文件描述符可以读写啦,然后就可以采取相应行动。
accept_new_conn()
函数负责接受新连接,并且创建Conn
结构体对象:
static void conn_put(std::vector<Conn *> &fd2conn, struct Conn *conn) {
if (fd2conn.size() <= (size_t)conn->fd) {
fd2conn.resize(conn->fd + 1);
}
fd2conn[conn->fd] = conn;
}
static int32_t accept_new_conn(std::vector<Conn *> &fd2conn, int fd) {
// 接受连接
struct sockaddr_in client_addr = {};
socklen_t socklen = sizeof(client_addr);
int connfd = accept(fd, (struct sockaddr *)&client_addr, &socklen);
if (connfd < 0) {
msg("accept() error");
return -1; // 出错了
}
// 将新连接的文件描述符设置为非阻塞模式
fd_set_nb(connfd);
// 创建Conn结构体
struct Conn *conn = (struct Conn *)malloc(sizeof(struct Conn));
if (!conn) {
close(connfd);
return -1;
}
conn->fd = connfd;
conn->state = STATE_REQ;
conn->rbuf_size = 0;
conn->wbuf_size = 0;
conn->wbuf_sent = 0;
conn_put(fd2conn, conn);
return 0;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
connection_io()
函数是处理客户端连接的状态机:
static void connection_io(Conn *conn) {
if (conn->state == STATE_REQ) {
state_req(conn);
} else if (conn->state == STATE_RES) {
state_res(conn);
} else {
assert(0); // 不该出现这种情况
}
}
2
3
4
5
6
7
8
9
STATE_REQ
状态是用来读数据的:
static void state_req(Conn *conn) {
while (try_fill_buffer(conn)) {}
}
static bool try_fill_buffer(Conn *conn) {
// 尝试填充缓冲区
assert(conn->rbuf_size < sizeof(conn->rbuf));
ssize_t rv = 0;
do {
size_t cap = sizeof(conn->rbuf) - conn->rbuf_size;
rv = read(conn->fd, &conn->rbuf[conn->rbuf_size], cap);
} while ( rv < 0 && errno == EINTR);
if ( rv < 0 && errno == EAGAIN) {
// 遇到EAGAIN,停止读取
return false;
}
if ( rv < 0) {
msg("read() error");
conn->state = STATE_END;
return false;
}
if ( rv == 0) {
if (conn->rbuf_size > 0) {
msg("unexpected EOF");
} else {
msg("EOF");
}
conn->state = STATE_END;
return false;
}
conn->rbuf_size += (size_t)rv;
assert(conn->rbuf_size <= sizeof(conn->rbuf) - conn->rbuf_size);
// 尝试逐个处理请求
// 为啥这里有个循环呢?去看看“流水线(pipelining)”的解释就懂啦
while (try_one_request(conn)) {}
return (conn->state == STATE_REQ);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
这里面门道可不少。要理解这个函数,咱们回顾一下上一章的伪代码:
def do_something_to_client(fd):
if should_read_from(fd):
data = read_until_EAGAIN(fd)
process_incoming_data(data)
# 代码省略...
2
3
4
5
try_fill_buffer()
函数用数据填充读缓冲区。因为读缓冲区大小有限,可能还没遇到EAGAIN
,缓冲区就满了,所以读完之后得马上处理数据,腾出点空间,然后继续循环调用try_fill_buffer()
,直到遇到EAGAIN
。
读系统调用(以及其他系统调用)如果遇到errno
是EINTR
就得重试。EINTR
表示系统调用被信号中断了,就算咱们的程序没用信号,也得重试。
try_one_request
函数负责处理收到的数据,可为啥这里要用个循环呢?读缓冲区里难道会有不止一个请求?没错!对于请求/响应协议来说,客户端可不局限于一次发一个请求,然后等着响应。客户端可以连着发好几个请求,中间不用等响应,这样能节省点延迟,这种操作模式就叫“流水线(pipelining)”。所以可别以为读缓冲区里最多就一个请求。
下面看看try_one_request
函数的代码:
static bool try_one_request(Conn *conn) {
// 尝试从缓冲区解析出一个请求
if (conn->rbuf_size < 4) {
// 缓冲区数据不够,下次循环再试试
return false;
}
uint32_t len = 0;
memcpy(&len, &conn->rbuf[0], 4);
if (len > k_max_msg) {
msg("too long");
conn->state = STATE_END;
return false;
}
if (4 + len > conn->rbuf_size) {
// 缓冲区数据不够,下次循环再试试
return false;
}
// 拿到一个请求,处理一下
printf("client says: %. *s\n", len, &conn->rbuf[4]);
// 生成回显响应
memcpy(&conn->wbuf[0], &len, 4);
memcpy(&conn->wbuf[4], &conn->rbuf[4], len);
conn->wbuf_size = 4 + len;
// 从缓冲区移除这个请求
// 注意:频繁调用memmove效率可不高
// 注意:生产环境的代码得优化下这部分
size_t remain = conn->rbuf_size - 4 - len;
if (remain) {
memmove(conn->rbuf, &conn->rbuf[4 + len], remain);
}
conn->rbuf_size = remain;
// 切换状态
conn->state = STATE_RES;
state_res(conn);
// 如果请求处理完了,就继续外层循环
return (conn->state == STATE_REQ);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
try_one_request
函数从读缓冲区取出一个请求,生成响应,然后切换到STATE_RES
状态。
下面是STATE_RES
状态的代码:
static void state_res(Conn *conn) {
while (try_flush_buffer(conn)) {}
}
static bool try_flush_buffer(Conn *conn) {
ssize_t rv = 0;
do {
size_t remain = conn->wbuf_size - conn->wbuf_sent;
rv = write(conn->fd, &conn->wbuf[conn->wbuf_sent], remain);
if ( rv < 0 && errno == EAGAIN) {
// 遇到EAGAIN,停止写入
return false;
}
if ( rv < 0) {
msg("write() error");
conn->state = STATE_END;
return false;
}
conn->wbuf_sent += (size_t)rv;
assert(conn->wbuf_sent <= conn->wbuf_size);
if (conn->wbuf_sent == conn->wbuf_size) {
// 响应全部发送完毕,切换回STATE_REQ状态
conn->state = STATE_REQ;
conn->wbuf_sent = 0;
conn->wbuf_size = 0;
return false;
}
// 写缓冲区还有数据,可以再试试写入
return true;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
上面这段代码会不断刷新写缓冲区,直到遇到EAGAIN
,要是缓冲区数据都写完了,就切换回STATE_REQ
状态。
要测试咱们的服务器,可以运行第4章的客户端,因为协议是一样的。也可以改改客户端代码,来演示下流水线客户端:
// 之前的query函数被拆分成了send_req和read_res
static int32_t send_req(int fd, const char *text);
static int32_t read_res(int fd);
int main() {
int fd = socket(AF_INET, SOCK_STREAM, 0);
if (fd < 0) {
die("socket()");
}
// 代码省略...
// 多个流水线请求
const char *query_list[3] = {"hello1", "hello2", "hello3"};
for (size_t i = 0; i < 3; ++i) {
int32_t err = send_req(fd, query_list[i]);
if (err) {
goto L_DONE;
}
}
for (size_t i = 0; i < 3; ++i) {
int32_t err = read_res(fd);
if (err) {
goto L_DONE;
}
}
L_DONE:
close(fd);
return 0;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# 练习:
- 试试在事件循环里用
epoll
替换poll
,这应该挺简单的。 - 我们用
memmove
来回收读缓冲区空间,不过每次处理请求都调用memmove
没必要,改改代码,让它只在读取之前调用memmove
。 - 在
state_res
函数里,一次只写一个响应。在流水线场景下,可以把多个响应先缓存起来,最后用一次write
调用把它们都发出去。注意写缓冲区可能中途就满了。
06_client.cpp
06_server.cpp