学习笔记|Socket 与并发服务器

1.网络应用程序设计模式

1.1 C/S 模式

传统的网络应用设计模式,客户机(client)/ 服务器(server)模式。需要在通讯两端各自部署客户机和服务器来完成数据通信。

优点:

  • 协议选用更加灵活,可以自定义。
  • 可以提前缓存大量数据,提高数据传输的效率。

缺点:

  • 对用户主机构成威胁。
  • 开发团队工作量大,客户端服务端联合调试困难。

1.2 B/S 模式

浏览器(browser)/ 服务器(server)模式。只需在一端部署服务器,而另外一端使用每台 PC 都默认配置的浏览器即可完成数据的传输。

优点:

  • 不会安装软件,相对来说更安全。
  • 不需要开发客户端,工作量相对较小。
  • 跨平台。

缺点:

  • 协议选择不灵活,必须完整的支持协议。
  • 不能数据缓存。

2.socket 概念

socket 不是网络协议,只是传输层协议 TCP/UDP 的实现。

2.1 socket 是伪文件

在 Linux 环境下,用于表示进程间网络通信的特殊文件类型。本质为内核借助缓冲区形成的伪文件。(类比有名管道)

  • 为什么要将套接字封装成伪文件?

    为了统一接口,使读写文件和读写套接字操作一致。

  • 套接字和管道的区别

    管道应用于 本地进程间 通信;套接字应用于 网络进程间 通信。

socket 在数据传输中的位置:

2.2 socket 通信原理

  • IP:在网络环境中唯一标识一台主机。
  • port:在主机中唯一标识一个进程。
  • IP + port:在网络环境中唯一标识一个进程。

所以 IP + port 就对应着一个 socket,想要连接的两个进程通过各自的 socket 来标识,这两个 socket 组成的 socket pair 就唯一标识一个连接。

2.3 socket 的内核实现

  • 文件描述符对应着两个内核缓冲区,一个缓冲区接收数据,一个缓冲区发送数据。

  • 为什么使用两个缓冲区?

    参考管道,只能一个进程读,一个进程写,否则写的数据和读的数据互相覆盖。【半双工

    socket:既能读又能写。【全双工

3.预备知识

3.1 网络字节序

数据包在传输的过程中,需要拆包封包,读取 IP + port,本地字节序是小端方式,而网络字节需是大端方式。

**重要:**因为 TCP/IP 协议规定,网络数据流应采用大端字节序;而 x86 机器都是小端字节序。

所以需要使用字节序转换函数。

h 表示 host,n 表示 network,l 表示 long 32 位长整型,s 表示 short 16位端整型。

1
2
3
4
5
6
#include <arpa/inet.h>

unit32_t htonl(unit32_t hostlong); // IP 本地字节序 -> 网络字节序
unit16_t htons(unit16_t hostshort); // port 本地字节序 -> 网络字节序
unit32_t ntohl(unit32_t netlong); // IP 网络字节序 -> 本地字节序
unit16_t ntohs(unit16_t netshort); // port 本地字节序 -> 网络字节序

PS:如果主机是小端字节序,这些函数将参数做相应的大小端转换然后返回,如果主机是大端字节序,这些函数不做转换,将参数原封不动地返回

3.2 IP 地址转换

常见的点分十进制 IP 地址是字符串类型,我们需要将 string 类型,先手动转化为 unsigned int 类型,才能使用 htonl 等函数。于是有了更便捷的函数,直接传入点分十进制 IP 地址的字符串作为参数。

htonl:192.168.1.2 ==》 unsigned int ==》 网络字节序

inet_pton:192.168.1.2 ==》 网络字节序

1
2
#include <arpa/inet.h>
int inet_pton(int af, const char *src, void *dst); // 本地字节序 ==》网络字节序
  • int af:用来表示 ipv4 还是 ipv6。

    • ipv4:af = AF_INET
    • ipv6:af = AF_INT6
  • char *src:本地字节序。点分十进制的 IP 地址。

  • void *dst:网络字节序,传出参数。比如 server_addr.sin_addr

1
const char *inet_ntop(int af, const void *src, char *dst, socklen_t size);  // 网络字节序 ==》本地字节序
  • int af:同上。

  • void *dst:网络字节序。比如 client_addr.sin_addr

  • char *src:本地字节序。传出参数。

  • socklen_t size:src 字符串的大小。

3.3 sockaddr 数据结构

关于 struct sockaddr:

  • struct sockaddr 结构体诞生早于IPv4协议。

  • struct sockaddr 现在已经废弃,定义结构体时,应使用 sockaddr_in 或 sockaddr_in6。(分别表示 IPV4 和 IPV6)

  • 【重要】但是 bind、accept、connect 等函数的参数,规定的还是 struct sockaddr *

    所以需要强制转换,例如:bind( xxx, (struct sockaddr *) &addr );

关于 sockaddr_in:

netinet/in.h 文件中。

1
2
3
4
5
6
7
8
9
10
11
12
13
struct sockaddr_in {
__kernel_sa_family_t sin_family; // Address family 地址结构类型(简称af)
__be16 sin_port; // Port number 端口号
struct in_addr sin_addr; // Internet address IP地址

/* Pad to size of 'struct sockaddr'. */
unsigned char __pad[__SOCK_SIZE__ - sizeof(short int) -
sizeof(unsigned short int) - sizeof(struct in_addr)];
};

struct in_addr { /* Internet address. */
__be32 s_addr;
};
  • sin_family:填 AF_INET,表示 ipv4。

  • port:端口号

  • sin_addr:IP 地址

    可以设置成 htol(INADDR_ANY),这个宏表示本地的任意IP地址,因为服务器可能有多个网卡,每个网卡也可能绑定多个IP地址,这样设置可以在所有的IP地址上监听,直到与某个客户端建立了连接时才确定下来到底用哪个IP地址,

  • 其他参数:填充空间的字符,pad:填充

4.网络套接字函数

4.1 socket 函数

创建一个 socket,返回 socket 文件描述符。

1
2
3
4
#include <sys/types.h>
#include <sys/socket.h>

