Chinaunix

标题: epoll的几个操作函数是线程安全吗? 表示怀疑~ [打印本页]

作者: ydfgic    时间: 2011-07-20 14:32
标题: epoll的几个操作函数是线程安全吗? 表示怀疑~
因为有用过libevent,libevnet里网络io都是单线程里处理。
今天看到一篇《
epoll加线程池》的文章,看到它在线程池里处理完放送消息后,立即调用
ev.events=EPOLLOUT|EPOLLET;
epoll_ctl(epfd,EPOLL_CTL_MOD,fd,&ev);

难道这里的epoll_ctl是线程安全的?
如果是的话,那用epoll多线程编程就很方便了, libevent这个问题就很麻烦处理,要用管道解决。
作者: liexusong    时间: 2011-07-20 14:35
谁说libevent都是单线程的,memcached就是多线程的。另外epoll应该不是线程安全的,你要么加锁,要么每个线程控制一个epoll
作者: yulihua49    时间: 2011-07-20 14:38
本帖最后由 yulihua49 于 2011-07-20 14:41 编辑
因为有用过libevent,libevnet里网络io都是单线程里处理。
今天看到一篇《
epoll加线程池》的文章,看到它 ...
ydfgic 发表于 2011-07-20 14:32



    线程安全的!!!!
惊群是良性的,任何fd都不会分配给多个线程。
可以在不同线程 epoll_ctl和epoll_wait.
因此通常不需要L/F模型。
作者: ydfgic    时间: 2011-07-20 15:19
谁说libevent都是单线程的,memcached就是多线程的。另外epoll应该不是线程安全的,你要么加锁,要么每个线 ...
liexusong 发表于 2011-07-20 14:35


memcached多线程应该是自己做了一些同步机制吧,libevent是线程不安全的,反正。
你和3楼的回答矛盾了啊~ 他说是安全的~
作者: hellioncu    时间: 2011-07-20 15:29
man里面说epoll的几个函数是线程安全的
作者: ydfgic    时间: 2011-07-20 15:39
线程安全的!!!!
惊群是良性的,任何fd都不会分配给多个线程。
可以在不同线程 epoll_ctl和 ...
yulihua49 发表于 2011-07-20 14:38


谢谢~ 原来epoll_wait也是线程安全的,这样看来epoll真的做的很不错,对开发者非常有好,我会尽量用epoll取代libevent来编程。
另外,这么一来 LF模型就很好实现了,就一个线程池每个线程来 wait --》 handle,连一般的锁都省去了(虽然有群惊现象)。
但我习惯了半同步/半异步的模型,认为将连接层和接收层还有 逻辑处理线程池 分开比较清晰。 知道了epoll_ctrl线程安全,那用多线程处理就方便许多了,以前还要用
pipe传回到接收线程去,真的有点复杂
作者: ydfgic    时间: 2011-07-20 15:41
man里面说epoll的几个函数是线程安全的
hellioncu 发表于 2011-07-20 15:29


说来惭愧,没有好好看man,看到ET 和 LT 就没看下去了
作者: liexusong    时间: 2011-07-20 16:02
memcached多线程应该是自己做了一些同步机制吧,libevent是线程不安全的,反正。
你和3楼的回答矛盾了 ...
ydfgic 发表于 2011-07-20 15:19



    我也没有man看过,不过线程安全这方面不一定要系统提供,可以自己去实现的
作者: yulihua49    时间: 2011-07-20 16:22
本帖最后由 yulihua49 于 2011-07-20 16:35 编辑
谢谢~ 原来epoll_wait也是线程安全的,这样看来epoll真的做的很不错,对开发者非常有好,我会尽量用ep ...
ydfgic 发表于 2011-07-20 15:39


