免费注册 查看新帖 |

Chinaunix

  平台 论坛 博客 文库
最近访问板块 发新帖
查看: 2779 | 回复: 4
打印 上一主题 下一主题

[网络] epoll+事务处理线程池 纠结的问题,服务端会挂掉! [复制链接]

论坛徽章:
0
跳转到指定楼层
1 [收藏(0)] [报告]
发表于 2015-04-07 10:32 |只看该作者 |倒序浏览
本帖最后由 guojinshuai 于 2015-04-07 10:34 编辑

服务端代码如下:

  1. #include  <unistd.h>
  2. #include  <sys/types.h>  
  3. #include  <sys/socket.h>
  4. #include  <netinet/in.h>
  5. #include  <arpa/inet.h>  
  6. #include  <stdlib.h>
  7. #include  <errno.h>
  8. #include  <stdio.h>
  9. #include  <string.h>
  10. #include  <fcntl.h>
  11. #include  <sys/select.h>
  12. #include  <pthread.h>
  13. #include  <iostream>
  14. #include <sys/epoll.h>
  15. #include  <list>
  16. #include <signal.h>
  17. using namespace std;


  18. pthread_mutex_t lock=PTHREAD_MUTEX_INITIALIZER;

  19. enum falgrw{readflag = 0,writeflag};

  20. typedef struct{
  21.         int fd;
  22.         int op;
  23. }Task;

  24. void *func( void *arg );
  25. struct epoll_event ev,events[200];
  26. int epfd=0;
  27. list<Task> tasklist(0);
  28. int  main()
  29. {
  30.        
  31.         int guo=0;
  32.         pthread_t thread[3];
  33.         for(guo=0;guo<3;guo++)
  34.         {
  35.                 pthread_create(&thread[guo],NULL,func,NULL);
  36.         }

  37.         int  listenfd = 0;
  38.         int    connfd = 0;
  39.         int serverPort= 8888;
  40.         int   listenq = 5;
  41.         socklen_t socklen = 0;
  42.         int opt = 1;
  43.         int nready = 0;       
  44.         int maxfd  = 0;       
  45.         int maxi = -1;

  46.         char buf[50]= {0};
  47.         struct sockaddr_in cliaddr, servaddr;
  48.         socklen = sizeof(struct sockaddr_in);

  49.         memset(&servaddr, 0, sizeof(servaddr));
  50.         memset(&cliaddr,  0, sizeof(cliaddr));
  51.        
  52.         servaddr.sin_family = AF_INET;
  53.         servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
  54.         servaddr.sin_port = htons(serverPort);

  55.         listenfd = socket(AF_INET, SOCK_STREAM, 0);
  56.         if (listenfd < 0)
  57.         {
  58.                 perror("socket error");
  59.                 return -1;
  60.         }
  61.         if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, (const char*)&opt, sizeof(opt)) < 0)
  62.         {
  63.                 perror("setsockopt error");   
  64.         }
  65.         if (bind(listenfd, (struct sockaddr *) &servaddr, socklen) < 0)
  66.         {
  67.                 perror("bind error");
  68.                 return -1;
  69.         }
  70.         if (listen(listenfd, listenq) < 0)
  71.         {
  72.                 perror("listen error");   
  73.                 return -1;
  74.         }
  75.        
  76.         //epoll LT
  77.         epfd=epoll_create(256);
  78.         ev.data.fd=listenfd;
  79.         //设置要处理的事件类型
  80.         ev.events=EPOLLIN;
  81.         //注册epoll事件
  82.         epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,&ev);
  83.        
  84.         int nfds=0;
  85.        
  86.        
  87.         printf("echo server startup,listen on port:%d\n", serverPort);
  88.         while(1)  
  89.         {
  90.                
  91.                 struct timeval tv;
  92.                 tv.tv_sec = 1;
  93.                 tv.tv_usec = 0;

  94.                 nfds=epoll_wait(epfd,events,200,50);
  95.                 if(nfds==0)
  96.                 {
  97.                         printf("0 00 0 0 0 0 0 0\n");
  98.                 }
  99.                 else if(nfds<0)
  100.                 {
  101.                         printf("<<<<<<<0 00 0 0 0 0 0 0\n");
  102.                 }
  103.                 for(int i=0;i<nfds;++i)
  104.                 {
  105.                         if(events[i].data.fd==listenfd)
  106.                         {                 
  107.                                 connfd = accept(listenfd,(sockaddr *)&cliaddr, &socklen);
  108.                                 if(connfd<0){
  109.                                         perror("connfd<0");
  110.                                         exit(1);
  111.                                 }
  112.                                 //设置用于读操作的文件描述符
  113.                                 ev.data.fd=connfd;
  114.                                 //设置用于注测的读操作事件
  115.                                 ev.events=EPOLLIN|EPOLLONESHOT;
  116.                                 //注册ev
  117.                                 epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev);
  118.                         }
  119.                         else if(events[i].events&EPOLLIN)
  120.                         {
  121.                                 Task task={0,0};
  122.                                 task.fd=events[i].data.fd;
  123.                                 task.op= readflag;
  124.                                 pthread_mutex_lock(&lock);
  125.                                 tasklist.push_front(task);
  126.                                 pthread_mutex_unlock(&lock);

  127.                         }
  128.                         else if(events[i].events&EPOLLOUT)
  129.                         {
  130.                                 //
  131.                         }                        
  132.                 }
  133.         }
  134.         close(listenfd);
  135.         return 0;
  136. }

  137. void *func( void *arg )
  138. {
  139.         char buffer[1024]={0};
  140.         size_t size = 0;
  141.         pthread_detach(pthread_self());
  142.         Task task={0,0};
  143.         while(1)
  144.         {
  145.                 pthread_mutex_lock(&lock);
  146.                 size = tasklist.size();
  147.                 pthread_mutex_unlock(&lock);
  148.                 memset(buffer,0,1024);
  149.                 memset(&task,0,sizeof(Task));
  150.                 printf("size:%d\n",size);
  151.                 if(size>0)
  152.                 {
  153.                         pthread_mutex_lock(&lock);
  154.                         task = tasklist.front();
  155.                         tasklist.pop_front();
  156.                         pthread_mutex_unlock(&lock);       
  157.                         int xxx=read(task.fd,buffer,sizeof(buffer));
  158.                         if(xxx==0)
  159.                         {
  160.                                 printf("client %d close!\n",task.fd);
  161.                                 close(task.fd);
  162.                                 epoll_ctl(epfd,EPOLL_CTL_DEL,task.fd,&ev);
  163.                         }
  164.                         else if(xxx>0)
  165.                         {
  166.                                 ev.data.fd=task.fd;
  167.                                 //设置用于注测的读操作事件
  168.                                 ev.events=EPOLLIN|EPOLLONESHOT;
  169.                                 //注册ev
  170.                                 epoll_ctl(epfd,EPOLL_CTL_MOD,task.fd,&ev);
  171.                         }
  172.                         else
  173.                         {
  174.                                 printf("wrong!!\n");
  175.                                 close(task.fd);
  176.                                 epoll_ctl(epfd,EPOLL_CTL_DEL,task.fd,&ev);
  177.                         }
  178.                        
  179.                 }
  180.                 else
  181.                 {
  182.                         sleep(1);
  183.                 }
  184.                
  185.         }

  186.         return 0;
  187. }