int socket(int domain, int type, int protocol);
  • domain:

    • AF_INET:IPV4 地址
    • AF_INET6:IPV6 地址
    • AF_UNIX:本地协议
  • type:通信协议

    • SOCK_STREAM:这个协议是按照顺序的、可靠的、数据完整的基于字节流的连接。典型代表协议 – TCP

    • SOCK_DGRAM:这个协议是无连接的、固定长度的传输调用。典型代表协议 – UDP

    • 不常用协议:

      SOCK_SEQPACKET:该协议是双线路的、可靠的连接,发送固定长度的数据包进行传输。必须把这个包完整的接受才能进行读取。

      SOCK_RAW socket:类型提供单一的网络访问,这个socket类型使用ICMP公共协议。(ping、traceroute使用该协议)

      SOCK_RDM:这个类型是很少使用的,在大部分的操作系统上没有实现,它是提供给数据链路层使用,不保证数据包的顺序。

  • protocol:表示协议号。

    • protocol = 0,表示 默认协议。
    • SOCK_STREAM 的默认协议是 TCP,SOCK_DGRAM 的默认协议是 UDP。
  • 返回值:成功,返回新创建的 socket 的文件描述符;失败,返回 -1,设置 errno。(errno 可以使用 perror 打印)

4.2 bind 函数

绑定 IP + port 和 socket 文件描述符。使 sockfd 文件描述符监听 addr 所描述的地址和端口号。

1
2
3
4
5
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h> // struct sockaddr_in 所在头文件

int bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
  • sockfd:

  • addr:sockaddr 结构体,存的是 IP + port。

  • addrlen:长度,sizeof(addr),addr 可以有多种协议格式,所以需要指定结构体长度。

4.3 listen 函数

用来声明sockfd处于监听状态,并指定 同时链接 的个数上限。【不是指定链接上限数】

例如:最多只能有 400 个客户端和我同时建立链接。也就是说,处于三次握手过程中的链接数最多是 400 个。

1
2
3
4
#include <sys/types.h>
#include <sys/socket.h>

int listen(int sockfd, int backlog);
  • sockfd:socket 文件描述符。

  • backlog:处于三次握手过程中的链接数最。

  • 返回值:成功返回 0,失败返回 -1。

查看系统默认 backlog:

1
cat /proc/sys/net/ipv4/tcp_max_syn_backlog

4.4 accept 函数

三方握手完成后,服务器 调用 accept() 接受连接。

如果服务器调用 accept() 时还没有客户端的连接请求,就阻塞等待直到有客户端连接上来。

1
2
3
#include <sys/types.h>
#include <sys/socket.h>
int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);
  • sockfd:socket 文件描述符。

  • addr:传出参数(value-result argumen),返回链接客户端的地址信息(IP + port)

  • addrlen:传入传出参数。【注意是指针 socklen_t*,bind 和 connect 函数的参数不是指针 socklen_t】

    • 传入:addr 还没发生改变时,传入 sizeof(addr),表示传出参数容器的大小。
    • 传出:函数执行后,addr 里面有了真正的数据,传出 sizeof(addr),表示真正接收到的 addr 的大小。
  • 返回值:成功,返回一个的 socket 文件描述符,用于和客户端通信。失败返回 -1,设置 errno。

4.5 connect 函数

客户端 调用 connect() 连接服务器。

connect 和 bind 的参数形式一致,区别在于 bind 的参数是自己的地址,而 connect 的参数是对方的地址。

1
2
3
4
#include <sys/types.h>
#include <sys/socket.h>

int connect(int sockfd, struct sockaddr *addr, socklen_t addrlen);
  • sockfd:socke 文件描述符。

  • addr:传入参数,指定服务器端的信息,IP + port。

  • addrlen:传入参数,sizeof(addr)。

  • 返回值:成功返回 0,失败返回 -1,设置 neero。

5.socket 实例

5.1 CS模型的流程

客户端发送小写字母;服务端收到后,转换为大写字母,并回传给客户端。

5.2 代码实现

server.cc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
#define SERVER_PORT 6666
int main() {
// std::cout << "1.create..." << std::endl;
int lfd = socket(AF_INET, SOCK_STREAM, 0);

sockaddr_in server_addr;
bzero(&server_addr, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(SERVER_PORT);
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);

// std::cout << "2.bind..." << std::endl;
bind(lfd, (sockaddr *)&server_addr, sizeof(server_addr));
listen(lfd, 32); // 默认 128

// std::cout << "3.accept..." << std::endl;
sockaddr_in client_addr;
socklen_t len = sizeof(client_addr);
bzero(&client_addr, len);

int cfd = accept(lfd, (sockaddr *)&client_addr, &len);
if (cfd == -1) {
std::cout << "fail" << std::endl;
}


// std::cout << "4.put info..." << std::endl;
char client_buf[BUFSIZ];
inet_ntop(AF_INET, &client_addr.sin_addr, client_buf, sizeof(client_buf));
int client_port = ntohs(client_addr.sin_port);
std::cout << "Client IP: " << client_buf << "Client port: " << client_port << std::endl;

// std::cout << "5.change..." << std::endl;
while (1) {
char buf[BUFSIZ];
int n = read(cfd, buf, sizeof(buf));

for (int i = 0; i < n; i++) {
buf[i] = toupper(buf[i]); // 转化为大写
}
write(cfd, buf, n);
}

// std::cout << "6.end..." << std::endl;
close(lfd);
return 0;
}

client.cc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#define SERVER_IP "127.0.0.1"
#define SERVER_PORT 6666

int main() {
// std::cout << "1.create..." << std::endl;
int cfd = socket(AF_INET, SOCK_STREAM, 0);

sockaddr_in server_addr;
memset(&server_addr, 0, sizeof(server_addr));

server_addr.sin_family = AF_INET;
inet_pton(AF_INET, SERVER_IP, &server_addr.sin_addr);
server_addr.sin_port = htons(SERVER_PORT);

// std::cout << "2.connect..." << std::endl;
int ret = connect(cfd, (sockaddr*)&server_addr, sizeof(server_addr));
if (ret == -1) {
std::cout << "fail" << std::endl;
}

// std::cout << "3.change..." << std::endl;
while (1) {
char buf[BUFSIZ];
fgets(buf, sizeof(buf), stdin);
std::cout << "buf = " << buf << std::endl;

write(cfd, buf, strlen(buf));

int n = read(cfd, buf, sizeof(buf));
std::cout << "Response from server: " << std::endl;
write(STDOUT_FILENO, buf, n);
std::cout << std::endl;
}

// std::cout << "4.end..." << std::endl;
close(cfd);
return 0;
}

