免费注册 查看新帖 |

Chinaunix

  平台 论坛 博客 文库
12下一页
最近访问板块 发新帖
查看: 9669 | 回复: 19

[网络] select+多线程 问题好纠结,愁死了 [复制链接]

论坛徽章:
0
发表于 2015-04-01 20:35 |显示全部楼层
代码是这样写的:当主线程发现有连接来时加入FD_SET,select到可读时,加一个读的任务到队列tasklist中,然后有3个子线程循环着从队列取任务read数据。
但是现在有个问题:只要客户端一发数据,服务端的tasklist.size()立马就上千,导致后续很多问题。
哪位热心朋友帮忙看看代码~~3Q、、

服务端:

  1. #include  <unistd.h>
  2. #include  <sys/types.h>       /* basic system data types */
  3. #include  <sys/socket.h>      /* basic socket definitions */
  4. #include  <netinet/in.h>      /* sockaddr_in{} and other Internet defns */
  5. #include  <arpa/inet.h>       /* inet(3) functions */
  6. #include  <stdlib.h>
  7. #include  <errno.h>
  8. #include  <stdio.h>
  9. #include  <string.h>
  10. #include  <fcntl.h>
  11. #include  <sys/select.h>
  12. #include  <pthread.h>
  13. #include  <iostream>
  14. #include  <list>
  15. #include <signal.h>
  16. using namespace std;


  17. pthread_mutex_t lock;

  18. enum falgrw{readflag = 0,writeflag};

  19. typedef struct{
  20. int fd;
  21. int op;
  22. }Task;

  23. void *func( void *arg );
  24. Task *create_task(int socket);
  25. int clientSockFds[FD_SETSIZE]={0};
  26. fd_set allset;
  27. list<Task> tasklist(0);
  28. int  main()
  29. {
  30.        
  31.         int guo=0;
  32.         pthread_t thread[3];
  33.         for(guo=0;guo<3;guo++)
  34.         {
  35.                 pthread_create(&thread[guo],NULL,func,NULL);
  36.         }

  37.     int  listenfd = 0;
  38.         int    connfd = 0;
  39.     int serverPort= 8888;
  40.     int   listenq = 5;
  41.     socklen_t socklen = 0;
  42.     int opt = 1;
  43.     int nready = 0;       
  44.     int maxfd  = 0;       
  45.         int maxi = -1;

  46.         char buf[50]= {0};
  47.     struct sockaddr_in cliaddr, servaddr;
  48.     socklen = sizeof(struct sockaddr_in);

  49.     memset(&servaddr, 0, sizeof(servaddr));
  50.     memset(&cliaddr,  0, sizeof(cliaddr));
  51.        
  52.     servaddr.sin_family = AF_INET;
  53.     servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
  54.     servaddr.sin_port = htons(serverPort);

  55.     listenfd = socket(AF_INET, SOCK_STREAM, 0);
  56.     if (listenfd < 0)
  57.         {
  58.         perror("socket error");
  59.         return -1;
  60.     }
  61.         if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, (const char*)&opt, sizeof(opt)) < 0)
  62.     {
  63.         perror("setsockopt error");   
  64.     }
  65.     if (bind(listenfd, (struct sockaddr *) &servaddr, socklen) < 0)
  66.         {
  67.         perror("bind error");
  68.         return -1;
  69.     }
  70.     if (listen(listenfd, listenq) < 0)
  71.         {
  72.         perror("listen error");   
  73.         return -1;
  74.     }
  75.        
  76.     FD_ZERO(&allset);
  77.     FD_SET(listenfd, &allset);
  78.     maxfd = listenfd;   

  79.         for (int j = 0; j< FD_SETSIZE; j++)
  80.         {
  81.         clientSockFds[j] = -1;                
  82.         }
  83.        
  84.     printf("echo server startup,listen on port:%d\n", serverPort);
  85.         printf("max connection: %d\n", FD_SETSIZE);
  86.         while(1)  
  87.     {
  88.         fd_set rset = allset;
  89.         nready = select(maxfd + 1, &rset, NULL, NULL, NULL);
  90.                 //printf("nready = %d\n",nready);
  91.         if (nready < 0)
  92.         {
  93.             perror("select error");
  94.             continue;
  95.         }
  96.         if (FD_ISSET(listenfd, &rset))
  97.         {
  98.             connfd = accept(listenfd, (struct sockaddr*) &cliaddr, &socklen);
  99.             if (connfd < 0)
  100.             {
  101.                 perror("accept error");
  102.                 continue;
  103.             }
  104.                        
  105.             sprintf(buf, "accept form %s:%d: connfd=%d\n", inet_ntoa(cliaddr.sin_addr), cliaddr.sin_port,connfd);
  106.             printf(buf, "");

  107.                         int i=0;
  108.             for (i = 0; i< FD_SETSIZE; i++)
  109.             {
  110.                 if (clientSockFds[i] == -1)
  111.                 {
  112.                     clientSockFds[i] = connfd;
  113.                     break;
  114.                 }
  115.             }
  116.             if (i == FD_SETSIZE)
  117.             {
  118.                 fprintf(stderr, "too many connection, more than %d\n", FD_SETSIZE);
  119.                 close(connfd);
  120.                 break;
  121.             }

  122.             if (connfd > maxfd)
  123.                 maxfd = connfd;
  124.                                
  125.             FD_SET(connfd, &allset);
  126.         }
  127.                 printf("main111()  tasklist.size():[%d]--->Thread:%d\n\n",tasklist.size(),pthread_self());       
  128.                 for(int m=0;m<1024 && clientSockFds[m] != -1 ;m++)
  129.                 {
  130.                         if (FD_ISSET(clientSockFds[m], &rset))
  131.                         {
  132.                                 pthread_mutex_lock(&lock);
  133.                                 Task *task = create_task(clientSockFds[m]);
  134.                                 tasklist.push_back(*task);
  135.                                
  136.                                 printf("main222()  task->fd:%d--->tasklist.size():[%d]--->Thread:%d\n",task->fd,tasklist.size(),pthread_self());       
  137.                                 pthread_mutex_unlock(&lock);
  138.                         }
  139.                
  140.                 }
  141.     }
  142.         close(listenfd);
  143.         return 0;
  144. }

  145. Task *create_task(int socket)
  146. {
  147.          Task *task = (Task *)malloc(sizeof(Task));
  148.          task->fd=socket;
  149.          task->op= readflag;
  150.         return task;
  151. }

  152. void *func( void *arg )
  153. {
  154.         char buffer[1024]={0};
  155.         Task task;
  156.         pthread_detach(pthread_self());
  157.         while(1)
  158.         {
  159.                 pthread_mutex_lock(&lock);
  160.                 if(tasklist.size()>0)
  161.                 {
  162.                        
  163.                         task = tasklist.front();
  164.                         tasklist.pop_front();
  165.                         printf("task.fd:%d----->tasklist.size():[%d]--->Thread:%d\n",task.fd,tasklist.size(),pthread_self());
  166.                         exit(0);
  167.                         int xxx=read(task.fd,buffer,sizeof(buffer));
  168.                         if(xxx==0)
  169.                         {
  170.                                 printf("client %d close!\n",task.fd);
  171.                                 FD_CLR(task.fd,&allset);
  172.                                 close(task.fd);
  173.                                 for (int i = 0; i< FD_SETSIZE; i++)
  174.                                 {
  175.                                         if (clientSockFds[i] == task.fd)
  176.                                         {
  177.                                                 clientSockFds[i] = -1;
  178.                                                 break;
  179.                                         }
  180.                                 }
  181.                                
  182.                         }
  183.                         else if(xxx>0)
  184.                         {
  185.                                 printf("fd:%d-->Thread:%d---->receive data:%s\n\n",task.fd,pthread_self(),buffer);
  186.                         }
  187.                         else
  188.                         {
  189.                                 printf("wrong!!\n");
  190.                                 FD_CLR(task.fd,&allset);
  191.                                 close(task.fd);
  192.                                 for (int i = 0; i< FD_SETSIZE; i++)
  193.                                 {
  194.                                         if (clientSockFds[i] == task.fd)
  195.                                         {
  196.                                                 clientSockFds[i] = -1;
  197.                                                 break;
  198.                                         }
  199.                                 }
  200.                                
  201.                         }
  202.                        
  203.                 }
  204.                 pthread_mutex_unlock(&lock);
  205.                 sleep(1);
  206.         }

  207.         return 0;
  208. }