复制代码
客户端代码如下:

  1. #include  <unistd.h>
  2. #include  <sys/types.h>   
  3. #include  <sys/socket.h>   
  4. #include  <netinet/in.h>   
  5. #include  <arpa/inet.h>   
  6. #include  <stdlib.h>
  7. #include  <errno.h>
  8. #include  <stdio.h>
  9. #include  <string.h>
  10. #include  <fcntl.h>
  11. #include  <sys/select.h>
  12. #include  <pthread.h>
  13. #include  <iostream>
  14. #include  <list>
  15. using namespace std;

  16. int main()
  17. {
  18.         int cfd;
  19.         int recbytes;
  20.         int sin_size;
  21.         char buffer[1024]={0};   
  22.         struct sockaddr_in s_add,c_add;
  23.         unsigned short portnum=8888;  

  24.        
  25.         printf("Hello,welcome to client !\r\n");

  26.         cfd = socket(AF_INET, SOCK_STREAM, 0);
  27.         if(-1 == cfd)
  28.         {
  29.                 printf("socket fail ! \r\n");
  30.                 return -1;
  31.         }
  32.         printf("socket ok !\r\n");
  33.        
  34.         bzero(&s_add,sizeof(struct sockaddr_in));
  35.         s_add.sin_family=AF_INET;
  36.         s_add.sin_addr.s_addr= inet_addr("127.0.0.1");
  37.         s_add.sin_port=htons(portnum);

  38.         if(-1 == connect(cfd,(struct sockaddr *)(&s_add), sizeof(struct sockaddr)))
  39.         {
  40.                 printf("connect fail !\r\n");
  41.                 return -1;
  42.         }
  43.         printf("connect ok !\r\n");

  44.         int j=0;
  45.         struct timeval tv;  
  46.         while(1)
  47.         {
  48.                
  49.                 memset(buffer,0,sizeof(buffer));
  50.                 sprintf(buffer,"%d%d",j,j);
  51.                 if(-1 == (recbytes = write(cfd,buffer,strlen(buffer)+1)))
  52.                 {
  53.                         printf("write data fail !\r\n");
  54.                         return -1;
  55.                 }
  56.                 printf("write ok.buffer=[%s]\n",buffer);
  57.                 sleep(1);
  58.                 j++;
  59.                
  60.          //test select   
  61.          tv.tv_sec = 0;  
  62.          tv.tv_usec = 1000;  
  63.                  int ret = select(0, NULL, NULL, NULL, &tv);  
  64.          if (-1 == ret)  
  65.          {  
  66.             printf("wweeeeeee\n");
  67.          }
  68.        
  69.         }
  70.         close(cfd);
  71.         return 0;
  72. }
