免费注册 查看新帖 |

Chinaunix

  平台 论坛 博客 文库
最近访问板块 发新帖
查看: 10913 | 回复: 5
打印 上一主题 下一主题

epoll_wait问题 [复制链接]

论坛徽章:
0
跳转到指定楼层
1 [收藏(0)] [报告]
发表于 2007-06-11 12:33 |只看该作者 |倒序浏览
大家有没有遇到epoll_wait()一次返回的事件里有相同的events,就是events[i].data.fd=events[j].data.fd (i!=j), 我测试的是一秒大概5000个tcp通信

论坛徽章:
0
2 [报告]
发表于 2007-06-11 12:45 |只看该作者
沒有遇到過,檢查一下其他操作會不會造成內存問題吧。

论坛徽章:
0
3 [报告]
发表于 2007-06-11 14:15 |只看该作者
#include <iostream>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
#include <pthread.h>
#include<errno.h>
#define MAXLINE 40
#define OPEN_MAX 100
#define LISTENQ 5000
#define SERV_PORT 8652
#define INFTIM 1000
//线程池任务队列结构体
struct task{
  int fd;            //需要读写的文件描述符
  struct task *next; //下一个任务
};
//用于读写两个的两个方面传递参数
struct user_data{
int fd;
  unsigned int n_size;
  char line[MAXLINE];

};
//线程的任务函数
void * readtask(void *args);
void * writetask(void *args);
//声明epoll_event结构体的变量,ev用于注册事件,数组用于回传要处理的事件
struct epoll_event ev,events[5000];
int epfd;
pthread_mutex_t mutex;
pthread_cond_t cond1;
struct task *readhead=NULL,*readtail=NULL,*writehead=NULL;
void setnonblocking(int sock)
{
     int opts;
     opts=fcntl(sock,F_GETFL);
     if(opts<0)
     {
          perror("fcntl(sock,GETFL)");
          exit(1);
     }
    opts = opts|O_NONBLOCK;
     if(fcntl(sock,F_SETFL,opts)<0)
     {
          perror("fcntl(sock,SETFL,opts)");
          exit(1);
     }   
}
int main()
{
    int i, maxi, listenfd, connfd, sockfd,nfds,res;
     pthread_t tid[10];
     struct task *new_task=NULL;
     struct user_data *rdata=NULL;
     socklen_t clilen;
                struct linger        optval1;
     pthread_mutex_init(&mutex,NULL);
     pthread_cond_init(&cond1,NULL);
     //初始化用于读线程池的线程
                int ii;
                for(ii=0;ii<10;ii++)
     pthread_create(tid+ii,NULL,readtask,NULL);
     
     //生成用于处理accept的epoll专用的文件描述符   
     epfd=epoll_create(6000);
     struct sockaddr_in clientaddr;
     struct sockaddr_in serveraddr;
                clilen=sizeof(sockaddr_in);
     listenfd = socket(AF_INET, SOCK_STREAM, 0);
   optval1.l_onoff = 1;
    optval1.l_linger = 60;
    setsockopt(listenfd, SOL_SOCKET, SO_LINGER, &optval1, sizeof(struct linger));
     //把socket设置为非阻塞方式
     setnonblocking(listenfd);
     //设置与要处理的事件相关的文件描述符
     ev.data.fd=listenfd;
     //设置要处理的事件类型
     ev.events=EPOLLIN|EPOLLET;
     //注册epoll事件
     epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,&ev);
     bzero(&serveraddr, sizeof(serveraddr));
     serveraddr.sin_family = AF_INET;
//    char *local_addr="200.200.200.222";