复制代码
客户端:

  1. #include  <unistd.h>
  2. #include  <sys/types.h>       /* basic system data types */
  3. #include  <sys/socket.h>      /* basic socket definitions */
  4. #include  <netinet/in.h>      /* sockaddr_in{} and other Internet defns */
  5. #include  <arpa/inet.h>       /* inet(3) functions */
  6. #include  <stdlib.h>
  7. #include  <errno.h>
  8. #include  <stdio.h>
  9. #include  <string.h>
  10. #include  <fcntl.h>
  11. #include  <sys/select.h>
  12. #include  <pthread.h>
  13. #include  <iostream>
  14. #include  <list>
  15. using namespace std;

  16. int main()
  17. {
  18.         int cfd; /* 文件描述符 */
  19.         int recbytes;
  20.         int sin_size;
  21.         char buffer[1024]={0};   
  22.         struct sockaddr_in s_add,c_add;
  23.         unsigned short portnum=8888;  

  24.         printf("Hello,welcome to client !\r\n");

  25.         cfd = socket(AF_INET, SOCK_STREAM, 0);
  26.         if(-1 == cfd)
  27.         {
  28.                 printf("socket fail ! \r\n");
  29.                 return -1;
  30.         }
  31.         printf("socket ok !\r\n");
  32.        
  33.         bzero(&s_add,sizeof(struct sockaddr_in));
  34.         s_add.sin_family=AF_INET;
  35.         s_add.sin_addr.s_addr= inet_addr("127.0.0.1");
  36.         s_add.sin_port=htons(portnum);

  37.         if(-1 == connect(cfd,(struct sockaddr *)(&s_add), sizeof(struct sockaddr)))
  38.         {
  39.                 printf("connect fail !\r\n");
  40.                 return -1;
  41.         }
  42.         printf("connect ok !\r\n");

  43.         int j=0;
  44.         struct timeval tv;  
  45.         while(1)
  46.         {
  47.                
  48.                 memset(buffer,0,sizeof(buffer));
  49.                 sprintf(buffer,"%d%d",j,j);
  50.                 if(-1 == (recbytes = write(cfd,buffer,strlen(buffer)+1)))
  51.                 {
  52.                         printf("write data fail !\r\n");
  53.                         return -1;
  54.                 }
  55.                 printf("write ok.buffer=[%s]\n",buffer);
  56.                 sleep(1);
  57.                 j++;
  58.                
  59.                 //test select   
  60.          tv.tv_sec = 0;  
  61.          tv.tv_usec = 1000;  
  62.                  int ret = select(0, NULL, NULL, NULL, &tv);  
  63.          if (-1 == ret)  
  64.          {  
  65.             printf("wweeeeeee\n");
  66.          }
  67.        
  68.         }
  69.         close(cfd);
  70.         return 0;
  71. }
