免费注册 查看新帖 |

Chinaunix

  平台 论坛 博客 文库
最近访问板块 发新帖
查看: 2712 | 回复: 6
打印 上一主题 下一主题

[网络] 请教关于epoll+线程池的问题 [复制链接]

论坛徽章:
2
2015年亚洲杯之乌兹别克斯坦
日期:2015-04-25 17:31:122015亚冠之山东鲁能
日期:2015-08-04 14:01:11
跳转到指定楼层
1 [收藏(0)] [报告]
发表于 2015-04-25 18:35 |只看该作者 |倒序浏览
本帖最后由 yaoyefengyun 于 2015-04-25 18:41 编辑

学习了lvyilong316前辈的 《彻底学会使用epoll》系列,我现在想做成多线程,但又不是一个线程监听事件,其它几个线程负责事务处理那种。用一个连接测试,没测出问题,但是模拟1000个客户端时,服务端老有Bad file descriptor错误。之前服务端采用单个线程时,测试程序也没测出问题。大家帮我看看是代码写错了呢?还是说连思路都不正确。

下面是服务端代码:
  1. #include <stdio.h>
  2. #include <stdlib.h>
  3. #include <string.h>
  4. #include <unistd.h>
  5. #include <sys/types.h>
  6. #include <netinet/in.h>
  7. #include <sys/epoll.h>
  8. #include <fcntl.h>
  9. #include <errno.h>
  10. #include <pthread.h>

  11. #define MAX_EVENTS 2000
  12. #define SERV_PORT 9127
  13. #define MAXLINE 1024

  14. #define handle_error(msg) \
  15.         do { perror(msg); exit(EXIT_FAILURE); } while (0)

  16. int epollfd, listenfd;
  17. pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

  18. void setnonblocking(int fd)
  19. {
  20.         int flags, ret;

  21.         flags = fcntl(fd, F_GETFL, 0);
  22.         if (flags == -1)
  23.                 handle_error("fcntl");

  24.         ret = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
  25.         if (ret == -1)
  26.                 handle_error("fcntl");
  27. }

  28. int create_and_bind(void)
  29. {
  30.         int listenfd, err;
  31.         struct sockaddr_in servaddr;

  32.         listenfd = socket(AF_INET, SOCK_STREAM, 0);
  33.         if (listenfd == -1)
  34.                 handle_error("socket");

  35.         bzero(&servaddr, sizeof(servaddr));
  36.         servaddr.sin_family = AF_INET;
  37.         servaddr.sin_port = htons(SERV_PORT);
  38.         servaddr.sin_addr.s_addr = htonl(INADDR_ANY);

  39.         err = bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr));
  40.         if (err == -1)
  41.                 handle_error("bind");

  42.         return listenfd;
  43. }

  44. void *thr_func(void *arg)
  45. {
  46.         int i, nfds, connfd, ret;
  47.         socklen_t clilen;
  48.         ssize_t n, nread, nwrite, data_size;
  49.         char buf[MAXLINE];
  50.         struct sockaddr_in cliaddr;
  51.         struct epoll_event ev, events[MAX_EVENTS];

  52.         for ( ; ; ) {
  53.                 nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
  54.                 if (nfds == -1)
  55.                         handle_error("epoll_wait");

  56.                 for (i = 0; i < nfds; ++i) {
  57.                         if (events[i].data.fd == listenfd) {
  58.                                 for ( ; ; ) {
  59.                                         clilen = sizeof(cliaddr);
  60.                                         connfd = accept(events[i].data.fd,
  61.                                                 (struct sockaddr *)&cliaddr,
  62.                                                 &clilen);
  63.                                         if (connfd == -1) {
  64.                                                 if (errno == EAGAIN)
  65.                                                         break;
  66.                                                 else if (errno == ECONNABORTED||
  67.                                                         errno == EPROTO ||
  68.                                                         errno == EINTR)
  69.                                                         continue;
  70.                                                 else
  71.                                                         handle_error("accept");
  72.                                         }

  73.                                         if (connfd >= MAX_EVENTS) {
  74.                                                 ret = close(connfd);
  75.                                                 if (ret == -1)
  76.                                                         handle_error("close");
  77.                                                 continue;
  78.                                         }

  79.                                         setnonblocking(connfd);

  80.                                         ev.data.fd = connfd;
  81.                                         ev.events = EPOLLIN | EPOLLET;
  82.                                         ret = epoll_ctl(epollfd,
  83.                                                 EPOLL_CTL_ADD, connfd, &ev);
  84.                                         if (ret == -1)
  85.                                                 handle_error("epoll_ctl");
  86.                                 }
  87.                                 continue;
  88.                         }
  89.                         if (events[i].events & EPOLLIN) {
  90.                                 n = 0;
  91.                                 while ((nread = read(events[i].data.fd, buf + n,
  92.                                                         MAXLINE)) > 0)
  93.                                         n += nread;
  94.                                 if (nread == 0) {
  95.                                         ret = close(events[i].data.fd);
  96.                                         if (ret == -1)
  97.                                                 handle_error("close");
  98.                                         continue;
  99.                                 } else if (nread == -1 && errno != EAGAIN) {
  100.                                         perror("read");
  101.                                         continue;
  102.                                 }

  103.                                 ev.data.fd = events[i].data.fd;
  104.                                 ev.events = EPOLLOUT | EPOLLET;
  105.                                 ret = epoll_ctl(epollfd, EPOLL_CTL_MOD,
  106.                                         events[i].data.fd, &ev);
  107.                                 if (ret == -1)
  108.                                         handle_error("epoll_ctl");
  109.                         }
  110.                         if (events[i].events & EPOLLOUT) {
  111.                                 snprintf(buf, sizeof(buf),
  112.                                         "HTTP/1.1 200 OK/r/n Content-Length: "
  113.                                         "%d/r/n/r/nHello World",
  114.                                         11);
  115.                                 data_size = strlen(buf);
  116.                                 n = data_size;
  117.                                 while (n > 0) {
  118.                                         nwrite = write(events[i].data.fd,
  119.                                                 buf + data_size - n , n);
  120.                                         if (nwrite == -1 &&
  121.                                                 errno != EAGAIN)
  122.                                                 handle_error("write");
  123.                                         n -= nwrite;
  124.                                 }

  125.                                 ev.data.fd = events[i].data.fd;
  126.                                 ev.events = EPOLLIN | EPOLLET;
  127.                                 ret = epoll_ctl(epollfd, EPOLL_CTL_MOD,
  128.                                         events[i].data.fd, &ev);
  129.                                 if (ret == -1)
  130.                                         handle_error("epoll_ctl");
  131.                         }
  132.                 }
  133.         }
  134. }

  135. int main(void)
  136. {
  137.         int i, ret;
  138.         struct epoll_event ev;
  139.         pthread_t tid[4];

  140.         listenfd = create_and_bind();

  141.         setnonblocking(listenfd);

  142.         ret = listen(listenfd, MAX_EVENTS);
  143.         if (ret == -1)
  144.                 handle_error("listen");

  145.         epollfd = epoll_create1(0);
  146.         if (epollfd == -1)
  147.                 handle_error("epoll_create");

  148.         ev.events = EPOLLIN | EPOLLET;
  149.         ev.data.fd = listenfd;
  150.         ret = epoll_ctl(epollfd, EPOLL_CTL_ADD, listenfd, &ev);
  151.         if (ret == -1)
  152.                 handle_error("epoll_ctl");

  153.         for (i = 0; i < 4; i++) {
  154.                 ret = pthread_create(&tid[i], 0, thr_func, 0);
  155.                 if (ret != 0)
  156.                         handle_error("pthread_crete");
  157.         }

  158.         for (i = 0; i < 4; i++) {
  159.                 ret = pthread_join(tid[i], 0);
  160.                 if (ret != 0)
  161.                         handle_error("pthread_jion");
  162.         }

  163.         exit(EXIT_SUCCESS);
  164. }