5.3 遇到的问题

  • inet_pton 函数传出参数不是 &server_addr.sin_addr.s_addr,而是 &server_addr.sin_addr

  • server 的 IP 一定不能错。

  • 如果不使用 toupper 函数,自己实现的话,注意输入非法字符容易报错(中文、数字、大写字母)

  • 重要问题】先关闭 server 后,四次挥手,server 进入 TIME_WAIT 状态,此时端口被占用,无法再次 connect。

    检查端口 6666 是否占用。

    1
    netstat -apn | grep 6666
  • connect error: No route to host. 链接失败,没有主机路由。需要关闭服务器防火墙测试。

5.4 错误处理函数

  • 什么是错误处理函数?

    就是对 socket、bind、accept、connec 等函数进行二次封装,将返回值判断封装其中。

  • 为什么要写错误处理函数?

    对于 socket、bind、accept、connect 都应该判断返回值。但是为了保证主程序的逻辑清晰,将判断返回值的步骤放在错误处理函数中。

  • 为什么封装后的函数名要大写?

    socket 封装成 Socket,这样在 vim 中按 shift + k 依然可以打开 man 文档,因为打开 man 文档不区分函数名大小写。

wrap.hpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#include "include.hpp"

#ifndef __WRAP_H_
#define __WRAP_H_
void perror_exit(const char *s);
int Accept(int fd, struct sockaddr *sa, socklen_t *salenptr);
void Bind(int fd, const struct sockaddr *sa, socklen_t salen);
void Connect(int fd, const struct sockaddr *sa, socklen_t salen);
void Listen(int fd, int backlog);
int Socket(int family, int type, int protocol);
ssize_t Read(int fd, void *ptr, size_t nbytes);
ssize_t Write(int fd, const void *ptr, size_t nbytes);
void Close(int fd);
ssize_t Readn(int fd, void *vptr, size_t n);
ssize_t Writen(int fd, const void *vptr, size_t n);
static ssize_t my_read(int fd, char *ptr);
ssize_t Readline(int fd, void *vptr, size_t maxlen);
#endif

wrap.cc