复制代码

论坛徽章:
4
双子座
日期:2014-08-28 10:08:002015年辞旧岁徽章
日期:2015-03-03 16:54:152015年迎新春徽章
日期:2015-03-04 09:58:112015年亚洲杯之阿联酋
日期:2015-03-13 03:25:15
发表于 2015-04-02 10:52 |显示全部楼层
你的代码逻辑,应该是来了一个数据包,就建一个task,而不是一个连接建一个task,所以客户端循环发送的时候,自然服务端的tasklist瞬间就上升到1024了

论坛徽章:
0
发表于 2015-04-02 11:31 |显示全部楼层
回复 2# weishuo1999


    懂了,一般来说,你们是一个连接一个task,还是一个数据包一个task? 哪种好点呢?

论坛徽章:
0
发表于 2015-04-02 12:10 |显示全部楼层
为啥我用epoll,用相同得思路写代码,就没select那个问题呢???

  1. #include  <unistd.h>
  2. #include  <sys/types.h>       /* basic system data types */
  3. #include  <sys/socket.h>      /* basic socket definitions */
  4. #include  <netinet/in.h>      /* sockaddr_in{} and other Internet defns */
  5. #include  <arpa/inet.h>       /* inet(3) functions */
  6. #include  <stdlib.h>
  7. #include  <errno.h>
  8. #include  <stdio.h>
  9. #include  <string.h>
  10. #include  <fcntl.h>
  11. #include  <sys/select.h>
  12. #include  <pthread.h>
  13. #include  <iostream>
  14. #include <sys/epoll.h>
  15. #include  <list>
  16. #include <signal.h>
  17. using namespace std;


  18. pthread_mutex_t lock;

  19. enum falgrw{readflag = 0,writeflag};

  20. typedef struct{
  21.         int fd;
  22.         int op;
  23. }Task;

  24. void *func( void *arg );
  25. Task *create_task(int socket);
  26. struct epoll_event ev,events[20];
  27. int epfd=0;

  28. list<Task> tasklist(0);
  29. int  main()
  30. {
  31.        
  32.         int guo=0;
  33.         pthread_t thread[3];
  34.         for(guo=0;guo<3;guo++)
  35.         {
  36.                 pthread_create(&thread[guo],NULL,func,NULL);
  37.         }

  38.         int  listenfd = 0;
  39.         int    connfd = 0;
  40.         int serverPort= 8888;
  41.         int   listenq = 5;
  42.         socklen_t socklen = 0;
  43.         int opt = 1;
  44.         int nready = 0;       
  45.         int maxfd  = 0;       
  46.         int maxi = -1;

  47.         char buf[50]= {0};
  48.         struct sockaddr_in cliaddr, servaddr;
  49.         socklen = sizeof(struct sockaddr_in);

  50.         memset(&servaddr, 0, sizeof(servaddr));
  51.         memset(&cliaddr,  0, sizeof(cliaddr));
  52.        
  53.         servaddr.sin_family = AF_INET;
  54.         servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
  55.         servaddr.sin_port = htons(serverPort);

  56.         listenfd = socket(AF_INET, SOCK_STREAM, 0);
  57.         if (listenfd < 0)
  58.         {
  59.                 perror("socket error");
  60.                 return -1;
  61.         }
  62.         if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, (const char*)&opt, sizeof(opt)) < 0)
  63.         {
  64.                 perror("setsockopt error");   
  65.         }
  66.         if (bind(listenfd, (struct sockaddr *) &servaddr, socklen) < 0)
  67.         {
  68.                 perror("bind error");
  69.                 return -1;
  70.         }
  71.         if (listen(listenfd, listenq) < 0)
  72.         {
  73.                 perror("listen error");   
  74.                 return -1;
  75.         }
  76.        
  77.         //epoll LT
  78.         epfd=epoll_create(256);
  79.         ev.data.fd=listenfd;
  80.         //设置要处理的事件类型
  81.         ev.events=EPOLLIN;
  82.         //注册epoll事件
  83.         epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,&ev);
  84.        
  85.         int nfds=0;
  86.        
  87.        
  88.         printf("echo server startup,listen on port:%d\n", serverPort);
  89.         while(1)  
  90.         {
  91.                
  92.                 struct timeval tv;
  93.                 tv.tv_sec = 1;
  94.                 tv.tv_usec = 0;

  95.                 nfds=epoll_wait(epfd,events,20,500);
  96.                 for(int i=0;i<nfds;++i)
  97.                 {
  98.                         if(events[i].data.fd==listenfd)
  99.                         {                 
  100.                                 connfd = accept(listenfd,(sockaddr *)&cliaddr, &socklen);
  101.                                 if(connfd<0){
  102.                                         perror("connfd<0");
  103.                                         exit(1);
  104.                                 }
  105.                                 //设置用于读操作的文件描述符
  106.                                 ev.data.fd=connfd;
  107.                                 //设置用于注测的读操作事件
  108.                                 ev.events=EPOLLIN|EPOLLET;
  109.                                 //注册ev
  110.                                 epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev);
  111.                         }
  112.                         else if(events[i].events&EPOLLIN)
  113.                         {
  114.                                 printf("You Can Read Data!\n");
  115.                                 int sockfd=0;
  116.                                 if ( (sockfd = events[i].data.fd) < 0)
  117.                                 continue;
  118.                                 pthread_mutex_lock(&lock);
  119.                                 Task *task = create_task(sockfd);
  120.                                 tasklist.push_front(*task);
  121.                                 printf("ADD task;task.fd:%d----->tasklist.size():[%d]--->main()\n",task->fd,tasklist.size());
  122.                                 pthread_mutex_unlock(&lock);
  123.                         }
  124.                         else if(events[i].events&EPOLLOUT)
  125.                         {
  126.                         }                        
  127.                 }
  128.         }
  129.         close(listenfd);
  130.         return 0;
  131. }

  132. Task *create_task(int socket)
  133. {
  134.         Task *task = (Task *)malloc(sizeof(Task));
  135.         task->fd=socket;
  136.         task->op= readflag;
  137.         return task;
  138. }

  139. void *func( void *arg )
  140. {
  141.         char buffer[1024]={0};
  142.         Task task;
  143.         pthread_detach(pthread_self());
  144.         while(1)
  145.         {
  146.                 pthread_mutex_lock(&lock);
  147.                 if(tasklist.size()>0)
  148.                 {
  149.                         memset(buffer,0,1024);
  150.                         task = tasklist.front();
  151.                         //printf("task.fd:%d----->tasklist.size():[%d]--->Thread:%d\n",task.fd,tasklist.size(),pthread_self());
  152.                         tasklist.pop_front();
  153.                         int xxx=read(task.fd,buffer,sizeof(buffer));
  154.                         if(xxx==0)
  155.                         {
  156.                                 printf("client %d close!\n",task.fd);
  157.                                 printf("DEL task;task.fd:%d----->tasklist.size():[%d]--->Thread:%d---->receive data:%s\n\n",task.fd,tasklist.size(),pthread_self(),buffer);
  158.                                 close(task.fd);
  159.                                 epoll_ctl(epfd,EPOLL_CTL_DEL,task.fd,&ev);
  160.                         }
  161.                         else if(xxx>0)
  162.                         {
  163.                                 printf("DEL task;task.fd:%d----->tasklist.size():[%d]--->Thread:%d---->receive data:%s\n\n",task.fd,tasklist.size(),pthread_self(),buffer);
  164.                         }
  165.                         else
  166.                         {
  167.                                 printf("wrong!!\n");
  168.                                 close(task.fd);
  169.                                 epoll_ctl(epfd,EPOLL_CTL_DEL,task.fd,&ev);
  170.                         }
  171.                        
  172.                 }
  173.                 pthread_mutex_unlock(&lock);
  174.                 sleep(1);
  175.         }

  176.         return 0;
  177. }