复制代码
下面是客户端测试代码:
  1. #include <stdio.h>
  2. #include <stdlib.h>
  3. #include <string.h>
  4. #include <errno.h>
  5. #include <unistd.h>
  6. #include <sys/types.h>
  7. #include <sys/socket.h>
  8. #include <netinet/in.h>
  9. #include <arpa/inet.h>
  10. #include <pthread.h>
  11. #include <signal.h>

  12. #define MAXLINE 8192
  13. #define LISTENQ 1024
  14. #define SERV_PORT 9127

  15. #define handle_error(msg) \
  16.         do { perror(msg); exit(EXIT_FAILURE); } while (0)

  17. #define err_quit(msg) \
  18.         do { fprintf(stderr, msg); exit(EXIT_FAILURE); } while (0)

  19. void str_cli(FILE *fp, int sockfd)
  20. {
  21.         pid_t pid;
  22.         int n, ret;
  23.         char sendline[MAXLINE + 1], recvline[MAXLINE + 1];

  24.         pid = fork();
  25.         switch (pid) {
  26.         case -1:
  27.                 handle_error("fork");
  28.                 break;
  29.         case 0:
  30.                 while ((n = read(sockfd, recvline, MAXLINE)) > 0) {
  31.                         recvline[n] = 0;
  32.                         fprintf(stderr, recvline);
  33.                 }

  34.                 kill(getppid(), SIGTERM);
  35.                 exit(0);
  36.                 break;
  37.         default:
  38.                 while (fgets(sendline, MAXLINE, fp))
  39.                         write(sockfd, sendline, strlen(sendline));
  40.                 break;
  41.         }

  42.         ret = shutdown(sockfd, SHUT_WR);
  43.         if (ret == -1)
  44.                 handle_error("shutdown");
  45. }

  46. void *thr_func(void *arg)
  47. {
  48.         int sockfd, ret;
  49.         ssize_t s;
  50.         struct sockaddr_in servaddr;
  51.         char buf[MAXLINE];

  52.         sockfd = socket(AF_INET, SOCK_STREAM, 0);
  53.         if (sockfd == -1)
  54.                 handle_error("socket");

  55.         bzero(&servaddr, sizeof(servaddr));
  56.         servaddr.sin_family = AF_INET;
  57.         servaddr.sin_port = htons(SERV_PORT);

  58.         ret = inet_pton(AF_INET, "127.0.0.1", &servaddr.sin_addr);
  59.         if (ret == -1)
  60.                 handle_error("inet_pton");

  61.         ret = connect(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr));
  62.         if (ret == -1)
  63.                 handle_error("connect");

  64.         s = write(sockfd, "echo", 4);
  65.         if (s == -1)
  66.                 fprintf(stderr, "%ld, write %s\n",
  67.                         (long)pthread_self(), strerror(errno));
  68.         ret = shutdown(sockfd, SHUT_WR);
  69.         if (ret == -1)
  70.                 handle_error("shutdown");
  71.         s = read(sockfd, buf, MAXLINE);
  72.         if (s == -1)
  73.                 fprintf(stderr, "read %s\n", strerror(errno));

  74.         return NULL;
  75. }

  76. int main(void)
  77. {
  78.         int i, ret;
  79.         pthread_t tid[1000];

  80.         for (i = 0; i < 1000; i++) {
  81.                 ret = pthread_create(&tid[i], 0, thr_func, 0);
  82.                 if (ret != 0)
  83.                         handle_error("pthread_create");
  84.         }

  85.         for (i = 0; i < 1000; i++) {
  86.                 ret = pthread_join(tid[i], NULL);
  87.                 if (ret != 0)
  88.                         handle_error("pthread_join");
  89.         }
  90.         exit(EXIT_SUCCESS);
  91. }