给你看看我的任务调度器,很短小,很精彩。
  1. static void *thread_work(void *param)
  2. {
  3. resource *rs=(resource *)param;//线程池指针
  4. int ret,fds;
  5. TCB *task;
  6. struct epoll_event event;

  7.         rs->tid=pthread_self();
  8.         while(1) {
  9. //从就绪队列取一个任务
  10.                 pthread_mutex_lock(&rpool.mut);//非IO事件队列,例如取得连接池后的任务要回到这里等待执行
  11.                 while(!(task=rdy_get())) {//这是一个L/F模型,tpool.rdy_num通常=1,1个Leader。多了无益。
  12.                         if(rpool.flg >= tpool.rdy_num) break;
  13.                         rpool.flg++;
  14.                         ret=pthread_cond_wait(&rpool.cond,&rpool.mut); //没有任务,等待
  15.                         rpool.flg--;
  16.                 }
  17.                 pthread_mutex_unlock(&rpool.mut);
  18.                 if(!task) {//所有没有成为leader的线程都作为follower,全在epoll_wait上竞争。
  19.                         fds = epoll_wait(g_epoll_fd, &event, 1 , -1);
  20.                         if(fds < 0){
  21.                                 ShowLog(1,"%s:epoll_wait err=%d,%s",__FUNCTION__,errno,strerror(errno));
  22.                                 sleep(30);
  23.                                 continue;
  24.                         }
  25.                         task = (TCB *)event.data.ptr;
  26.                         task->events=event.events;
  27. //检查事件是否健康
  28.                         if(!task->call_back && !(event.events&EPOLLIN)) {
  29.                                 do_epoll(task,EPOLL_CTL_DEL);
  30.                                 client_del(task);
  31.                                 continue;
  32.                         }
  33.                 }
  34. //              rs->timestamp=
  35.                 task->timestamp=now_usec();
  36.                 ret=do_work(task);//不论收、发、出错都有统一的工作程序处理。
  37.         }
  38.         ShowLog(1,"%s:tid=%lu canceled",__FUNCTION__,pthread_self());
  39.         mthr_showid_del(rs->tid);
  40.         rs->timestamp=now_usec();
  41.         rs->status=0;
  42.         rs->tid=0;
  43.         return NULL;
  44. }
复制代码

作者: ydfgic    时间: 2011-07-20 17:57
回复 9# yulihua49

确实这段代码很短小精悍

有几个地方小弟分析一下看对不对:
我看到一个 介绍 LF的文章,大致意思是它只有一个 Leader去处理网络请求,然后自己编程 proccesor 处理请求,其余flower会有一个成为leader接替它。
而你的实现里,意思则是相反,一个线程在等待任务队列(tpool.rdy_num= 1),其余线程池里的线程会等待在epoll_wait上,然后会被唤醒处理一个分配的相应事件,
根据这个事件产生task,然后处理这个task。
那么我觉得有两点不合理的地方,
1.如果相应事件过多,超出了wait线程数量,这时候每个线程都会去处理一个task,而这个task的处理事件过长,那么就没有线程在 epoll上等待,会造成网络处理的低效(比如连接拒绝)
2.只有一个线程在等待任务队列,如果同时到达几个任务的时候,就没有及时的线程相应去处理(当然,那些丢任务的线程会去取队列front端的任务,但是万一不及时呢?)
所以我觉得这个模型,潜在的让worker影响到了网络处理。

我想至少要有个线程保证在处理网络输入,而不是自己去做事,应该丢到 任务队列里,排队去。或者靠优先级,让高优先级的直接处理,长时间的丢队列。
其实这样我觉得和HS/HA半同步/半异步模式 差不多了,这个可能更高效点,在线程资源的使用上。
作者: yulihua49    时间: 2011-07-22 11:23
回复  yulihua49

1.如果相应事件过多,超出了wait线程数量,这时候每个线程都会去处理一个task,而这个task的处理事件过长,那么就没有线程在 epoll上等待,会造成网络处理的低效(比如连接拒绝)
ydfgic 发表于 2011-07-20 17:57


accept不在这里(在主线程里),所以无线程可用,不影响连接的接入。接入后的fd在epoll队列里等着。
作者: yulihua49    时间: 2011-07-22 11:25
本帖最后由 yulihua49 于 2011-07-22 11:56 编辑
回复  yulihua49