//    inet_aton(local_addr,&(serveraddr.sin_addr));//htons(SERV_PORT);
                serveraddr.sin_addr.s_addr=htonl(INADDR_ANY);
     serveraddr.sin_port=htons(SERV_PORT);
     if(bind(listenfd,(sockaddr *)&serveraddr, sizeof(serveraddr))==-1){exit(1);}
     if(listen(listenfd, LISTENQ)==-1){exit(1);}  
     maxi = 0;
        int num=0;
        int num1=0;

     for ( ; ; ) {
          //等待epoll事件的发生
                        //fprintf(stderr,"for");
                                int old=0;
          nfds=epoll_wait(epfd,events,6000,-1);
          //处理所发生的所有事件     
        for(i=0;i<nfds;++i)
        {
                //        fprintf(stderr,"nfds %d",nfds);
              if(events.data.fd==listenfd)
               {                              printf("listen=%d\n",events.data.fd);
                    connfd = accept(listenfd,(sockaddr *)(&clientaddr), &clilen);
                    if(connfd<0){
                      perror("connfd<0");
                      exit(1);

                   }            
                   setnonblocking(connfd);                 
              char *str = inet_ntoa(clientaddr.sin_addr);
                    std::cout<<"connec_ from >>"<<str<<"  "<<connfd<<std::endl;

                    //设置用于读操作的文件描述符

                    ev.data.fd=connfd;

                    //设置用于注测的读操作事件
                                                //ev.data.ptr=NULL;
                 ev.events=EPOLLIN|EPOLLET;

                    //注册ev

                 epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev);
                ev.data.fd=listenfd;

                    //设置要处理的事件类型

             ev.events=EPOLLIN|EPOLLET;
       
                     //注册epoll事件
               
             epoll_ctl(epfd,EPOLL_CTL_MOD,listenfd,&ev);
                        continue;

               }
            else if(events.events&EPOLLIN)
            {
                                                num1++;
                 //   fprintf(stderr,"reading! %d\n",num1);               
                    if ( (sockfd = events.data.fd) <= 0) {num1--;continue;}
                                                new_task=NULL;
                   while(new_task==NULL)
                                                         new_task=new task();
                                               
                    new_task->fd=sockfd;
                    new_task->next=NULL;
                                                        //fprintf(stderr,"sockfd %d",sockfd);
                    //添加新的读任务
                    pthread_mutex_lock(&mutex);
                    if(readhead==NULL)
                    {
                      readhead=new_task;
                      readtail=new_task;
                    }   
                    else
                    {   
                    readtail->next=new_task;
                      readtail=new_task;
                    }   
                   //唤醒所有等待cond1条件的线程
                    pthread_cond_broadcast(&cond1);
                    pthread_mutex_unlock(&mutex);  
                                                continue;
              }
              else if(events.events&EPOLLOUT)

               {   
                                //        fprintf(stderr,"EPOLLOUT");
                                         
                                        num++;
              rdata=(struct user_data *)events.data.ptr;
                                       
                 sockfd =rdata->fd;
                                                if(old==sockfd)
                                                {
                                                        fprintf(stderr,"repreted sockfd=%d\n",sockfd);
                                                        //exit(1);
                                                }
                                                old=sockfd;       
                                        //        fprintf(stderr,"write  %d\n",num);
                 int size=write(sockfd, rdata->line, rdata->n_size);
               // fprintf(stderr,"write=%d delete rdata\n",size);
                 fprintf(stderr,"addr=%x fdwrite=%d size=%d\n",rdata,rdata->fd,size);
                                                if(rdata!=NULL)//主要问题导致delete重复相同对象 events返回对象相同
                                                {
                                         delete rdata;
                                                rdata=NULL;
                                       
                                                }
                 //设置用于读操作的文件描述符
           //fprintf(stderr,"after delete rdata\n");
                 ev.data.fd=sockfd;
                                       
                 //设置用于注测的读操作事件

               ev.events=EPOLLIN|EPOLLET;

                 //修改sockfd上要处理的事件为EPOLIN
                                         res=epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);
                                               
               while(res==-1)
                                                        {//fprintf(stderr,"out error");        exit(1);
                                                        }
                                //        fprintf(stderr,"out EPOLLOUT\n");
                continue;

               }
                else if(events.events&(EPOLLHUP|EPOLLERR))
                {
                        //fprintf(stderr,"EPPOLLERR\n");
                        int fd=events.data.fd;
                              if(fd>6000)
                                        {
                                                fd=((struct user_data*)(events.data.ptr))->fd;
                                        }
                 //设置用于注测的读操作事件
                                               ev.data.fd=fd;
               ev.events=EPOLLIN|EPOLLET|EPOLLOUT;

                 //修改sockfd上要处理的事件为EPOLIN

               epoll_ctl(epfd,EPOLL_CTL_DEL,fd,&ev);
                }   
                        
          }        
  }
}

