免费注册 查看新帖 |

Chinaunix

  平台 论坛 博客 文库
最近访问板块 发新帖
查看: 3804 | 回复: 7
打印 上一主题 下一主题

非阻塞IO 判断就绪状态 [复制链接]

论坛徽章:
0
跳转到指定楼层
1 [收藏(0)] [报告]
发表于 2010-04-26 18:32 |只看该作者 |倒序浏览
非阻塞IO读写,  如何判断轮询fd就绪. 如果不使用select (poll), epoll

论坛徽章:
0
2 [报告]
发表于 2010-04-26 18:39 |只看该作者
不用select epoll你就自己去读下看成功不成功呗!

论坛徽章:
0
3 [报告]
发表于 2010-04-26 19:15 |只看该作者
如果自己轮询检测
如何判断出错和断线呢
使用select时,
如果read  返回0 是断线, <0是出错
如果设置了非阻塞IO 自己轮询的话  , 出错也是这种情况么
感觉自己这样轮询判断似乎不如select的效率高

论坛徽章:
0
4 [报告]
发表于 2010-04-26 21:00 |只看该作者
大家有epoll的实例吗?可以参考学习下吗?网上的例子很不好。

论坛徽章:
1
IT运维版块每日发帖之星
日期:2015-11-17 06:20:00
5 [报告]
发表于 2010-04-27 08:55 |只看该作者
回复 4# loveye2009


    CU上的例子不够吗?

论坛徽章:
0
6 [报告]
发表于 2010-04-27 09:15 |只看该作者
回复 4# loveye2009