【注意】if ( (n = accept(fd, sa, salenptr)) < 0) { ,一定要将 n = accept 用括号括起来,因为 < 的优先级高。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
#include "wrap.hpp"

void perror_exit(const char *s) {
perror(s);
exit(-1);
}

int Accept(int fd, sockaddr *sa, socklen_t *salenptr) {

// 调用成功,需要返回一个新的文件描述符,用于与客户端通信。
int n;

again:
if ( (n = accept(fd, sa, salenptr)) < 0) {

// ECONNABORTED:异常断开
// EINTR:慢系统调用被信号中断
if ((errno == ECONNABORTED) || (errno == EINTR)) {
goto again; // 要么重启,要么执行信号的默认动作。这里选择重启。
} else {
perror_exit("accept error");
}
}
return n;
}

void Bind(int fd, const sockaddr *sa, socklen_t salen) {
if (bind(fd, sa, salen) < 0) {
perror_exit("bind error");
}
}

void Connect(int fd, const sockaddr *sa, socklen_t salen) {
if (connect(fd, sa, salen) < 0) {
perror_exit("connect error");
}
}

void Listen(int fd, int backlog) {
if (listen(fd, backlog) < 0) {
perror_exit("listen error");
}
}

int Socket(int family, int type, int protocol) {

// 因为 socket 需要返回文件描述符,所以封装的 Socket 函数需要有返回值。
int n;
if ( (n = socket(family, type, protocol)) < 0) {
perror_exit("socket error");
}
return n;
}

ssize_t Read(int fd, void *ptr, size_t nbytes) {
ssize_t n;

again:
if ( (n = read(fd, ptr, nbytes)) == -1) {
if (errno == EINTR) {
goto again;
} else {
return -1;
}
}
return n;
}

ssize_t Write(int fd, const void *ptr, size_t nbytes) {
ssize_t n;

again:
if ( (n = write(fd, ptr, nbytes)) == -1) {
if (errno == EINTR) {
goto again;
} else {
return -1;
}
}
return n;
}

void Close(int fd) {
if (close(fd) == -1) {
perror_exit("close error");
}
}

ssize_t Readn(int fd, void *vptr, size_t n) {
size_t nleft; // 剩余未读取的字节数
ssize_t nread; // 实际读到的值
char *ptr;

ptr = (char *)vptr; // 读写指针的位置
nleft = n;

while (nleft > 0) {
if ( (nread = read(fd, ptr, nleft)) < 0) {
if (errno == EINTR)
nread = 0;
else
return -1;
} else if (nread == 0) // 读完了
break;

nleft -= nread;
ptr += nread;
}
return n - nleft;
}


ssize_t Writen(int fd, const void *vptr, size_t n) {

size_t nleft;
ssize_t nwritten;
const char *ptr;

ptr = (char *)vptr;
nleft = n;
while (nleft > 0) {
if ( (nwritten = write(fd, ptr, nleft)) <= 0) {
if (nwritten < 0 && errno == EINTR)
nwritten = 0;
else
return -1;
}

nleft -= nwritten;
ptr += nwritten;
}
return n;
}

// Readline 的辅助函数
static ssize_t my_read(int fd, char *ptr) {
static int read_cnt; // 读到的数量
static char *read_ptr;
static char read_buf[100]; // 读到的字符串

if (read_cnt <= 0) {
again:
if ( (read_cnt = read(fd, read_buf, sizeof(read_buf))) < 0) {
if (errno == EINTR)
goto again;
return -1;
} else if (read_cnt == 0)
return 0;
read_ptr = read_buf;
}
read_cnt--;
*ptr = *read_ptr++;
return 1;
}

// 每次读一行,替代 fgets
ssize_t Readline(int fd, void *vptr, size_t maxlen)
{
ssize_t n, rc;
char c, *ptr;

ptr = (char *)vptr;
for (n = 1; n < maxlen; n++) {
if ( (rc = my_read(fd, &c)) == 1) {
*ptr++ = c;
if (c == '\n') // 读到 \n 表示读了一行
break;
} else if (rc == 0) {
*ptr = 0;
return n - 1;
} else
return -1;
}
*ptr = 0;
return n;
}

6.多进程并发服务器

6.1 思路

每一个客户端,都由一个子进程负责通信。

Server–父进程:

  1. 监听连接部分:

    • 创建 socket,将监听端口绑定到 lfd。
    • 等待 Client 连接,并返回用于通信的 cfd。
  2. 管理子进程部分:

    • 一旦有新 Client 连接,则创建子进程,让子进程负责通信。
    • 捕捉信号(子进程死亡 SIGCHLD),回收子进程。

Server–子进程:

  • 与客户端通信。

Client:

  • 创建 socket,连接服务器。
  • 发送请求。

6.2 代码实现

server.cc

要注意的地方:

  • 父进程不负责通信,所以必须关掉 cfd,以防文件描述符不够用。
  • 子进程不负责连接和创建子进程,需要退出循环并关掉 lfd。
  • 子进程通信时,如果客户端关闭写端,此时 read 的返回值 n == 0,应及时结束子进程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
void wait_child(int signo) {
pid_t wait_pid = 0;
while ((wait_pid = waitpid(0, NULL, WNOHANG)) > 0) {
std::cout << "child_pid = " << wait_pid << " is end!" << std::endl;
}
}

#define SERVER_PORT 8888
int main() {
//1.创建 socket
int lfd = Socket(AF_INET, SOCK_STREAM, 0);

//2.绑定 IP+port
sockaddr_in server_addr;
bzero(&server_addr, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(SERVER_PORT);
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
// inet_pton(AF_INET, "172.16.103.5", &server_addr.sin_addr);
Bind(lfd, (sockaddr*)&server_addr, sizeof(server_addr));

//3.设置最大同时连接数
Listen(lfd, 128);

int cfd;
pid_t pid;
while (1) {

//4.父进程等待客户端连接
sockaddr_in client_addr;
bzero(&client_addr, sizeof(client_addr));
socklen_t client_addr_len = sizeof(client_addr);
cfd = Accept(lfd, (sockaddr*)&client_addr, &client_addr_len);
char client_buf[BUFSIZ];

// 输出客户端信息
inet_ntop(AF_INET, &client_addr.sin_addr, client_buf, sizeof(client_buf));
int client_port = ntohs(client_addr.sin_port);
std::cout << "Client IP: " << client_buf << "Client port: " << client_port << std::endl;

//5.创建子进程
pid = fork();
if (pid < 0) {
perror_exit("fork error");
} else if (pid == 0) {
close(lfd); // 子进程不负责连接,推出“连接-创建子进程”的循环。
break;
} else {
close (cfd); // 父进程不负责通信,回收子进程
signal(SIGCHLD, wait_child);
}
}

// 6.子进程通信
if (pid == 0) {
while (1) {
char read_buf[BUFSIZ] = {0};
int n = Read(cfd, read_buf, sizeof(read_buf));
if (n == 0) { // client 关闭写端
close(cfd);
return 0;
} else { // 处理请求
for (int i = 0; i < n; i++) {
read_buf[i] = toupper(read_buf[i]);
}
Write(cfd, read_buf, n);
}
}
}
return 0;
}

client.cc

5.2 中的 client 相同。

因为客户端的逻辑并没有改变,还是连接服务器 + 发送请求。

7.多线程并发服务器

7.1 思路

和多线程并发思路一致,只是将进程换成线程。

每一个客户端,都由一个子线程负责通信。

Server–父线程:

  1. 监听连接部分:

    • 创建 socket,将监听端口绑定到 lfd。
    • 等待 Client 连接,并返回用于通信的 cfd。
  2. 管理子线程部分:

    • 一旦有新 Client 连接,则创建子线程,让子线程负责通信。
    • 并设置线程分离。

Server–子线程:

  • 调用回调函数 communicate,与客户端通信。

Client:

  • 创建 socket,连接服务器。
  • 发送请求。

7.2 代码实现

server.cc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
struct socket_info {   // 构建一个合理的结构体,既能传递 client_addr, 也能传递 cfd
sockaddr_in client_addr;
int cfd;
};

void *communicate(void *arg) { // 线程的回调函数,返回值和参数都是 void *
socket_info *s_info = (socket_info *)arg;

// 输出客户端信息
char client_buf[BUFSIZ] = {0};
inet_ntop(AF_INET, &s_info->client_addr.sin_addr, // -> 的优先级大于 &
client_buf, sizeof(client_buf));
int client_port = ntohs(s_info->client_addr.sin_port);
std::cout << "Client IP: " << client_buf << " Client port: " << client_port << std::endl;

// 子线程通信
while (1) {
char read_buf[BUFSIZ] = {0};
int n = Read(s_info->cfd, read_buf, sizeof(read_buf));
if (n == 0) { // client 关闭了写端
close(s_info->cfd);
break;
} else { // 处理请求
for (int i = 0; i < n; i++) {
read_buf[i] = toupper(read_buf[i]);
}
Write(s_info->cfd, read_buf, n);
}
}
close(s_info->cfd);
return (void *)0;
}


#define SERVER_PORT 8888
int main() {
//1.创建 socket
int lfd = Socket(AF_INET, SOCK_STREAM, 0);

//2.绑定 IP+port
sockaddr_in server_addr;
bzero(&server_addr, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(SERVER_PORT);
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
// inet_pton(AF_INET, "172.16.103.5", &server_addr.sin_addr);
Bind(lfd, (sockaddr*)&server_addr, sizeof(server_addr));

//3.设置最大同时连接数
Listen(lfd, 128);

int cfd;
pid_t pid;
socket_info s_info[256]; // 用数组存每一个连接的 client_addr + cfd
int i = 0;
pthread_t pth_id;
while (1) {

//4.父线程等待客户端连接
socklen_t client_addr_len = sizeof(s_info[i].client_addr);
bzero(&s_info[i].client_addr, client_addr_len);
s_info[i].cfd = Accept(lfd, (sockaddr*)&s_info[i].client_addr, &client_addr_len);

//5. 创建子线程来处理通信
pthread_create(&pth_id, NULL, communicate, (void *)&s_info[i]); // 子线程调用回调函数 communicate
pthread_detach(pth_id); // 父子线程分离,子线程自动回收自己的 PCB
i++; // 标记第几个线程,以便存入 client_addr + cfd
}
return 0;
}

client.cc

5.2 中的 client 相同。

因为客户端的逻辑并没有改变,还是连接服务器 + 发送请求。

8.socket 与 TCP

8.1 TCP 状态转换

详见 计算机网络笔记

8.2 socket 与 TCP 对应

8.3 TIME_WAIT 状态

先关闭 server 后,四次挥手,server 进入 TIME_WAIT 状态,此时端口被占用,无法再次 connect。

检查端口 6666 是否占用。

1
netstat -apn | grep 6666

8.4 FIN_WAIT_2 状态

可以使用 API 实现半关闭状态。

1
2
#include <sys/stocket.h>
int shutdown(int sockfd, int how);
  • sockfd:要关闭的文件描述符。
  • how:关闭的方式。
    • SHUT_RD(0):关闭sockfd上的读功能,该套接字不再接收数据,任何当前在套接字接受缓冲区的数据将被无声的丢弃掉。
    • SHUT_WR(1):关闭sockfd的写功能,进程不能在对此套接字发出写操作。
    • SHUT_RDWR(2):关闭sockfd的读写功能。相当于调用shutdown两次:首先是 SHUT_RD,然后是SHUT_WR。

PS:shutdown 和 close 的区别

  • close 中止连接,减少描述符的引用计数,并不直接关闭连接,当描述符引用计数为 0 时,才关闭连接。

  • shutdown不考虑描述符的引用计数,比如关闭读端,任何进程都无法从该 socket 接收数据。

举个例子:

  • 多个进程共享一个套接字,close 被调用一次,计数减一,直到所有进程均调用 close,套接字才被释放。(套接字就是内核中的两个缓冲区)
  • 在多进程中,如果一个进程调用了shutdown(sfd, SHUT_RDWR)后,其它的进程将无法进行通信。但如果一个进程close(sfd)将不会影响到其它进程。

8.5 端口复用

函数原型:

该函数用法极其复杂,记住咱们设置端口复用即可。

1
2
3
4
#include <sys/types.h>
#include <sys/socket.h>

int setsockopt(int sockfd, int level, int optname, const void *optval, socklen_t optlen);
  • sockfd:套接字文件描述符
  • level:级别
  • optname:选项名
  • optval:选项值
  • optlen:选项的字节长度

设置端口复用:

在 bind 之前,设置。

1
2
int opt = 1;
setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
  • SOL_SOCKET:级别

  • SO_REUSEADDR:允许重用本地端口

9.多路 IO 转接服务器

也叫多任务 IO 服务器。

9.1 思想

不再由应用程序自己监听客户端连接,取而代之由内核替应用程序监视文件。

下面主要介绍三种方式:

  • **select:**返回监听到的事件的数量。
  • **poll:**返回监听到的事件的数量 + 监听的文件描述符数组。
  • **epoll:**返回监听到的事件的数量 + 监听的文件描述符红黑树。

以 select 监听为例(poll、epoll 同理):

  • lfd 请求建立连接。

    举个例子:select 发现 lfd 有连接请求,会让 server 调用 accept 函数,和客户端建立建立连接。(因为是有连接请求后,才调用的 accept 函数,所以 accept 不会阻塞。)

  • cfd 读、写、异常事件。

    举个例子:当 select 监听到有 cfd_d 有可读事件时,会让 server 调用 read 函数,读取 cfd_d 中的数据。(因为是 cfd 中有数据后,才调用的 read 函数,所以 read 不会阻塞。)

9.2 问题

虽然不需要服务器程序自己监听,但是连接客户端、处理请求都需要服务器程序来处理。

**问题:**一旦某个客户端的请求处理的时间过长,其他的客户端就无法及时连接。

**解决:**线程池。

10.select

10.1 函数原型

1
2
3
4
5
6
7
8
9
10
11
12
#include <sys/select.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>

int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout);

struct timeval {
long tv_sec; // seconds
long tv_usec; // microseconds
};
  • nfds:所监听的所有文件描述符中,最大的文件描述符 + 1。

    意义:此参数会告诉内核检测前多少个文件描述符的状态。

    比如:进程打开了序号为 0 - 400 的文件描述符,nfds 则为 401。

  • fd_set:位图,用二进制来标记文件描述符是否在集合中。

    • readfds:监听文件描述符集合 readfds 中,是否有可读事件。传入传出参数。

    • writefds:监听文件描述符集合 writefds 中,是否有可写事件。传入传出参数。

    • exceptfds:监听文件描述符集合 exceptfds 中,是否有异常事件。传入传出参数。

    【传入传出】举个例子

    • 调用 select 之前,我们想监听 4、5 文件描述符是否有可读事件。

      readfds 集合中,4、5 文件描述符对应的值为 1,其他的为 0。

    • 调用 select 之后,只有 4 文件描述符有可读事件。

      readfds 集合中,4 文件描述符对应的值为 1,其他的为 0。

  • timeout:内核监听的时间。

    • NULL,永远等下去
    • 设置 timeval,等待固定时间
  • 设置 timeval 里时间均为 0,检查描述符后立即返回,轮询

  • 返回值

    • 成功,返回所监听的所有监听集合中,满足条件的总数。

      举个例子:

      r:3,4。可读事件:3,4

      w:4,5,6。可写事件:4,5

      e:4,5,7,8。异常事件:4,7

      select 返回 6。

    • 失败,则返回 -1,设置 errno。

10.2 设置 fd_set

类似设置未决信号集、阻塞信号集(类型 sigset_t)

【全部清除】将 set 所有位置 0。

1
void FD_ZERO(fd_set *set); 

【清除 fd】将 set 中对应的 fd 置 0。

1
void FD_CLR(int fd, fd_set *set);

【添加 fd】将 set 中对应的 fd 置 1。

1
void FD_SET(int fd, fd_set *set); 

【判断 fd 是否在 set 中】判读 set 中对应的 fd 是否为 1。

1
int FD_ISSET(int fd, fd_set *set);
  • 返回值:在 set 中返回 1,不在 set 中返回 0。

10.3 select 的局限性

  • 文件描述符上限:1024。

    也就是说同时监听文件描述符 1024 个。

  • select 返回值只返回总数,如果需要知道具体的哪一个文件描述符,需要 for 循环判断 0 - 1023 个文件描述符。

    **解决方案:**自定义结构体(数组),将要监听的文件描述符加入数组。for 循环时,只循环判断数组中的文件描述符。

  • 监听集合、满足监听条件的集合是同一个集合。

    所以需要保存原有的监听集合,否则返回后监听集合就被破坏了。

10.4 思路

储存文件描述符和返回结果的思路。

传入三个位图,传出三个位图,在传出的三个位图中,寻找满足条件的事件进行处理。

10.5 代码实现

为什么自维护数组可以减少便利次数?

  • 无自维护监听数组:

    需要从 0-1024 判断文件描述符是否在 返回的结果集合 rset 中。

  • 有自维护的监听数组:

    只需要判断 监听数组client 中的文件描述符是否在 返回的结果集合 rset 中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#define SERVER_PORT 8888
int main() {
//【1】创建 socket ==========
int lfd = Socket(AF_INET, SOCK_STREAM, 0);

// 设置端口复用
int opt = 1;
setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

//【2】绑定 IP+port ==========
sockaddr_in server_addr;
socklen_t server_addr_len = sizeof(server_addr);

// 初始化 server_addr
bzero(&server_addr, server_addr_len);
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
// inet_pton(AF_INET, "127.0.0.1", &server_addr.sin_addr);
server_addr.sin_port = htons(SERVER_PORT);

Bind(lfd, (sockaddr*)&server_addr, server_addr_len);

//【3】设置同时连接上限 ==========
Listen(lfd, 128);

int max_fd = lfd; // 计算最大的文件描述符,(select 第一个参数会用到)

// 设置自定义的 客户端的 监听集合,用于 FD_ISSET 的遍历判断。不然只能遍历从 0 - 1023
int client[FD_SETSIZE]; // 每个客户端的 cfd 的值都会存入 client 数组
int max_i = -1; // 记录数组的最后一个元素的索引,FD_ISSET 只需要遍历 0 - max_i
for (int i = 0; i < FD_SETSIZE; i++) {
client[i] = -1; // 初始化数组,全部都是空位,用 -1 表示
}

// 设置监听的集合
fd_set allset;
FD_ZERO(&allset);
FD_SET(lfd, &allset);
while (1) {

// 开始监听
fd_set rset = allset;
int n_ready = select(max_fd + 1, &rset, NULL, NULL, NULL);
if (n_ready < 0) {
perror_exit("select error");
}

// 如果三个客户端同时连接, n_ready 还是等于 1, 返回值只看满足要求的 文件描述符 个数(只有一个 lfd)。
std::cout << "n_ready = " << n_ready << std::endl;

// 监听到 lfd 有连接请求:将新的 cfd 加入 select 监听集 + 自定义监听集中。
if (FD_ISSET(lfd, &rset)) {

// 定义 client_addr
sockaddr_in client_addr;
socklen_t client_addr_size = sizeof(client_addr);
bzero(&client_addr, client_addr_size);

//【4】连接客户端
int cfd = Accept(lfd, (sockaddr *)&client_addr, &client_addr_size);

// 打印客户端信息
char client_ip[BUFSIZ] = {0};
inet_ntop(AF_INET, &client_addr.sin_addr, client_ip, sizeof(client_ip));
int client_port = ntohs(client_addr.sin_port);
std::cout << "Client IP: " << client_ip << "Client port: " << client_port << std::endl;

// 将新的 cfd 加入到自己维护的监听数组(监听文件符号集)
int i = 0;
for (i = 0; i < FD_SETSIZE; i++) {
if (client[i] < 0) { // 找到 client 没有使用的地方(空位)
client[i] = cfd; // 将新的 cfd 加入 client 数组
break;
}
}

// 防止监控文件达到上限
if (i == FD_SETSIZE) {
std::cout << "too many client" << std::endl;
exit(1);
}

// 将新 cfd 加入监听文件描述符(select 第二个参数会用到)
FD_SET(cfd, &allset);

// 保证 max_fd 是最大的文件描述符 (select 第一个参数)
if (max_fd < cfd) {
max_fd = cfd;
}

// 保证 max_i 是 client 数组中最后一个元素的索引
if (i > max_i) {
max_i = i;
}

// n_ready == 1, 表示只有一个满足条件的事件,也就是 lfd,没有 cfd 满足条件,故直接进入下次循环。
if (-- n_ready == 0) {
continue;
}
}

// 判断满足条件的事件,是哪一个 cfd
for (int i = 0; i <= max_i; i++) {
int sockfd = client[i];

// client[i] = -1 表示该位置是空位。
if (sockfd < 0) {
continue;
}

// 如果该 cfd 满足条件,读该 cfd
if (FD_ISSET(sockfd, &rset)) {
int n;
char read_buf[BUFSIZ] = {0};

// 如果客户端关闭了文件描述符:我也关闭文件描述符 + 更改 select 监听集 + 更改自定义监听集
// 【重要】当客户端关闭了文件描述符,select 也会监听到该 cfd 的可读事件
if ( (n = Read(sockfd, read_buf, sizeof(read_buf))) == 0 ) {
close(sockfd);
FD_CLR(sockfd, &allset);
client[i] = -1;
} else if (n > 0) { // 如果读到内容,处理请求,响应请求
for (int j = 0; j < n; j++) {
read_buf[j] = toupper(read_buf[j]);
}
Write(sockfd, read_buf, n);
}

// 如果还没有满足条件的 cfd,就退出循环,获得下次监听的结果 n_ready
if (--n_ready == 0) {
break;
}
}
}
}
return 0;
}

11.poll

10.1 函数原型

1
2
3
4
5
6
7
8
9
#include <poll.h>

int poll(struct pollfd *fds, nfds_t nfds, int timeout);

struct pollfd {
int fd;
short events;
short revents;
};
  • struct pollfd *fds:结构体 pollfd 数组的首地址。

struct pollfd 结构体

  • fd:文件描述符
  • events:监控的事件
  • revents:监控事件中满足条件返回的事件,传出参数,初始化为 0 即可。
  • 事件:POLLIN / POLLOUT / POLLERR(读 / 写 / 异常)

10.2 poll 的优势

和 select 相比。

  • 修改配置文件 file descriptors 的值,可以改变文件描述符 1024 的上限。

    查看文件描述符个数:ulimit -a 或者 ulimit -n

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    # 查看硬件所限制的最多打开的文件描述符个数
    cat //proc/sys/fs/file-max

    # 修改文件描述符的上限值
    vi /etc/security/limits.conf

    # 在文件末尾加入以下两行
    # soft 软限制(下限)、hard 硬限制(上限)。
    * soft nofile 65535
    * hard nofile 10000
  • 分离了监听集合和返回集合

    在 select 中,文件描述符集合是传入传出参数。

  • FD_ISSET 搜索的范围变小。

    select 需要从 0 - 1024 遍历。(解决方案:自己维护一个用于遍历监听文件描述符的数组)

    poll 监听的文件描述符在 pollfd 结构体数组中,直接遍历 pollfd 结构体数组。

10.3 思路

储存文件描述符和返回结果的思路。

用结构体数组储存 需要监听的文件描述符、需要监听的事件、监听到的事件。

10.4 代码实现

server.cc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
#include "include.hpp"
#include "wrap.hpp"

#define SERVER_PORT 8888
#define OPEN_MAX 1024
int main() {
// 1.创建 socket =============
int lfd = Socket(AF_INET, SOCK_STREAM, 0);

// 设置端口复用
int opt = 1;
setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

// 2.绑定 IP+Port =============
// 设置 serve_addr
sockaddr_in server_addr;
socklen_t server_addr_len = sizeof(server_addr);
bzero(&server_addr, server_addr_len);
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(SERVER_PORT);
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);

// bind
Bind(lfd, (sockaddr *)&server_addr, server_addr_len);

// 3.设置同时连接上限 =============
Listen(lfd, 128);

// 创建客户端数组,记录需要监听的文件描述符
pollfd client[OPEN_MAX];

// 初始化客户端数组
for (int i = 0; i < OPEN_MAX; i++) {
client[i].fd = -1; // 标记为 -1 则是无效的文件描述符,无需监听
}
int maxi = 0; // client 数组中最大的有效元素下标

// 将 lfd 加入客户端数组
client[0].fd = lfd;
client[0].events = POLLIN; // 该文件描述符需要监听的事件
maxi = 1;

while (1) {
int n_ready = poll(client, maxi + 1, -1);
std::cout << "n_ready = " << n_ready <<std::endl;
if (client[0].revents & POLLIN) {
// 初始化 client_addr
sockaddr_in client_addr;
socklen_t client_addr_len = sizeof(client_addr);
bzero(&client_addr, client_addr_len);
// std::cout << "初始化 client_addr" <<std::endl;
// 连接,并返回客户端描述符
int cfd = Accept(lfd, (sockaddr *)&client_addr, &client_addr_len);
char IP[BUFSIZ] = {0};
std::cout << "IP: " << inet_ntop(AF_INET, &client_addr.sin_addr, IP, sizeof(IP))
<< " Port: " << ntohs(client_addr.sin_port) << std::endl;

// 将 cfd 加入监听数组, 并设置监听事件
int i = 0;
for (i = 0; i < OPEN_MAX; i++) {
if (client[i].fd < 0) {
client[i].fd = cfd;
client[i].events = POLLIN;
std::cout << "i = " << i <<std::endl;
break;
}
}
if (i == OPEN_MAX) {
perror_exit("Too many client");
}
if (maxi < i) {
maxi = i; // 更新最大有效描述符
}

if (--n_ready <= 0) { // 没有其他的就绪事件,就继续回到 poll 监听阻塞
continue;
}
}

// 除了 lfd,其他 client_fd 监听到了事件
for (int i = 1; i <= maxi; i++) {

// 当前 client_fd = -1 表示未在监听。
if (client[i].fd < 0) {
continue;
}

std::cout << "11 i = " << i <<std::endl;

// 监听到读事件
if (client[i].revents & POLLIN) {
char read_buf[BUFSIZ] = {0};
int n = 0;

// n = -1 表示服务器收到 RST 标志
if ( (n = Read(client[i].fd, read_buf, sizeof(read_buf))) < 0 ) {
if (errno == ECONNRESET) { // 服务器收到 RST 标志,errno 会被设为 ECONNRESET。
std::cout << "client[" << i << "] aborted connection." << std::endl;
Close(client[i].fd);
client[i].fd = -1;
} else {
perror_exit("read error");
}
} else if (n == 0) { // n = 0 表示客户端先关闭了链接
std::cout << "client[" << i << "] closed connection." << std::endl;
Close(client[i].fd);
client[i].fd = -1;
} else { // n > 0,开始处理请求
for (int j = 0; j < n; j++) {
read_buf[j] = toupper(read_buf[j]);
}
Write(client[i].fd, read_buf, n);
}

if (--n_ready <= 0) { // 没有其他的就绪事件,就继续回到 poll 监听阻塞。
break;
}
}
}
}
return 0;
}

12.epoll

12.1 函数原型

(1)创建一个 epoll 句柄

1
2
3
#include <sys/epoll.h>

int epoll_create(int size);
  • size:表示内核监听文件描述符的个数,和内存大小有关。(size 只是建议值)
  • 返回值:返回文件描述符 epfd,指向一棵红黑树的根节点。

(2)控制 epoll 监控的某个文件描述符上的事件:注册、修改、删除。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#include <sys/epoll.h>

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};