复制代码

论坛徽章:
2
2015年亚洲杯之乌兹别克斯坦
日期:2015-04-15 15:43:482015亚冠之迪拜阿赫利
日期:2015-06-30 20:36:46
2 [报告]
发表于 2015-04-29 18:38 |只看该作者
为何要在4个线程中对同一个fd做epoll_wait呢?

论坛徽章:
2
2015年亚洲杯之乌兹别克斯坦
日期:2015-04-25 17:31:122015亚冠之山东鲁能
日期:2015-08-04 14:01:11
3 [报告]
发表于 2015-04-30 11:16 |只看该作者
本帖最后由 yaoyefengyun 于 2015-04-30 11:23 编辑

因为如果只有一个线程,当线程执行epoll_wait以外的操作时,有可能会有新的I/O事件到来。四个线程的话,当一两个线程处理其它工作的时候,其它线程依然阻塞在epoll_wait,能够及时处理新到来的I/O事件。
可能我的想法不太对,处理比较复杂的业务时,或许应该一个线程接收I/O事件,接收到的事件放到一个队列,然后让线程池中的线程来处理这个队列的事件?回复 2# bfdhczw


   

论坛徽章:
2
2015年亚洲杯之乌兹别克斯坦
日期:2015-04-15 15:43:482015亚冠之迪拜阿赫利
日期:2015-06-30 20:36:46
4 [报告]
发表于 2015-04-30 11:59 |只看该作者
回复 3# yaoyefengyun

或许应该一个线程接收I/O事件,接收到的事件放到一个队列,然后让线程池中的线程来处理这个队列的事件?

我也倾向于这种做法。
   

论坛徽章:
2
2015年亚洲杯之乌兹别克斯坦
日期:2015-04-25 17:31:122015亚冠之山东鲁能
日期:2015-08-04 14:01:11
5 [报告]
发表于 2015-04-30 13:12 |只看该作者
本帖最后由 yaoyefengyun 于 2015-04-30 13:13 编辑

再请教一个问题,这种非阻塞的情况下,不完整write应该怎么处理呢,轮询直到write完吗,还是说重新epoll_mod OUT事件呢?感觉如果轮询,有点浪费cpu资源,重新epoll_mod OUT的话,似乎又和期望的高效率相违背,有点纠结。回复 4# bfdhczw


   

论坛徽章:
2
2015年亚洲杯之乌兹别克斯坦
日期:2015-04-15 15:43:482015亚冠之迪拜阿赫利
日期:2015-06-30 20:36:46
6 [报告]
发表于 2015-04-30 13:34 |只看该作者
回复 5# yaoyefengyun
其实我也是初学者,感觉这种情况等待下一次事件应该好些吧。

   

论坛徽章:
2
2015年亚洲杯之乌兹别克斯坦
日期:2015-04-25 17:31:122015亚冠之山东鲁能
日期:2015-08-04 14:01:11
7 [报告]
发表于 2015-04-30 13:38 |只看该作者
本帖最后由 yaoyefengyun 于 2015-04-30 17:25 编辑

嗯,谢谢!回复 6# bfdhczw


   
您需要登录后才可以回帖 登录 | 注册

本版积分规则 发表回复

  

北京盛拓优讯信息技术有限公司. 版权所有 京ICP备16024965号-6 北京市公安局海淀分局网监中心备案编号:11010802020122 niuxiaotong@pcpop.com 17352615567
未成年举报专区
中国互联网协会会员  联系我们:huangweiwei@itpub.net
感谢所有关心和支持过ChinaUnix的朋友们 转载本站内容请注明原作者名及出处

清除 Cookies - ChinaUnix - Archiver - WAP - TOP