void * readtask(void *args)

{
   int fd=-1;
    int n;
   //用于把读出来的数据传递出去
   struct user_data *rdata = NULL;
   while(1){  
        pthread_mutex_lock(&mutex);
        //等待到任务队列不为空
        while(readhead==NULL)
             pthread_cond_wait(&cond1,&mutex);        
        fd=readhead->fd;
        //从任务队列取出一个读任务
                       
        struct task *tmp=readhead;
        readhead = readhead->next;
                        //fprintf(stderr,"delete tmp\n");
        delete tmp;
                        tmp=NULL;
        pthread_mutex_unlock(&mutex);
                        rdata=NULL;
                        while(rdata==NULL)
        rdata = new user_data();
                //        fprintf(stderr,"data=%x fd=%d",rdata,fd);
                        //sleep(60);
                       
        rdata->fd=fd;
                       
        if ( (n = read(fd, rdata->line, MAXLINE)) < 0) {                               
           if (errno == ECONNRESET) {
                        //         fprintf(stderr,"ECONNRESET!\n");
                                ev.events=EPOLLOUT|EPOLLET;
                         ev.data.fd=fd;
                                //修改sockfd上要处理的事件为EPOLLOUT
                                int res;
                                res=epoll_ctl(epfd,EPOLL_CTL_DEL,fd,&ev);
             close(fd);         
          } else
           //  printf("readline error");
           if(rdata!=NULL){delete rdata;rdata=NULL;}
                       
        } else if (n == 0) {
                                 ev.events=EPOLLOUT|EPOLLET;
                         ev.data.fd=fd;
                                //修改sockfd上要处理的事件为EPOLLOUT
                                int res;
                                res=epoll_ctl(epfd,EPOLL_CTL_DEL,fd,&ev);
            close(fd);
                                //fprintf(stderr,"Client close connect!\n");
                               
           if(rdata!=NULL){delete rdata;rdata=NULL;}
           while(res==-1)
                                                        {        //fprintf(stderr,"in ctl error");
                                                                exit(1);
                                                        };
           
        } else{

              // std::cout<<"   n= "<<n<<std::endl;
       //  std::cout<<"read  "<<data->line<<std::endl;
                //        std::cout.flush();
        rdata->n_size=n;
            
        //设置需要传递出去的数据

      
       //  ev.data.fd=fd;
        //设置用于注测的写操作事件
                        fprintf(stderr,"fdread=%d\n",fd);
        ev.events=EPOLLOUT|EPOLLET;
         ev.data.ptr=rdata;
        //修改sockfd上要处理的事件为EPOLLOUT
                        int res;
                        res=epoll_ctl(epfd,EPOLL_CTL_MOD,fd,&ev);
        while(res==-1)
                                                        {        //fprintf(stderr,"in ctl error");
                                                                exit(1);
                                                        };

       }

   }

}
我把代码贴出来,大家帮我看看,调了几天了,本来就没什么多线程经验

论坛徽章:
0
4 [报告]
发表于 2007-06-12 15:33 |只看该作者
只粗略看了看,既然用了多线程,应该用mutex防止并行操作的。

man pthread_mutex_init

论坛徽章:
0
5 [报告]
发表于 2007-06-12 16:20 |只看该作者
同一次epoll_wait调用fd不会重复,manpage里说的

论坛徽章:
0
6 [报告]
发表于 2007-06-12 19:50 |只看该作者
to4楼 我用了线程锁啊,没看见
to5楼 我也知道,但是程序就是出现了这种情况所以我才问一下,我也感觉很奇怪。调了几天了,找不出原因
您需要登录后才可以回帖 登录 | 注册

本版积分规则 发表回复

  

北京盛拓优讯信息技术有限公司. 版权所有 京ICP备16024965号-6 北京市公安局海淀分局网监中心备案编号:11010802020122 niuxiaotong@pcpop.com 17352615567
未成年举报专区
中国互联网协会会员  联系我们:huangweiwei@itpub.net
感谢所有关心和支持过ChinaUnix的朋友们 转载本站内容请注明原作者名及出处

清除 Cookies - ChinaUnix - Archiver - WAP - TOP