// 联合体
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
  • epfd:epoll_create 函数返回的红黑树根节点。

  • op:表示动作,用三个宏表示 注册、修改、删除。

    • EPOLL_CTL_ADD (注册新的fd到epfd)

    • EPOLL_CTL_MOD (修改已经注册的fd的监听事件)

    • EPOLL_CTL_DEL (从epfd删除一个fd)

  • fd:所监听的文件描述符。

  • event:结构体,两个变量:events 和 data。【传入参数】

    • events:指定需要监听的事件。

      EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭)

      EPOLLOUT:表示对应的文件描述符可以写

    EPOLLERR:表示对应的文件描述符发生错误

    • data:是个联合体,使用 int fd 即可。(反应堆模型会使用 void *ptr)

      此处 fd 与 epoll_ctl 第三个参数 fd 数据一致。此处的 fd 和 events 作为一个结构体,返回给结构体数组(数组中记录监听到的事件和文件描述符)。

  • 返回值:成功:0;失败:-1,设置相应的 errno。

(3)监听

1
2
3
#include <sys/epoll.h>

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
  • epfd:同 epoll_ctl 第一个参数 epfd,红黑树的根节点。

  • events:与 epoll_ctl 最后一个参数不同,此处表示 event 的数组。【传出参数】

  • maxevents:events 数组容量。

  • timeout:

    -1:阻塞。

    0:立即返回,非阻塞。

    >0:指定毫秒。

  • 返回值:成功返回有多少文件描述符有监听到事件。时间到时返回 0,出错返回 -1。