只有一个线程在等待任务队列,如果同时到达几个任务的时候,就没有及时的线程相应去处理(当然,那些丢任务的线程会去取队列front端的任务,但是万一不及时呢?)
ydfgic 发表于 2011-07-20 17:57

经过大压力测试,不会发生这种情况。
rdy队列平时是空的,我们在此浪费了一个线程。
当有客户端请求时,一定会有一个follower先激活,激活后没有线程转换,直接进入工作。所以效率很高。
在工作中,如果需要连接池,他会直接申请,如果申请成功,直接工作,仍然没有线程切换。就是,rdy队列仍然无事可做。
只有申请失败,把任务排到相应的wait队列由另外的线程专门负责等待连接池。工作线程释放,回到follower。
weit线程得到连接后,交给该任务,把它排到rdy队列。这就激活了leader.
这都是连接池耗尽才有的动作。此时一定会有大量work线程在活动。活动结束的线程通常会在第一时间抓取rdy,甚至连leader都不做,直接进入下一个work。那些得到连接池的任务,万事俱备,以最高的优先级迅速得到服务,以便尽快释放连接池。
测试表明这是一个效率极高的方案。
一个交易转发器,
在16核,20个线程。每秒转发,62200个交易。CPU使用率高达1485%。负载是均衡的。最忙线程是不断变换的,所有线程都繁忙,并没有当leader上瘾不干活的。

同时到达上万的任务,该在哪等就在哪,总会得到服务,不会饿死的。

epoll_wait是竞争安全的,所以没必要L/F。
作者: yulihua49    时间: 2011-07-22 12:02
本帖最后由 yulihua49 于 2011-07-22 12:06 编辑
回复  yulihua49

我想至少要有个线程保证在处理网络输入,而不是自己去做事,应该丢到 任务队列里,排队去。或者靠优先级,让高优先级的直接处理,长时间的丢队列。
其实这样我觉得和HS/HA半同步/半异步模式 差不多了,这个可能更高效点,在线程资源的使用上。ydfgic 发表于 2011-07-20 17:57

的确如此。看看主线程片断:
  1.         while(1) {
  2.                 do {
  3.                         FD_ZERO(&efds);
  4.                         FD_SET(sock, &efds);
  5. //健康检查周期
  6.                         tm.tv_sec=30;
  7.                         tm.tv_usec=0;
  8.                         ret=select(sock+1,&efds,NULL,&efds,&tm);
  9.                         if(ret==-1) {
  10.                                 ShowLog(1,"select error %s",strerror(errno));
  11.                                 close(sock);
  12.                                 quit(3);
  13.                         }
  14.                         if(ret==0) {
  15.                                 check_TCB_timeout();
  16.                                 if(poolchk) poolchk();
  17.                         }
  18.                 } while(ret<=0);
  19.                 i=event_no();
  20.                 s=accept(sock,(struct sockaddr *)&cin,&leng);
  21.                 if(s<0) {
  22.                         ShowLog(1,"%s:accept err=%d,%s",__FUNCTION__,errno,strerror(errno));
  23.                         switch(errno) {
  24.                         case EMFILE:    //fd用完了,其他线程还要继续工作,主线程休息一下。
  25.                         case ENFILE:
  26.                                 sleep(30);
  27.                                 continue;
  28.                         default:break;
  29.                         }
  30.                         sleep(15);
  31.                         if(++repeat < 20) continue;
  32.                         ShowLog(1,"%s:network fail! err=%s",__FUNCTION__,strerror(errno));
  33.                         close(sock);
  34.                         quit(5);
  35.                 }
  36.                 repeat=0;
  37.                 task=&client_q.pool[i];
  38.                 client_q.pool[i].fd=s;
  39.                 task->conn.Socket=s;
  40.                 task->conn.timeout=120;
  41.                 task->status=-1;
  42.                 task->conn.only_do=(int (*)())conn_init;
  43.                 ret=do_epoll(task,EPOLL_CTL_ADD);//任务交到epoll队列。
  44.         }
