免费注册 查看新帖 |

Chinaunix

  平台 论坛 博客 文库
最近访问板块 发新帖
查看: 3064 | 回复: 1
打印 上一主题 下一主题

[C++] 初级epoll问题,希望高手花几分钟解答一下困扰新手几周的问题 [复制链接]

论坛徽章:
0
跳转到指定楼层
1 [收藏(0)] [报告]
发表于 2013-03-25 20:49 |只看该作者 |倒序浏览
本帖最后由 xtchina 于 2013-03-26 11:18 编辑

问题已解决,自己粗心造成

1.为什么用EPOLL_CTL_ADD加入到epoll中的socket会无声无息的消失了?
2.为什么用EPOLL_CTL_DEL删除从epoll_wait中返回的句柄会出现bad file descriptor错误?
3.为什么已经加入到epoll中的sock连接中断,却没有从epoll中得到任何提示?
4.为什么我的socket句柄还没有调用close,系统就把它分配给别的连接用了?

撇开效率不谈,我的代码**问题?网上找了好几周了,实在解决不了,还望高手能够帮忙看看,不胜感激!

  1. #include <sys/types.h>
  2. #include <sys/mman.h>
  3. #include <sys/resource.h>
  4. #include <stdio.h>
  5. #include <errno.h>
  6. #include <unistd.h>
  7. #include <string.h>
  8. #include <fcntl.h>
  9. #include <unistd.h>
  10. #include <stdlib.h>
  11. #include <assert.h>

  12. #include <pthread.h>

  13. #include <netinet/tcp.h>
  14. #include <sys/epoll.h>
  15. #include <sys/socket.h>
  16. #include <netinet/in.h>
  17. #include <arpa/inet.h>

  18. bool                        woking                                        = true;

  19. unsigned short        server_port                                = 9001;
  20. const char*                server_ip                                = "127.0.0.1";

  21. int                                wait_epevent_num                = 4096;
  22. int                                ep_client                                = -1;
  23. int                                ep_server                                = -1;
  24. int                                buffer_size                                = 4096;

  25. double                        server_recv_size                = 0.0d;
  26. double                        server_send_size                = 0.0d;

  27. double                        client_recv_size                = 0.0d;
  28. double                        client_send_size                = 0.0d;

  29. unsigned int        server_inepoll_sock                = 0;
  30. unsigned int        server_outepoll_sock        = 0;

  31. unsigned int        client_inepoll_sock                = 0;
  32. unsigned int        client_outepoll_sock        = 0;

  33. struct epitem
  34. {
  35.         int                                sock;
  36.         unsigned short        local_port;
  37.         unsigned short        remote_port;
  38.         unsigned int        send_data_size;
  39.         unsigned int        recv_data_size;
  40. };

  41. // 获取本端端口
  42. unsigned short get_sock_local_port(int sock)
  43. {
  44.         unsigned short local_port = 0;
  45.         sockaddr_in addr;
  46.         socklen_t nl = sizeof(addr);
  47.         if (0 == getsockname(sock, (sockaddr*)&addr, &nl))
  48.         {
  49.                 local_port = ntohs(addr.sin_port);
  50.         }

  51.         return local_port;
  52. }
  53. // 获取远端端口
  54. unsigned short get_sock_remote_port(int sock)
  55. {
  56.         unsigned short remote_port = 0;
  57.         sockaddr_in addr;
  58.         socklen_t nl = sizeof(addr);
  59.         if (0 == getpeername(sock, (sockaddr*)&addr, &nl))
  60.         {
  61.                 remote_port = ntohs(addr.sin_port);
  62.         }

  63.         return remote_port;
  64. }

  65. // 投入到epoll中
  66. bool putinto_epoll(int epoll, int sock)
  67. {
  68.         if (0 != fcntl(sock, F_SETFL
  69.                 , fcntl(sock, F_GETFL, 0)|O_NONBLOCK))
  70.         {
  71.                 return false;
  72.         }

  73.         epitem* newitem = new epitem;
  74.         if (0 == newitem)
  75.         {
  76.                 return false;
  77.         }

  78.         newitem->sock = sock;
  79.         newitem->local_port = get_sock_local_port(sock);
  80.         newitem->remote_port = get_sock_remote_port(sock);
  81.         newitem->send_data_size = 0;
  82.         newitem->recv_data_size = 0;

  83.         epoll_event eevent;
  84.         eevent.data.ptr = newitem;
  85.         eevent.events = EPOLLIN | EPOLLOUT;

  86.         if (0 != epoll_ctl(epoll, EPOLL_CTL_ADD, sock, &eevent))
  87.         {       
  88.                 printf("epoll_ctl call failed [%d:%s]...\r\n", errno, strerror(errno));
  89.                 delete newitem;
  90.                 return false;
  91.         }       

  92.         // 33%的概率关掉一些刚刚投入到epoll中的sock,仅仅是测试需要
  93.         if (rand() % 3 == 0)
  94.         {
  95.                 close(sock);
  96.         }

  97.         return true;
  98. }

  99. // 连接线程,不停地进行连接
  100. void* thread_connect(void* param)
  101. {
  102.         printf("begin thread connect...\r\n");

  103.         sockaddr_in server_addr;
  104.         memset(&server_addr, 0, sizeof(sockaddr_in));
  105.         server_addr.sin_family                = AF_INET;
  106.         server_addr.sin_addr.s_addr = inet_addr(server_ip);
  107.         server_addr.sin_port                = htons(server_port);

  108.         while(woking)
  109.         {
  110.                 int new_client_sock = -1;               

  111.                 new_client_sock = socket(PF_INET, SOCK_STREAM, 0);
  112.                 if (-1        == new_client_sock)
  113.                 {
  114.                         sleep(1);
  115.                         continue;
  116.                 }

  117.                 if (0 != connect(new_client_sock, (sockaddr*)&server_addr
  118.                         , sizeof(sockaddr_in)))
  119.                 {
  120.                         goto fail;
  121.                 }

  122.                 // 放入epoll队列中
  123.                 if (!putinto_epoll(ep_client, new_client_sock))
  124.                 {
  125.                         goto fail;
  126.                 }

  127.                 client_inepoll_sock++;
  128.                 goto ok;

  129. fail:
  130.                 close(new_client_sock);
  131.                 //sleep(1);
  132. ok:
  133.                 ;
  134.         }
  135.         printf("end thread connect...\r\n");
  136.         return 0;
  137. }
  138. // 接收线程,不停地接收客户端的连接
  139. void* thread_accept(void* param)
  140. {
  141.         printf("begin thread accept...\r\n");

  142.         int listen_sock;
  143.         sockaddr_in server_addr;       

  144.         listen_sock = socket(PF_INET, SOCK_STREAM, 0);
  145.         if (-1 == listen_sock)
  146.         {
  147.                 printf("end thread accept[%d:%s]...\r\n", errno, strerror(errno));
  148.                 return 0;
  149.         }

  150.         int reuseaddr = 1;
  151.         if (0 != setsockopt(listen_sock, SOL_SOCKET
  152.                 , SO_REUSEADDR, &reuseaddr, sizeof(reuseaddr) ))
  153.         {
  154.                 printf("end thread accept[%d:%s]...\r\n", errno, strerror(errno));
  155.                 return 0;
  156.         }

  157.         memset(&server_addr, 0, sizeof(sockaddr_in));
  158.         server_addr.sin_family = AF_INET;
  159.         server_addr.sin_addr.s_addr = inet_addr(server_ip);
  160.         server_addr.sin_port = htons(server_port);

  161.         if (0 != bind(listen_sock, (sockaddr*)&server_addr, sizeof(server_addr)))
  162.         {
  163.                 printf("end thread accept[%d:%s]...\r\n", errno, strerror(errno));
  164.                 return 0;
  165.         }

  166.         if (0 != fcntl(listen_sock, F_SETFL
  167.                 , fcntl(listen_sock, F_GETFL, 0) & ~O_NONBLOCK))
  168.         {
  169.                 printf("end thread accept[%d:%s]...\r\n", errno, strerror(errno));
  170.                 return 0;
  171.         }

  172.         if (0 != listen(listen_sock, 32))
  173.         {
  174.                 printf("end thread accept[%d:%s]...\r\n", errno, strerror(errno));
  175.                 return 0;
  176.         }

  177.         while(woking)
  178.         {
  179.                 int new_accept_sock = -1;
  180.                 new_accept_sock = accept(listen_sock, 0, 0);
  181.                 if (-1 == new_accept_sock)
  182.                 {
  183.                         goto fail;
  184.                 }

  185.                 // 放入epoll队列中
  186.                 if (!putinto_epoll(ep_server, new_accept_sock))
  187.                 {
  188.                         goto fail;
  189.                 }

  190.                 server_inepoll_sock++;
  191.                 goto ok;

  192. fail:
  193.                 close(new_accept_sock);
  194.                 //sleep(1);

  195. ok:
  196.                 ;
  197.         }

  198.         printf("end thread accept...\r\n");
  199.         return 0;
  200. }
  201. // epoll中的输入处理
  202. bool do_input(int sock, char* buffer, int buf_size, double& recv_size)
  203. {
  204.         assert(buf_size > 0);
  205.         assert(buffer != 0);

  206.         for (int i=0; i<8; i++)
  207.         {
  208.                 int recvlen = recv(sock, buffer, buf_size, 0);
  209.                 // return false; 如果这里返回为什么会有double free错误?
  210.                 recv_size += recvlen>0?recvlen:0;
  211.                 if (recvlen == 0)
  212.                 {
  213.                         return false;
  214.                 }
  215.                 else if (recvlen == -1)
  216.                 {
  217.                         if (errno == EAGAIN)
  218.                         {
  219.                                 return true;
  220.                         }
  221.                         if (errno == EINTR)
  222.                         {
  223.                                 continue;
  224.                         }
  225.                        
  226.                         return false;
  227.                 }
  228.                 else if (recvlen == buf_size)
  229.                 {
  230.                         continue;
  231.                 }
  232.                 else if (recvlen < buf_size)
  233.                 {
  234.                         assert(recvlen > 0);
  235.                         return true;
  236.                 }               
  237.         }
  238.         // 有太多的数据时,返回false关闭连接
  239.         return false;
  240. }
  241. // epoll中的输出处理
  242. bool do_output(int sock, char* buffer, int buf_size, double& send_size)
  243. {
  244.         for (int i=0; i<4; i++)
  245.         {
  246.                 int sendlen = send(sock, buffer, buf_size, 0);
  247.                 // return false; 如果这里返回为什么会有double free错误?
  248.                 send_size += sendlen>0?sendlen:0;
  249.                 if (sendlen == -1)
  250.                 {
  251.                         if (errno == EAGAIN)
  252.                         {
  253.                                 return true;
  254.                         }
  255.                         if (errno == EINTR)
  256.                         {
  257.                                 continue;
  258.                         }
  259.                         // 为什么有SIGPIPE信号,出错的连接我都关掉了
  260.                         return false;
  261.                 }
  262.                 else if (sendlen == buf_size)
  263.                 {
  264.                         continue;
  265.                 }
  266.                 else if (sendlen < buf_size)
  267.                 {
  268.                         assert(sendlen > 0);
  269.                         return true;
  270.                 }               
  271.         }

  272.         return true;
  273. }

  274. void do_epoll(int epoll, epoll_event* wait_epevent, int wait_epevent_num
  275.                           , int wait_time, char* buffer, int buf_size
  276.                           , double& recv_size, double& send_size, unsigned int& out_sock_num)
  277. {
  278.         int wait_num = epoll_wait(epoll, wait_epevent, wait_epevent_num, wait_time);

  279.         for (int i=0; i<wait_num; i++)
  280.         {
  281.                 epitem* sockitem = (epitem*)wait_epevent[i].data.ptr;
  282.                 if ( 0 != (wait_epevent[i].events & ~(EPOLLIN|EPOLLOUT)) )
  283.                 {                       
  284.                         // 非输入输出事件,全部关闭
  285.                         goto fail;
  286.                 }

  287.                 if ((wait_epevent[i].events & EPOLLIN)
  288.                         && !do_input(sockitem->sock, buffer, buf_size, recv_size))
  289.                 {
  290.                         // 输入事件处理失败
  291.                         goto fail;
  292.                 }

  293.                 {
  294.                         // 这里sock是有效的,为什么端口号会改变??
  295.                         unsigned short local_port = get_sock_local_port(sockitem->sock);
  296.                         assert((0 == local_port) || (local_port == sockitem->local_port));

  297.                         unsigned short remote_port = get_sock_remote_port(sockitem->sock);
  298.                         assert((0 == remote_port) || (remote_port == sockitem->remote_port));
  299.                 }

  300.                 if ((wait_epevent[i].events & EPOLLOUT)
  301.                         && !do_output(sockitem->sock, buffer, buf_size, send_size))
  302.                 {
  303.                         // 输出事件处理失败
  304.                         goto fail;
  305.                 }
  306.                
  307.                 {
  308.                         // 这里sock是有效的,为什么端口号会改变??
  309.                         unsigned short local_port = get_sock_local_port(sockitem->sock);
  310.                         assert((0 == local_port) || (local_port == sockitem->local_port));

  311.                         unsigned short remote_port = get_sock_remote_port(sockitem->sock);
  312.                         assert((0 == remote_port) || (remote_port == sockitem->remote_port));
  313.                 }

  314.                 // 事件过多时,随机关闭一些连接,仅仅是测试需要
  315.                 if (rand()%5000 < (wait_num-1000))
  316.                 {
  317.                         goto fail;
  318.                 }

  319.                 goto ok;

  320. fail:
  321.                 if (0 != epoll_ctl(epoll, EPOLL_CTL_DEL, sockitem->sock, 0))
  322.                 {
  323.                         // 为什么会出现bad file descriptor?
  324.                         //pirntf("epoll_ctl failed.[%d:%s]\r\n", errno, strerror(errno));
  325.                 }
  326.                 close(sockitem->sock);
  327.                 delete sockitem;
  328.                 out_sock_num++;
  329. ok:
  330.                 ;
  331.         }
  332. }
  333. // 服务器端epoll数据处理
  334. void* thread_server(void* param)
  335. {
  336.         printf("begin thread server...\r\n");

  337.         // 创建接收和发送数据的缓冲区,所有sock公用,数据随机
  338.         char*                buffer        = new char[buffer_size];
  339.         assert(0 != buffer);

  340.         // epoll中等待事件
  341.         epoll_event* wait_epevent = new epoll_event[wait_epevent_num];
  342.         assert (wait_epevent != 0);

  343.         while(woking)
  344.         {
  345.                 do_epoll(ep_server, wait_epevent, wait_epevent_num, 1, buffer, buffer_size
  346.                         , server_recv_size, server_send_size, server_outepoll_sock);
  347.         }

  348.         printf("end thread server...\r\n");
  349.         return 0;
  350. }
  351. // 客户端epoll数据处理
  352. void* thread_client(void* param)
  353. {
  354.         printf("begin thread client...\r\n");

  355.         // 创建接收和发送数据的缓冲区,所有sock公用,数据随机
  356.         char*                buffer        = new char[buffer_size];
  357.         assert(0 != buffer);

  358.         // epoll中等待事件
  359.         epoll_event* wait_epevent = new epoll_event[wait_epevent_num];
  360.         assert (wait_epevent != 0);

  361.         while(woking)
  362.         {
  363.                 do_epoll(ep_client, wait_epevent, wait_epevent_num, 1, buffer, buffer_size
  364.                         , client_recv_size, client_send_size, client_outepoll_sock);
  365.         }

  366.         delete[] buffer;
  367.         delete[] wait_epevent;

  368.         printf("end thread client...\r\n");
  369.         return 0;
  370. }


  371. int main()
  372. {
  373.         rlimit rlim;
  374.         if (0 == getrlimit(RLIMIT_NOFILE, &rlim))
  375.         {
  376.                 rlim.rlim_cur = rlim.rlim_max;
  377.                 setrlimit(RLIMIT_NOFILE , &rlim);
  378.         }

  379.         // 创建服务器端epoll
  380.         ep_server =  epoll_create(4096);
  381.         assert(ep_server != -1);

  382.         // 创建客户端端epoll
  383.         ep_client =  epoll_create(4096);
  384.         assert (ep_client != -1);

  385.         pthread_t thread_id_connect;
  386.         pthread_t thread_id_accept;
  387.         pthread_t thread_id_client;
  388.         pthread_t thread_id_server;

  389.         woking = true;

  390.         // 服务器处理线程
  391.         if (0 != pthread_create(&thread_id_server, 0, thread_server, 0))
  392.         {
  393.                 return 0;
  394.         }

  395.         // 客户端处理线程
  396.         if (0 != pthread_create(&thread_id_client, 0, thread_client, 0))
  397.         {
  398.                 return 0;
  399.         }

  400.         // 连接监听线程
  401.         if (0 != pthread_create(&thread_id_accept, 0, thread_accept, 0))
  402.         {
  403.                 return 0;
  404.         }
  405.         // 建立连接线程
  406.         if (0 != pthread_create(&thread_id_connect, 0, thread_connect, 0))
  407.         {
  408.                 return 0;
  409.         }

  410.         printf("begin epoll test...\r\n");

  411.         while (1)
  412.         {
  413.                 // 打印一些统计信息,连接数和收发数据速度
  414.                 // 连接数为什么不对?
  415.                 printf("s[%d:%f:%f] c[%d:%f:%f]\r\n"
  416.                         ,(server_inepoll_sock-server_outepoll_sock)
  417.                         , server_recv_size/1024/1024,  server_send_size/1024/1024
  418.                         ,(client_inepoll_sock-client_outepoll_sock)
  419.                         , client_recv_size/1024/1024, client_send_size/1024/1024);

  420.                 server_recv_size = 0.0d;
  421.                 server_send_size = 0.0d;
  422.                 client_recv_size = 0.0d;
  423.                 client_send_size = 0.0d;

  424.                 sleep(1);
  425.         }

  426.         sleep(-1);

  427.         woking = false;

  428.         close(ep_server);
  429.         close(ep_client);

  430.         pthread_join(thread_id_connect, 0);
  431.         pthread_join(thread_id_accept, 0);
  432.         pthread_join(thread_id_client, 0);
  433.         pthread_join(thread_id_server, 0);

  434.         printf("end epoll test...\r\n");

  435.         return 0;
  436. }
复制代码

论坛徽章:
0
2 [报告]
发表于 2013-12-09 09:57 |只看该作者
对你这个线程设置下 打开的句柄个数限制
setrlimit(RLIMIT_NOFILE, & rl)
您需要登录后才可以回帖 登录 | 注册

本版积分规则 发表回复

  

北京盛拓优讯信息技术有限公司. 版权所有 京ICP备16024965号-6 北京市公安局海淀分局网监中心备案编号:11010802020122 niuxiaotong@pcpop.com 17352615567
未成年举报专区
中国互联网协会会员  联系我们:huangweiwei@itpub.net
感谢所有关心和支持过ChinaUnix的朋友们 转载本站内容请注明原作者名及出处

清除 Cookies - ChinaUnix - Archiver - WAP - TOP