12.2 特点

  • epoll 能显著提高 程序在大量并发连接中只有少量活跃 的情况下的系统CPU利用率。

    因为 epoll 会复用文件描述符集合来传递结果,而不用迫使开发者每次等待事件之前都必须重新准备要被侦听的文件描述符集合,另一点原因就是获取事件的时候,它无须遍历整个被侦听的描述符集,只要遍历那些被内核IO事件异步唤醒而加入Ready队列的描述符集合就行了。

  • epoll 除了提供 select/poll 那种 IO 事件的水平触发(Level Triggered)外,还提供了边沿触发(Edge Triggered),这就使得用户空间程序有可能缓存 IO 状态,减少 epoll_wait/epoll_pwait 的调用,提高应用程序效率。

  • epoll 是 Linux 大规模并发网络程序中的热门首选模型。

  • 与 poll 相同,通过 /etc/security/limits.conf 更改文件描述符个数。

12.3 思路

储存文件描述符和返回结果的思路。

  • 使用 epoll_ctl 将需要监听的文件描述符加入红黑树 epfd 中。
  • 监听到满足的事件后,将属于该文件描述符的结构体拷贝到结构体数组 fd_event_arr 中,作为返回结果返回。

12.4 代码实现

和 poll 实现流程相似,poll 的监听结构体数组被 epoll 的监听红黑树代替。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
#include "include.hpp"
#include "wrap.hpp"