复制代码

作者: dreamyseason    时间: 2011-08-28 10:00
本帖最后由 dreamyseason 于 2011-08-28 10:02 编辑

主线程压入连接任务,工作线程取出任务同时处理网络事件,难道没有压入非连接任务的线程?这样的话相当于没有普通(非连接请求)数据包的缓冲,任务队列的作用大打折扣。
作者: wwjxjtu    时间: 2011-08-28 14:28

作者: yulihua49    时间: 2011-08-29 13:58
主线程压入连接任务,工作线程取出任务同时处理网络事件,难道没有压入非连接任务的线程?这样的话相当于没 ...
dreamyseason 发表于 2011-08-28 10:00



    没听明白
作者: dreamyseason    时间: 2011-08-29 16:46
从你提供的代码来看,只向任务队列里压入新连接的任务,而没有压入其它监听到的事件,比如收到一个业务数据包。
作者: yulihua49    时间: 2011-08-30 16:28
本帖最后由 yulihua49 于 2011-08-30 16:53 编辑
从你提供的代码来看,只向任务队列里压入新连接的任务,而没有压入其它监听到的事件,比如收到一个业务数据 ...
dreamyseason 发表于 2011-08-29 16:46



   哦,压入了。在do_work()里,没贴出来。
它是在完成或夭折一个任务后,判决,如何ctl,EPOLL_CTL_MOD,EPOLL_CTL_DEL,也许会弄另一个相关的fd。这完全取决于处理逻辑,没有通用性,所以没贴出来。

我前边两段是通用的,你自己稍改一下就行。do_work()你自己写,干啥都行,最后得给epoll一个交待。

我这里是这样的:
  1. //工作线程 LT方式
  2. static int do_work(TCB *task)
  3. {
  4. int ret,cc;
  5. T_Connect *conn;
  6. void (*init)(T_Connect *,T_NetHead *);
  7. T_SRV_Var *ctx=&task->sv;
  8. 。。。。。。。。。。。。。。。。。。
  9. //            ret=do_some_thing(,,,);
  10.           switch(ret) {
  11.                 case -1:
  12.                         cc=do_epoll(task,EPOLL_CTL_DEL);
  13.                         client_del(task);
  14.                         ShowLog(2,"%s: disconnect by server",__FUNCTION__);
  15.                 case -5://epoll人家都处理好了,不用你管。
  16.                         break;
  17.                 default:
  18.                         cc=do_epoll(task,EPOLL_CTL_MOD);
  19.                         if(cc && errno != EEXIST) {
  20.                                 ShowLog(1,"%s:cancel by server",__FUNCTION__);
  21.                                 client_del(task);
  22.                         }
  23.                         break;
  24.                 }
  25.                 mthr_showid_del(ctx->tid);
  26.                 return 0;
  27. }
复制代码
都是成熟的,非常可靠的框架啦,应用中的。
作者: dreamyseason    时间: 2011-08-31 12:45
epoll一定加了EPOLLONESHOT标志了吧,否则处理任务的线程就会冲突了。
      另外可以考虑用一个线程监听并压入任务队列,另外几个工作线程直接处理对时序没有要求的客户请求而不压入队列,如果这样的操作很多的话应该是合算的。
作者: yulihua49    时间: 2011-08-31 13:58
epoll一定加了EPOLLONESHOT标志了吧,否则处理任务的线程就会冲突了。
     
dreamyseason 发表于 2011-08-31 12:45



    完全正确!do_epoll()里干的。
作者: yulihua49    时间: 2011-08-31 14:03
本帖最后由 yulihua49 于 2011-08-31 14:06 编辑
另外可以考虑用一个线程监听并压入任务队列,dreamyseason 发表于 2011-08-31 12:45



    原来是。但性能低,中间要切换一次线程。后来改的现在这个样子,程序简单了,效率还提高了不少。