复制代码
编译完客户端后,用一个脚本,nohup ./xxx & ,模拟700个客户端同时执行,运行几分钟后服务端就挂了!!!
也看了core文件,输出如下:
(gdb) bt
#0  0x00002ac36656af45 in raise () from /lib64/libc.so.6
#1  0x00002ac36656c340 in abort () from /lib64/libc.so.6
#2  0x00002ac3665a178b in __libc_message () from /lib64/libc.so.6
#3  0x00002ac3665a677e in malloc_printerr () from /lib64/libc.so.6
#4  0x00002ac3665a7ecc in free () from /lib64/libc.so.6
#5  0x0000000000401ca7 in __gnu_cxx::new_allocator<std::_List_node<Task> >::deallocate(std::_List_node<Task>*, unsigned long) ()
#6  0x0000000000401ccf in std::_List_base<Task, std::allocator<Task> >::_M_put_node(std::_List_node<Task>*) ()
#7  0x0000000000401e7b in std::list<Task, std::allocator<Task> >::_M_erase(std::_List_iterator<Task>) ()
#8  0x0000000000401ea3 in std::list<Task, std::allocator<Task> >::pop_front() ()
#9  0x0000000000401284 in func(void*) ()
#10 0x00002ac365fc7193 in start_thread () from /lib64/libpthread.so.0
#11 0x00002ac3665fc0dd in clone () from /lib64/libc.so.6

我看了半天代码,也修改测试,没看明白线程函数有什么问题。。。请大侠们帮忙看看是啥原因。
PS:看到网上有个方法,如果 MALLOC_CHECK_=0 ./xxx 这样运行的话,服务端就不会挂了.








论坛徽章:
0
2 [报告]
发表于 2015-04-07 16:21 |只看该作者
没有人帮忙测试下吗。。。。一直没找到问题所在。

论坛徽章:
0
3 [报告]
发表于 2015-04-07 17:07 |只看该作者
找到原因了,整体代码没问题,就是lock的地方不对!希望跟我一样学习多线程的童鞋,注意好lock!!!

论坛徽章:
0
4 [报告]
发表于 2015-08-31 19:56 |只看该作者
lock 怎么错了啊?能简单的说一下吗?

论坛徽章:
0
5 [报告]
发表于 2015-08-31 20:01 |只看该作者
uangyy 发表于 2015-08-31 19:56
lock 怎么错了啊?能简单的说一下吗?


是在得到size之后unlock,在用之前就背其他线程给改变了吧?
您需要登录后才可以回帖 登录 | 注册

本版积分规则 发表回复

  

北京盛拓优讯信息技术有限公司. 版权所有 京ICP备16024965号-6 北京市公安局海淀分局网监中心备案编号:11010802020122 niuxiaotong@pcpop.com 17352615567
未成年举报专区
中国互联网协会会员  联系我们:huangweiwei@itpub.net
感谢所有关心和支持过ChinaUnix的朋友们 转载本站内容请注明原作者名及出处

清除 Cookies - ChinaUnix - Archiver - WAP - TOP