#define SERVER_PORT 8888
#define OPEN_MAX 1024
int main() {

// 1.创建 socket =============
int lfd = Socket(AF_INET, SOCK_STREAM, 0);

// 设置端口复用
int opt = 1;
setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

// 2.绑定 IP+Port =============
// 设置 serve_addr
sockaddr_in server_addr;
socklen_t server_addr_len = sizeof(server_addr);
bzero(&server_addr, server_addr_len);
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(SERVER_PORT);
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);

// bind
Bind(lfd, (sockaddr *)&server_addr, server_addr_len);

// 3.设置同时连接上限 =============
Listen(lfd, 128);

// (1)创建 epoll 模型,epfd 指向红黑树的根节点
int epfd = epoll_create(OPEN_MAX);
if (epfd == -1) {
perror_exit("epoll_create error");
}

// 指定监听 lfd 的读事件,为 epoll_ctl 的第四个参数做准备
epoll_event fd_event;
fd_event.events = EPOLLIN;
fd_event.data.fd = lfd;

// 讲 lfd 添加在 epoll 所监听二叉树上
int res = epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &fd_event);
if (res == -1) {
perror_exit("epoll_ctl error");
}

// fd_event_arr 是用于返回监听结果的 epoll_event 结构体数组。
epoll_event fd_event_arr[OPEN_MAX];