自己写的, C++的例子, 简单的http server, 访问出现hello world, 使用epoll
  1. #include<iostream>
  2. #include<string>
  3. #include<map>
  4. #include<sys/types.h>
  5. #include<sys/socket.h>
  6. #include<arpa/inet.h>
  7. #include<unistd.h>
  8. #include<fcntl.h>
  9. #include<signal.h>
  10. #include<sys/epoll.h>
  11. #include<string.h>
  12. #include<error.h>
  13. #include<errno.h>
  14. #include<stdlib.h>
  15. #include<sys/resource.h>
  16. #include<pthread.h>
  17. #include<time.h>

  18. using namespace std;

  19. #define LISTENQ 64
  20. #define MAX_EVENTS 10000
  21. #define BUFSIZE 8192

  22. static unsigned long countacp=0;
  23. static unsigned long countin=0;
  24. static unsigned long countout=0;
  25. static unsigned long counttimeout=0;

  26. struct client_t{
  27.         int fd;
  28.         int stat;
  29.         string ip;
  30.         string read_buf;
  31.         string write_buf;
  32.         time_t last_alive_time;
  33.         int is_timeout;
  34.         client_t(){
  35.                 is_timeout = 0;
  36.                 stat = 0;
  37.         }
  38. };

  39. map<int, struct client_t *> fds;
  40. pthread_mutex_t fds_mutex = PTHREAD_MUTEX_INITIALIZER;

  41. static string int2str(int i){
  42.         char buf[32];
  43.         memset(buf, 0, 32);
  44.         sprintf(buf, "%d", i);
  45.         return string(buf);
  46. }

  47. //initialize server socket
  48. //正常返回fd,否则返回-1
  49. static int
  50. init_servsock()
  51. {
  52.         int fd;
  53.         struct sockaddr_in serveraddr;
  54.         //socket
  55.         if((fd=socket(AF_INET, SOCK_STREAM, 0)) < 0){
  56.                 perror("socket");
  57.                 return -1;
  58.         }

  59.         //可重用
  60.         socklen_t reuse=1;
  61.         if(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse))<0){ //否则程序挂掉马上重启,会报端口不可用
  62.                 perror("setsockopt()");
  63.         }

  64.         serveraddr.sin_family = AF_INET;
  65.         serveraddr.sin_port = htons(4000);
  66.         serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
  67.         bzero(&(serveraddr.sin_zero), 8);
  68.         //bind
  69.         if(bind(fd,(struct sockaddr *)&serveraddr, sizeof(sockaddr)) < 0){
  70.                 perror("bind");
  71.                 return -1;
  72.         }
  73.         //listen
  74.         if(listen(fd, LISTENQ)){
  75.                 perror("listen");
  76.                 return -1;
  77.         }

  78.         return fd;
  79. }

  80. static ssize_t
  81. writen(int fd, const char *vptr, size_t n)
  82. {
  83.         size_t nleft;
  84.         ssize_t nwritten;
  85.         const char *ptr;

  86.         ptr = vptr;
  87.         nleft = n;
  88.         while(nleft > 0){
  89.                 if((nwritten = write(fd, ptr, nleft)) <= 0)
  90.                         return(nwritten); /* error */

  91.                 nleft -= nwritten;
  92.                 ptr += nwritten;
  93.         }
  94.         return(n);
  95. }

  96. static ssize_t
  97. readn(int fd, char *vptr, size_t n)
  98. {
  99.         size_t nleft;
  100.         ssize_t nread;
  101.         char *ptr;

  102.         ptr = vptr;
  103.         nleft = n;
  104.         while(nleft > 0){
  105.                 if((nread = read(fd, ptr, nleft) < 0))
  106.                         return(nread);        /* error, return < 0 */
  107.                 else if(nread ==0 ){
  108.                         break;                /* EOF */
  109.                 }

  110.                 nleft -= nread;
  111.                 ptr += nread;
  112.         }
  113.         return(n - nleft);        /* return >= 0 */
  114. }

  115. static int
  116. set_fl(int fd, int flags)
  117. {
  118.         int val;

  119.         if((val = fcntl(fd,F_GETFL,0)) < 0){
  120.                 printf("fcntl F_GETFL error\n");
  121.                 return -1;
  122.         }

  123.         val |=flags;

  124.         if(fcntl(fd,F_SETFL,val) < 0){
  125.                 printf("fcntl F_SETFL error\n");
  126.                 return -1;
  127.         }

  128.         return 0;
  129. }

  130. //超时检测线程函数
  131. static void *
  132. check_time_out(void * arg){
  133.         int epollfd=*((int *)&arg);
  134.         printf("%d\n",epollfd);
  135.         while(1){
  136.                 map<int,client_t *>::iterator it = fds.begin();
  137.                 time_t tm;
  138.                 time_t run_tm;
  139.                 time(&tm);
  140.                 for(; it!=fds.end(); ++it){
  141.                         if(it->second != NULL){
  142.                                 run_tm = tm - it->second->last_alive_time;
  143.                                 if(run_tm >= 30){
  144.                                         pthread_mutex_lock(&fds_mutex);
  145.                                         printf("fd:%d,timeout\n",it->second->fd);
  146.                                         fflush(NULL);
  147.                                         //counttimeout++;
  148.                                         epoll_ctl(epollfd, EPOLL_CTL_DEL, it->second->fd, NULL);
  149.                                         shutdown(it->second->fd, SHUT_RDWR);
  150.                                         close(it->second->fd);
  151.                                         delete it->second;
  152.                                         it->second = NULL;
  153.                                         pthread_mutex_unlock(&fds_mutex);
  154.                                 }
  155.                         }
  156.                 }
  157.                 sleep(2);
  158.         }
  159. }

  160. int
  161. main()
  162. {
  163.         struct epoll_event ev, events[MAX_EVENTS];
  164.         int listen_sock;
  165.         int epollfd;
  166.         int nfds;
  167.         int err;

  168.         //更改系统打开文件限制
  169.         struct rlimit limit;
  170.         limit.rlim_cur = 8192;//getdtablesize();//=1024
  171.         limit.rlim_max = limit.rlim_cur;
  172.         if(setrlimit(RLIMIT_NOFILE, &limit) < 0){
  173.                 perror("setrlimit");
  174.                 exit(1);
  175.         }
  176.        

  177.         epollfd = epoll_create(512);
  178.         if(epollfd == -1){
  179.                 perror("epoll_create");
  180.                 exit(1);
  181.         }

  182.         //创建超时检测线程
  183.         pthread_t pid;
  184.         err = pthread_create(&pid, NULL, check_time_out, (void *)epollfd);
  185.         if(err !=0 ){
  186.                 printf("创建超时检测线程失败!\n");
  187.                 exit(1);
  188.         }

  189.         if((listen_sock = init_servsock())<0){
  190.                 exit(1);
  191.         }

  192.         //设置为非阻塞
  193.         if(set_fl(listen_sock,O_NONBLOCK) < 0){
  194.                 exit(1);
  195.         }

  196.         //忽略信号
  197.         signal(SIGPIPE,SIG_IGN);

  198.         //daemon
  199.         //daemon(0,0);

  200.         ev.events = EPOLLIN;
  201.         ev.data.fd = listen_sock;
  202.         if(epoll_ctl(epollfd, EPOLL_CTL_ADD, listen_sock, &ev) == -1){
  203.                 perror("epoll_ctl:listen_sock");
  204.                 exit(1);
  205.         }



  206.         while(1){
  207.                 //printf("accept:%u\n",countacp);
  208.                 //printf("in:%u\n", countin);
  209.                 //printf("out:%u,\n", countout);
  210.                 //printf("timeout:%u\n", counttimeout);
  211.                 //printf("\n");
  212.                 //fflush(NULL);
  213.                 nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
  214.                 if(nfds == -1){
  215.                         perror("epoll_wait");
  216.                         exit(1);
  217.                 }

  218.                 int n;
  219.                 for(n = 0; n < nfds; ++n){
  220.                         if(events[n].data.fd == listen_sock){
  221.                                 struct sockaddr_in addr_in;
  222.                                 size_t addrlen = sizeof(struct sockaddr_in);
  223.                                 pthread_mutex_lock(&fds_mutex);
  224.                                 int conn_sock = accept(listen_sock, (struct sockaddr *)&addr_in, &addrlen);
  225.                                 time_t tm=time(NULL);
  226.                                 char tbuf[32];
  227.                                 memset(tbuf, 0, 32);
  228.                                 strftime(tbuf,32,"%Y-%m-%d %H:%M:%S", localtime(&tm));
  229.                                 printf("%s\n", tbuf);
  230.                                 fflush(NULL);
  231.                                 if(conn_sock == -1){
  232.                                         perror("accept");
  233.                                         continue;
  234.                                 }

  235.                                 if(set_fl(conn_sock, O_NONBLOCK) < 0){
  236.                                         close(conn_sock);
  237.                                         continue;
  238.                                 }
  239.                                 ev.events = EPOLLIN | EPOLLET;
  240.                                 ev.data.fd = conn_sock;
  241.                                 if(epoll_ctl(epollfd, EPOLL_CTL_ADD, conn_sock, &ev) == -1){
  242.                                         perror("epoll_ctl:conn_sock");
  243.                                         close(conn_sock);
  244.                                         continue;
  245.                                 }

  246.                                 countacp++;
  247.                                 client_t *client_ptr = new client_t;
  248.                                 fds[conn_sock] = client_ptr;
  249.                                 fds[conn_sock]->fd = conn_sock;
  250.                                 fds[conn_sock]->ip = string(inet_ntoa(addr_in.sin_addr));
  251.                                 fds[conn_sock]->last_alive_time = time(NULL);
  252.                                 pthread_mutex_unlock(&fds_mutex);
  253.                         }else if(events[n].events & EPOLLIN){
  254.                                 int fd = events[n].data.fd;
  255.                                 int n;
  256.                                 char buf[BUFSIZE];
  257.                                 memset(buf, 0, BUFSIZE);

  258.                                 n = readn(fd, buf, BUFSIZE);
  259.                                 pthread_mutex_lock(&fds_mutex);
  260.                                 fds[fd]->read_buf.append(buf,BUFSIZE);
  261.                                 cout<<fds[fd]->read_buf<<endl<<endl;
  262.                                 if(n >= 0){
  263.                                         //if(fds[fd]->read_buf.find("\r\n\r\n") != string::npos){                /* 检测请求头是否结束 */
  264.                                                 //countin++;
  265.                                                 ev.events = EPOLLOUT | EPOLLET;
  266.                                                 ev.data.fd = fd;
  267.                                                 epoll_ctl(epollfd, EPOLL_CTL_MOD ,fd ,&ev);
  268.                                         //}
  269.                                 }else{//出错
  270.                                         if(errno == EAGAIN){
  271.                                                 continue;
  272.                                         }
  273.                                         shutdown(fds[fd]->fd, SHUT_RDWR);
  274.                                         close(fds[fd]->fd);
  275.                                         delete fds[fd];
  276.                                         fds[fd] = NULL;
  277.                                 }
  278.                                 fds[fd]->last_alive_time=time(NULL);
  279.                                
  280.                                 pthread_mutex_unlock(&fds_mutex);
  281.                                
  282.                         }else if(events[n].events & EPOLLOUT){
  283.                                 int fd = events[n].data.fd;
  284.                                 pthread_mutex_lock(&fds_mutex);
  285.                                 string wb = "<html><head><title>hello world</title></head><body>hello world!!!</body></html>";
  286.                                 fds[fd]->write_buf = "HTTP/1.1 200 Ok\r\n";
  287.                                 fds[fd]->write_buf += "Content-Type: text/html\r\n";
  288.                                 fds[fd]->write_buf += "Content-Length: "+int2str(wb.length())+"\r\n";
  289.                                 fds[fd]->write_buf += "\r\n";
  290.                                 fds[fd]->write_buf +=wb;
  291.                                 int n = writen(fd, fds[fd]->write_buf.data(), fds[fd]->write_buf.length());

  292.                                 fds[fd]->last_alive_time=time(NULL);
  293.                                 if(n == fds[fd]->write_buf.length() || n < 0){
  294.                                         epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, NULL);
  295.                                         shutdown(fd,SHUT_RDWR);
  296.                                         close(fd);
  297.                                         delete fds[fd];
  298.                                         fds[fd] = NULL;
  299.                                         //countout++;
  300.                                 }
  301.                                 pthread_mutex_unlock(&fds_mutex);
  302.                         }
  303.                 }
  304.         }


  305.         close(listen_sock);
  306.         close(epollfd);

  307.         return 0;
  308. }
复制代码

论坛徽章:
0
7 [报告]
发表于 2010-04-27 10:34 |只看该作者
回复 6# teng0210


    好的,谢谢你。我想用c写,结合epoll和threadpool来处理打批量的客户连接。先看看你的例子,学学epoll的用法。

论坛徽章:
0
8 [报告]
发表于 2010-04-27 10:37 |只看该作者
回复 5# jiufei19

从哪里找呢?不太会用。:wink:
您需要登录后才可以回帖 登录 | 注册

本版积分规则 发表回复

  

北京盛拓优讯信息技术有限公司. 版权所有 京ICP备16024965号-6 北京市公安局海淀分局网监中心备案编号:11010802020122 niuxiaotong@pcpop.com 17352615567
未成年举报专区
中国互联网协会会员  联系我们:huangweiwei@itpub.net
感谢所有关心和支持过ChinaUnix的朋友们 转载本站内容请注明原作者名及出处

清除 Cookies - ChinaUnix - Archiver - WAP - TOP