复制代码
为啥我用epoll,用相同得思路写代码,就没select那个问题呢???

论坛徽章:
7
数据库技术版块每日发帖之星
日期:2015-08-08 06:20:00数据库技术版块每日发帖之星
日期:2015-08-29 06:20:00数据库技术版块每日发帖之星
日期:2015-08-29 06:20:00数据库技术版块每日发帖之星
日期:2015-09-18 06:20:00数据库技术版块每周发帖之星
日期:2015-11-06 19:56:51数据库技术版块每日发帖之星
日期:2016-01-22 06:20:00数据库技术版块每日发帖之星
日期:2016-02-05 06:20:00
发表于 2015-04-02 13:58 |显示全部楼层
是不是因为在 accept 后的 fd ,使用了ET触发?
改成 LT 试试呢?
  1.                                 //设置用于读操作的文件描述符
  2.                                 ev.data.fd=connfd;
  3.                                 //设置用于注测的读操作事件
  4.                                 ev.events=EPOLLIN|EPOLLET;
复制代码

论坛徽章:
0
发表于 2015-04-02 14:20 |显示全部楼层
[code]                                //设置用于读操作的文件描述符

                                ev.data.fd=connfd;

                                //设置用于注测的读操作事件

                                ev.events=EPOLLIN;[/code


改成这样,也没有select那个问题!!
   

论坛徽章:
0
发表于 2015-04-02 14:55 |显示全部楼层
回复 5# asdf2110


    我本来用的就是epoll LT模式。。。

论坛徽章:
0
发表于 2015-04-02 15:29 |显示全部楼层
本帖最后由 guojinshuai 于 2015-04-02 15:33 编辑

回复 5# asdf2110


    看错了,我改改试试。

   epoll改成LT之后,现象跟select一样了!!!!求解释,求解决办法!!!!!

论坛徽章:
324
射手座
日期:2013-08-23 12:04:38射手座
日期:2013-08-23 16:18:12未羊
日期:2013-08-30 14:33:15水瓶座
日期:2013-09-02 16:44:31摩羯座
日期:2013-09-25 09:33:52双子座
日期:2013-09-26 12:21:10金牛座
日期:2013-10-14 09:08:49申猴
日期:2013-10-16 13:09:43子鼠
日期:2013-10-17 23:23:19射手座
日期:2013-10-18 13:00:27金牛座
日期:2013-10-18 15:47:57午马
日期:2013-10-18 21:43:38
发表于 2015-04-02 15:46 |显示全部楼层
一个socket可读,在没有把数据read完之前,一直可读(水平触发),所以你得在放到tasklist之后,需要把该socket从读fd_set中去掉,read完再加入fd_set。

BTW:采用你这样线程分工的很少见。

论坛徽章:
0
发表于 2015-04-02 15:55 |显示全部楼层
回复 9# hellioncu


    恩,原因是这个,按你说的方法解决了下,确实没问题了,非常感谢啊。

   那更好的利用多线程你们一般怎么做的呀,我是初学者。我这样利用多线程效率不高吧?
您需要登录后才可以回帖 登录 | 注册

本版积分规则 发表回复

  

北京盛拓优讯信息技术有限公司. 版权所有 京ICP备16024965号-6 北京市公安局海淀分局网监中心备案编号:11010802020122 niuxiaotong@pcpop.com 17352615567
未成年举报专区
中国互联网协会会员  联系我们:huangweiwei@itpub.net
感谢所有关心和支持过ChinaUnix的朋友们 转载本站内容请注明原作者名及出处

清除 Cookies - ChinaUnix - Archiver - WAP - TOP