API
epoll_create
1 |
|
创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大。这个参数不同于select()中的第一个参数,给出最大监听的fd+1的值。需要注意的是,当创建好epoll句柄后,它就是会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。
1 | lrwx------. 1 root root 64 5月 12 11:45 0 -> /dev/pts/0 |
- size:告诉内核这个监听的数目一共有多大。
epoll_ctl
1 |
|
epoll的事件注册函数,它不同与select()是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型。
- epfd:epoll_create()的返回值。
- op:动作,用三个宏来表示:
- EPOLL_CTL_ADD:注册新的fd到epfd中。
- EPOLL_CTL_MOD:修改已经注册的fd的监听事件。
- EPOLL_CTL_DEL:从epfd中删除一个fd。
- fd:需要监听的fd。
event:告诉内核需要监听什么事,struct epoll_event结构如下:
1
2
3
4struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};events可以是以下几个宏的集合:
- EPOLLIN:表示对应的文件描述符可以读(包括对端SOCKET正常关闭)。
- EPOLLOUT:表示对应的文件描述符可以写。
- EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来)。
- EPOLLERR:表示对应的文件描述符发生错误。
- EPOLLHUP:表示对应的文件描述符被挂断。
- EPOLLET:将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
- LT模式:默认为此模式,当epoll_wait检测到描述符事件发生并将此事件通知应用程序,应用程序可以不立即处理该事件。下次调用epoll_wait时,会再次响应应用程序并通知此事件。
- ET模式:当epoll_wait检测到描述符事件发生并将此事件通知应用程序,应用程序必须立即处理该事件。如果不处理,下次调用epoll_wait时,不会再次响应应用程序并通知此事件。
epoll_wait
1 |
|
等待事件的产生,类似于select()调用。
- epfd:epoll_create()的返回值。
- events:从内核得到事件的集合。
- maxevents:告诉内核这个events有多大,这个maxevents的值不能大于创建epoll_create()时的size。
- timeout:超时时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。该函数返回需要处理的事件数目,如返回0表示已超时。
- 返回值:需要处理的事件数目,如返回0表示已超时,-1表示有错误发生。
特点
epoll是在2.6内核中提出的,是之前的select和poll的增强版本。相对于select和poll来说,epoll更加灵活,没有描述符限制。epoll使用一个文件描述符管理多个描述符,将用户关系的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的copy只需一次。
Example
下面的程序是基于socket的tcp应答程序。
- 服务端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
int main(int argc, const char * argv[])
{
char input_msg[BUFFER_SIZE];
char recv_msg[BUFFER_SIZE];
//本地地址
struct sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(SERVER_PORT);
server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
bzero(&(server_addr.sin_zero), 8);
//创建socket
int server_sock_fd = socket(AF_INET, SOCK_STREAM, 0);
if(server_sock_fd == -1)
{
perror("socket error");
return 1;
}
//绑定socket
int bind_result = bind(server_sock_fd, (struct sockaddr *)&server_addr, sizeof(server_addr));
if(bind_result == -1)
{
perror("bind error");
return 1;
}
//listen
if(listen(server_sock_fd, BACKLOG) == -1)
{
perror("listen error");
return 1;
}
//create epollfd
int events_len = CONCURRENT_MAX + 2;
int epollfd = epoll_create(events_len);
if(epollfd == -1)
{
fprintf(stderr, "create epollfd failed!\n");
exit(-1);
}
//clientfd
int clientfds[CONCURRENT_MAX];
for(int i = 0;i < CONCURRENT_MAX;i++)
{
clientfds[i] = -1;
}
//add event to kernel
struct epoll_event stdin_event;
stdin_event.events = EPOLLIN;
stdin_event.data.fd = STDIN_FILENO;
epoll_ctl(epollfd, EPOLL_CTL_ADD, STDIN_FILENO, &stdin_event);
struct epoll_event server_event;
server_event.events = EPOLLIN;
server_event.data.fd = server_sock_fd;
epoll_ctl(epollfd, EPOLL_CTL_ADD, server_sock_fd, &server_event);
//get events from kernel
int timeout = 20 * 1000;
struct epoll_event events[events_len];
//do epoll
while(1)
{
int ret = epoll_wait(epollfd, events, events_len, timeout);
if(ret < 0)
{
perror("epoll 出错\n");
continue;
}
else if(ret == 0)
{
printf("epoll 超时\n");
continue;
}
else
{
for(int i = 0;i < ret;i++)
{
int fd = events->data.fd;
if(fd == server_sock_fd && (events->events & server_event.events))
{
//有新的连接请求
struct sockaddr_in client_address;
socklen_t address_len;
int client_sock_fd = accept(server_sock_fd, (struct sockaddr *)&client_address, &address_len);
printf("new connection client_sock_fd = %d\n", client_sock_fd);
if(client_sock_fd > 0)
{
int index = -1;
for(int client_i = 0;client_i < CONCURRENT_MAX;client_i++)
{
if(clientfds[client_i] == -1)
{
index = client_i;
clientfds[client_i] = client_sock_fd;
// add event to kernel
struct epoll_event client_event;
client_event.events = EPOLLIN;
client_event.data.fd = client_sock_fd;
epoll_ctl(epollfd, EPOLL_CTL_ADD, client_sock_fd, &client_event);
break;
}
}
if(index >= 0)
{
printf("新客户端(%d)加入成功 %s:%d\n", index, inet_ntoa(client_address.sin_addr), ntohs(client_address.sin_port));
}
else
{
bzero(input_msg, BUFFER_SIZE);
strcpy(input_msg, "服务器加入的客户端数达到最大值,无法加入!\n");
send(client_sock_fd, input_msg, BUFFER_SIZE, 0);
printf("客户端连接数达到最大值,新客户端加入失败 %s:%d\n", inet_ntoa(client_address.sin_addr), ntohs(client_address.sin_port));
}
}
}
else if(fd == STDIN_FILENO && (events->events & stdin_event.events))
{
bzero(input_msg, BUFFER_SIZE);
fgets(input_msg, BUFFER_SIZE, stdin);
//输入“.quit"则退出服务器
if(strcmp(input_msg, QUIT_CMD) == 0)
{
exit(0);
}
for(int client_i = 0;client_i < CONCURRENT_MAX;client_i++)
{
if(clientfds[client_i] > 0)
{
printf("向客户端(%d)发送消息\n", client_i);
send(clientfds[client_i], input_msg, BUFFER_SIZE, 0);
}
}
}
else
{
//处理某个客户端过来的消息
bzero(recv_msg, BUFFER_SIZE);
long byte_num = recv(events[i].data.fd, recv_msg, BUFFER_SIZE, 0);
if(byte_num > 0)
{
if(byte_num > BUFFER_SIZE)
{
byte_num = BUFFER_SIZE;
}
recv_msg[byte_num] = '\0';
for(int client_i = 0;client_i < CONCURRENT_MAX;client_i++)
{
if(clientfds[client_i] == events[i].data.fd)
{
printf("客户端(%d):%s\n", client_i, recv_msg);
break;
}
}
}
else if(byte_num < 0)
{
for(int client_i = 0;client_i < CONCURRENT_MAX;client_i++)
{
if(clientfds[client_i] == events[i].data.fd)
{
printf("从客户端(%d)接受消息出错.\n", client_i);
break;
}
}
}
else
{
// delete event in kernel
struct epoll_event client_event;
client_event.events = EPOLLIN;
client_event.data.fd = events[i].data.fd;
epoll_ctl(epollfd, EPOLL_CTL_DEL, events[i].data.fd, &client_event);
for(int client_i = 0;client_i < CONCURRENT_MAX;client_i++)
{
if(clientfds[client_i] == events[i].data.fd)
{
clientfds[client_i] = -1;
printf("客户端(%d)退出了.\n", client_i);
break;
}
}
}
}
}
}
}
close(epollfd);
return 0;
}
- 客户端(仍然采用select)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
int main(int argc, const char * argv[])
{
struct sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(11332);
server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
bzero(&(server_addr.sin_zero), 8);
int server_sock_fd = socket(AF_INET, SOCK_STREAM, 0);
if(server_sock_fd == -1)
{
perror("socket error");
return 1;
}
char recv_msg[BUFFER_SIZE];
char input_msg[BUFFER_SIZE];
if(connect(server_sock_fd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr_in)) == 0)
{
fd_set client_fd_set;
struct timeval tv;
while(1)
{
tv.tv_sec = 20;
tv.tv_usec = 0;
FD_ZERO(&client_fd_set);
FD_SET(STDIN_FILENO, &client_fd_set);
FD_SET(server_sock_fd, &client_fd_set);
select(server_sock_fd + 1, &client_fd_set, NULL, NULL, &tv);
if(FD_ISSET(STDIN_FILENO, &client_fd_set))
{
bzero(input_msg, BUFFER_SIZE);
fgets(input_msg, BUFFER_SIZE, stdin);
if(send(server_sock_fd, input_msg, BUFFER_SIZE, 0) == -1)
{
perror("发送消息出错!\n");
}
}
if(FD_ISSET(server_sock_fd, &client_fd_set))
{
bzero(recv_msg, BUFFER_SIZE);
long byte_num = recv(server_sock_fd, recv_msg, BUFFER_SIZE, 0);
if(byte_num > 0)
{
if(byte_num > BUFFER_SIZE)
{
byte_num = BUFFER_SIZE;
}
recv_msg[byte_num] = '\0';
printf("服务器:%s\n", recv_msg);
}
else if(byte_num < 0)
{
printf("接受消息出错!\n");
}
else
{
printf("服务器端退出!\n");
exit(0);
}
}
}
//}
}
return 0;
}
参考
http://www.cnblogs.com/Anker/archive/2013/08/17/3263780.html