while (1) {

// epoll 开始监听,-1 表示永久阻塞。
int nready = epoll_wait(epfd, fd_event_arr, OPEN_MAX, -1);
if (nready == -1) {
perror_exit("epoll_wait error");
}

for (int i = 0; i < nready; i++) {

if (!fd_event_arr[i].events & EPOLLIN) { // 如果不是读事件,继续。
continue;
}

if (fd_event_arr[i].data.fd == lfd) { // 如果是 lfd 的读事件,则连接客户端

// 连接客户端
sockaddr_in client_addr;
socklen_t client_addr_len = sizeof(client_addr);
bzero(&client_addr, client_addr_len);
int cfd = Accept(lfd, (sockaddr *)&client_addr, &client_addr_len);

// 打印客户端信息
char ip_buf[1024] = {0};
inet_ntop(AF_INET, &client_addr.sin_addr, ip_buf, sizeof(ip_buf));
std::cout << "client connect successfully." << " IP: " << ip_buf << " Port: " << ntohs(client_addr.sin_port) << std::endl;

// 将 cfd 加入 epfd 中监听。
fd_event.events = EPOLLIN;
fd_event.data.fd = cfd;
int res = epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &fd_event);
if (res == -1) {
perror_exit("epoll_ctl error");
}
} else { // 如果是 cfd 的读事件,则读数据,处理请求

// 读数据
int cfd = fd_event_arr[i].data.fd;
char read_buf[BUFSIZ];
int n = Read(cfd, read_buf, sizeof(read_buf));

if (n == 0) { // 读到 0, 说明客户端关闭了连接。

// 从 epfd 红黑树中,删除这个 cfd,停止监听。
int res = epoll_ctl(epfd, EPOLL_CTL_DEL, cfd, NULL);
if (res == -1) {
perror_exit("epoll_ctl error");
}

// 关闭与客户端的连接
Close(cfd);
std::cout << "client closed connection" << std::endl;
} else if (n < 0) { // 出错,不再监听该 cfd
perror("read n < 0 error");
int res = epoll_ctl(epfd, EPOLL_CTL_DEL, cfd, NULL);
Close(cfd);
} else { // 处理请求
for (int i = 0; i < n; i++) {
read_buf[i] = toupper(read_buf[i]);
}
Write(cfd, read_buf, n);
}
}
}
}
return 0;
}

12.5 水平触发和边沿触发

两种触发模式作用:减少对 epoll_wait 的调用次数。

(1)epoll 事件模型

epoll 事件模型有两种:

  • Edge Triggered(ET)边沿触发只有新的数据到来才触发,无论缓存区中是否还有数据。
  • Level Triggered(LT)水平触发只要缓冲区有数据都会触发。

举一个形象的例子:

边缘触发就像上升沿、下降沿,只有高低电平发生变化时,才会触发。

水平触发就像水平沿,只要是高电平都会触发。

(2)ET 模式

举个例子:

[1]:客户端写入 2KB 数据。

[2]:服务端调用 epoll_wait,返回 cfd 有可读事件。

[3]:服务端读取 1KB 数据。(readn 函数)

[4]:服务端调用 epoll_wait。

在 ET 模式中,此时尽管缓冲区还有 1KB 数据没有读取,如果客户端没有新的数据写给服务端,epoll_wait 会阻塞在这里。

【问题】一旦客户端在等待服务端对于这 2KB 的数据反馈,服务端在等待客户端发送新的数据。一个文件句柄的阻塞读/阻塞写操作把处理多个文件描述符的任务饿死。

【解决】使用非阻塞的函数。no-block。

ET是高速工作方式,只支持 no-block socket。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。

然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知。请注意,如果一直不对这个 fd 作 IO 操作(从而导致它再次变成未就绪),内核不会发送更多的通知(only once)。

(3)LT 模式

LT 是缺省的工作方式,并且同时支持 block 和 no-block socket。在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的 fd 进行 IO 操作。如果你不作任何操作,内核还是会继续通知你的,所以,这种模式编程出错误可能性要小一点。传统的 select/poll 都是这种模型的代表。

(4)实现方式

epoll 的触发方式,通过结构体 struct epoll_event 中,成员变量 events 的监听事件来确定。

1
2
3
4
5
// 默认是水平触发 LT
event.events = EPOLLIN;

// 用以下语句表示边缘触发 ET
event.events = EPOLLIN | EPOLLET;

12.6 ET 模式的实例

epoll 也可以监听管道的文件描述符。

下面以管道为例,实现 ET 模式的 epoll 监听。


学习笔记|Socket 与并发服务器
https://www.aimtao.net/socket/
Posted on
2021-03-04
Licensed under