一个人发任务,不如大家抢任务,谁抢到是谁的。
作者: dreamyseason    时间: 2011-09-01 10:13
又想到一个问题,貌似直到压队列后都没有执行recv读取操作,数据积压在内核缓冲里,网络繁忙时可能造成客户端发送失败,如果将把取的数据压入队列对客户端更友好些,读取只是一次内存拷贝(好像epoll无法关闭内核缓冲,因为在wait前无法向内核提供自己的缓冲区)应该很快。
作者: yulihua49    时间: 2011-09-01 11:36
本帖最后由 yulihua49 于 2011-09-01 11:41 编辑
又想到一个问题,貌似直到压队列后都没有执行recv读取操作,数据积压在内核缓冲里,网络繁忙时可能造成客户 ...
dreamyseason 发表于 2011-09-01 10:13



    在do_work()里处理,读或写。
所有的所有,都在do_work()里处理。
所有的数据,fd,events,callback,status。。。。。都在task里。其内容你可以自由定义。
task与events的关联,由do_epoll()处理。
作者: 12f3210    时间: 2012-05-15 17:27
回复 23# yulihua49


    你好,我目前的网络库,感觉效率还是不算很高,对于accept的处理,就如前面的朋友所言,单独一个线程,然后压入队列。多线程去获取。然后加入到自己线程内的epoll fd。

每个线程循环处理accept压入队列,epoll_wait。每个线程管理自己的fds。相互独立。

请问能把你的代码发送一份给我么?我想作为学习和参考,谢谢。

12f3210@163.com
作者: JohnBull    时间: 2012-05-15 21:42
系统调用都是线程安全的!
常识!

作者: chenzhanyiczy    时间: 2012-05-15 22:40
回复 25# JohnBull


    who say!

如何解析内核中有许多加锁操作?
作者: JohnBull    时间: 2012-05-15 22:43
chenzhanyiczy 发表于 2012-05-15 22:40
回复 25# JohnBull


对呀!就是因为内核加过锁了!有什么可解释的?
作者: JohnBull    时间: 2012-05-15 22:48
回复 26# chenzhanyiczy


而且说线程安全还是客气了的,应该说可重入
作者: chenzhanyiczy    时间: 2012-05-15 23:16
回复 28# JohnBull


    我理解你的意思了。单从调用的角度出发,是线程安全。但从结果的角度来看,就不是了线程安全。

    比如:A 线程write("aaa")  -> 1.文件 ,同时B线程也write("bbb") -> 1.文件

那么最终的结果有可能是: aaa

从这结果看,就不是线程安全了
作者: chenzhanyiczy    时间: 2012-05-15 23:19
回复 28# JohnBull


    可重入就更不对了
作者: JohnBull    时间: 2012-05-16 00:01
1+1在算错了的情况下等于3...

懒得解释了,爱咋想咋想去吧
作者: helpstudy    时间: 2013-05-13 00:12
很好,学习了!
作者: wonghoifung    时间: 2013-05-13 09:30
侬本多疑。。。
作者: jiangwt888    时间: 2014-12-09 16:03
不错的程序猴网站!!! 靓彩的贴子!!!不错的楼主!!!才华横溢的YILIHUA49!
dear Mr.YILIHUA49:
代码发给我学习下可以吗。这里的帖子讲的:http://bbs.chinaunix.net/thread-3568296-1-1.html
作者: jiangwt888    时间: 2014-12-09 16:03
我qq:9286999
作者: yulihua49    时间: 2014-12-10 10:47
本帖最后由 yulihua49 于 2014-12-10 10:51 编辑
jiangwt888 发表于 2014-12-09 16:03
不错的程序猴网站!!! 靓彩的贴子!!!不错的楼主!!!才华横溢的YILIHUA49!
dear Mr.YILIHUA49:
代 ...

就是这个贴啊,进SDBC群,下载源码。见scsrv/tpool.c,epoll封装在这里,线程池模型。
SDBC是一个交易中间件,支持多种连接模型,包括线程池模型。
线程池模型采用epoll,内置的L/F模型。




欢迎光临 Chinaunix (http://bbs.chinaunix.net/) Powered by Discuz! X3.2