免费注册 查看新帖 |

Chinaunix

  平台 论坛 博客 文库
最近访问板块 发新帖
楼主: __BlueGuy_
打印 上一主题 下一主题

[C] epoll + 非阻塞I/O 怎么改进才能变成 多线程 + epoll + 非阻塞I/O [复制链接]

论坛徽章:
44
15-16赛季CBA联赛之浙江
日期:2021-10-11 02:03:59程序设计版块每日发帖之星
日期:2016-07-02 06:20:0015-16赛季CBA联赛之新疆
日期:2016-04-25 10:55:452016科比退役纪念章
日期:2016-04-23 00:51:2315-16赛季CBA联赛之山东
日期:2016-04-17 12:00:2815-16赛季CBA联赛之福建
日期:2016-04-12 15:21:2915-16赛季CBA联赛之辽宁
日期:2016-03-24 21:38:2715-16赛季CBA联赛之福建
日期:2016-03-18 12:13:4015-16赛季CBA联赛之佛山
日期:2016-02-05 00:55:2015-16赛季CBA联赛之佛山
日期:2016-02-04 21:11:3615-16赛季CBA联赛之天津
日期:2016-11-02 00:33:1215-16赛季CBA联赛之浙江
日期:2017-01-13 01:31:49
51 [报告]
发表于 2014-12-12 23:48 |只看该作者
回复 50# yulihua49

不都是框架回调么?为什么要调用框架?

论坛徽章:
15
射手座
日期:2014-11-29 19:22:4915-16赛季CBA联赛之青岛
日期:2017-11-17 13:20:09黑曼巴
日期:2017-07-13 19:13:4715-16赛季CBA联赛之四川
日期:2017-02-07 21:08:572015年亚冠纪念徽章
日期:2015-11-06 12:31:58每日论坛发贴之星
日期:2015-08-04 06:20:00程序设计版块每日发帖之星
日期:2015-08-04 06:20:00程序设计版块每日发帖之星
日期:2015-07-12 22:20:002015亚冠之浦和红钻
日期:2015-07-08 10:10:132015亚冠之大阪钢巴
日期:2015-06-29 11:21:122015亚冠之广州恒大
日期:2015-05-22 21:55:412015年亚洲杯之伊朗
日期:2015-04-10 16:28:25
52 [报告]
发表于 2014-12-13 20:51 |只看该作者
windoze 发表于 2014-12-12 23:48
回复 50# yulihua49

不都是框架回调么?为什么要调用框架?

正常是框架调用应用插件,但插件内也调用框架。
了解TUXEDO的可以知道。
我想异步化的,就是socket这一块。这是个公用程序,客户端、服务器都用,服务器还有PPC,TPC,TPOOL模式。
不是所有模式都支持这个机制,我需要一个兼容的方案。所以没办法用其他框架工具。

论坛徽章:
15
射手座
日期:2014-11-29 19:22:4915-16赛季CBA联赛之青岛
日期:2017-11-17 13:20:09黑曼巴
日期:2017-07-13 19:13:4715-16赛季CBA联赛之四川
日期:2017-02-07 21:08:572015年亚冠纪念徽章
日期:2015-11-06 12:31:58每日论坛发贴之星
日期:2015-08-04 06:20:00程序设计版块每日发帖之星
日期:2015-08-04 06:20:00程序设计版块每日发帖之星
日期:2015-07-12 22:20:002015亚冠之浦和红钻
日期:2015-07-08 10:10:132015亚冠之大阪钢巴
日期:2015-06-29 11:21:122015亚冠之广州恒大
日期:2015-05-22 21:55:412015年亚洲杯之伊朗
日期:2015-04-10 16:28:25
53 [报告]
发表于 2014-12-15 15:37 |只看该作者
windoze 发表于 2014-12-12 23:48
回复 50# yulihua49

不都是框架回调么?为什么要调用框架?

请问,每一个fiber都有自己的栈,是吧?
那么与TPC的区别,除了线程少一些,栈空间一点不少。

论坛徽章:
44
15-16赛季CBA联赛之浙江
日期:2021-10-11 02:03:59程序设计版块每日发帖之星
日期:2016-07-02 06:20:0015-16赛季CBA联赛之新疆
日期:2016-04-25 10:55:452016科比退役纪念章
日期:2016-04-23 00:51:2315-16赛季CBA联赛之山东
日期:2016-04-17 12:00:2815-16赛季CBA联赛之福建
日期:2016-04-12 15:21:2915-16赛季CBA联赛之辽宁
日期:2016-03-24 21:38:2715-16赛季CBA联赛之福建
日期:2016-03-18 12:13:4015-16赛季CBA联赛之佛山
日期:2016-02-05 00:55:2015-16赛季CBA联赛之佛山
日期:2016-02-04 21:11:3615-16赛季CBA联赛之天津
日期:2016-11-02 00:33:1215-16赛季CBA联赛之浙江
日期:2017-01-13 01:31:49
54 [报告]
发表于 2014-12-15 15:53 |只看该作者
回复 53# yulihua49

没错,fiber本来就省不下这些内存。
就我的观点这个其实也不能算是缺点,不管你用什么模型,每个客户session总是要存在什么地方的,不放在stack里也要想办法搞个context之类的东西把它存起来,这样做也许能省点内存,因为fiber的stack至少有一个page,大多数系统上是4k,实际的context也许没有这么大,但负面影响也是有的,你访问context内容的时候免不了要经过一个指针间接一下,而放在stack里就不用了,速度会比较快。
省stack内存的方案也是有的,gcc/clang支持split stack,可以按需分配stack内存,这样可以省不少地方,代价是性能会有点损失。

论坛徽章:
44
15-16赛季CBA联赛之浙江
日期:2021-10-11 02:03:59程序设计版块每日发帖之星
日期:2016-07-02 06:20:0015-16赛季CBA联赛之新疆
日期:2016-04-25 10:55:452016科比退役纪念章
日期:2016-04-23 00:51:2315-16赛季CBA联赛之山东
日期:2016-04-17 12:00:2815-16赛季CBA联赛之福建
日期:2016-04-12 15:21:2915-16赛季CBA联赛之辽宁
日期:2016-03-24 21:38:2715-16赛季CBA联赛之福建
日期:2016-03-18 12:13:4015-16赛季CBA联赛之佛山
日期:2016-02-05 00:55:2015-16赛季CBA联赛之佛山
日期:2016-02-04 21:11:3615-16赛季CBA联赛之天津
日期:2016-11-02 00:33:1215-16赛季CBA联赛之浙江
日期:2017-01-13 01:31:49
55 [报告]
发表于 2014-12-15 16:09 |只看该作者
回复 52# yulihua49

fiber可以随意创建thread(所谓的foreign thread),foreign thread也可以随意创建fibio::scheduler并在其中运行一组fiber;
foreign thread可以用fibio::condition_variable通知fiber,fiber也可以用std::condition_variable/pthread_cond通知foreign thread。
fiberized.io中的*_mutex和condition_variable都可以在任意context里直接创建,不需要在fiber中创建(但需要在fiber中操作,除了cv的notify_one/notify_all),你可以直接定义全局变量;
fibio::concurrent_queue可以用来从外界向fiber中传数据,basic_concurrent_queue<T, std::mutex, std::condition_variable>可以用来从fiber向外界传数据。

特意这么设计的,这样fiberized.io会比较容易和其它库/框架合作。

论坛徽章:
15
射手座
日期:2014-11-29 19:22:4915-16赛季CBA联赛之青岛
日期:2017-11-17 13:20:09黑曼巴
日期:2017-07-13 19:13:4715-16赛季CBA联赛之四川
日期:2017-02-07 21:08:572015年亚冠纪念徽章
日期:2015-11-06 12:31:58每日论坛发贴之星
日期:2015-08-04 06:20:00程序设计版块每日发帖之星
日期:2015-08-04 06:20:00程序设计版块每日发帖之星
日期:2015-07-12 22:20:002015亚冠之浦和红钻
日期:2015-07-08 10:10:132015亚冠之大阪钢巴
日期:2015-06-29 11:21:122015亚冠之广州恒大
日期:2015-05-22 21:55:412015年亚洲杯之伊朗
日期:2015-04-10 16:28:25
56 [报告]
发表于 2014-12-15 16:28 |只看该作者
本帖最后由 yulihua49 于 2014-12-15 16:51 编辑
windoze 发表于 2014-12-15 15:53
回复 53# yulihua49

没错,fiber本来就省不下这些内存。

我的设想如下:
只有IO完不成的时候才产生fiber,完成后销毁,SendNet是透明的,异步不异步看环境,就是能找到ucontext就做异步。
  1. int SendNet(int socket,char *buf,int n,int MTU,int TCB_no)
  2. {
  3. int bcount,br;
  4. int sz,i=0;
  5. int fflag;
  6. ucontext_t *uc=get_uc(TCB_no);
  7. unsigned long begin_stack,save_stack_size=0;

  8.         if(socket<0) return SYSERR;
  9.         fflag=fcntl(socket,F_GETFL,0);
  10.         if(uc) {
  11. //计算所需的栈帧
  12.                 swapcontext(uc,uc);
  13. #if __WORDSIZE == 64
  14.                 begin_stack=uc->uc_link->uc_mcontext.gregs[REG_RSP];
  15.                 save_stack_size=(uc->uc_mcontext.gregs[REG_RSP]-begin_stack);
  16. #else
  17.                 if(uc->uc_mcontext.gregs[REG_SS] == uc->uc_link->uc_mcontext.gregs[REG_SS]) {
  18.                   begin_stack=uc->uc_link->uc_mcontext.gregs[REG_ESP];
  19.                   save_stack_size=(uc->uc_mcontext.gregs[REG_ESP]-begin_stack);
  20.                 }
  21. #endif
  22.                 if(save_stack_size > 0X200000) save_stack_size=0; //>2M,
  23.                 if(save_stack_size==0) {        //如果需要保存的栈帧太大,就不要异步了
  24.                         uc=NULL;
  25.                 } else {
  26.                         fcntl(socket,F_SETFL,fflag|O_NONBLOCK); //异步操作
  27.                 }
  28.         }
  29.         bcount=0;
  30.         br=0;
  31.         if(MTU>500) SendSize=MTU;
  32.         else SendSize=n;
  33.         while(bcount<n){
  34.                 sz=MIN(n-bcount,SendSize);
  35.                 if((br=write(socket,buf,sz))>0){
  36.                         bcount+=br;
  37.                         buf+=br;
  38.                         continue;
  39.                 }
  40.                 if(br<0) break;
  41.                 if(bcount < n && uc) { //切换任务
  42.                     if(!uc->uc_stack.ss_size) {
  43. //创建fiber
  44.                         uc->uc_stack.ss_size=save_stack_size+256;
  45.                         uc->uc_stack.ss_sp=malloc(uc->uc_stack.ss_size);
  46. //保存线程栈帧   memcpy(uc->uc_stack.ss_sp-save_stack_size,begin_stack-save_stack_size,save_stack_size);
  47.                         swapcontext(uc,uc);
  48. //将REG_RSP,REG_RBP调整到新位置
  49. //将实际的rsp,rbp也调过来  这需要一段asm
  50. //  ......
  51.                     }
  52.                     i=do_event(socket,1); //do_epoll EPOLLOUT
  53.                     swapcontext(uc,uc->uc_link); // thread escape
  54.                 }
  55.         }
  56.         if(uc) {
  57.                 if(uc->uc_stack.ss_sp) {// 销毁fiber
  58. //恢复线程栈帧 需要一点ASM
  59. //到新的uc->uc_link.uc_mcontext.gregs[REG_RSP];
  60. //  ......
  61.                         free(uc->uc_stack.ss_sp);
  62.                         uc->uc_stack.ss_size=0;
  63.                 }
  64.                 fcntl(socket,F_SETFL,fflag);
  65.         }
  66.         return bcount==0?-1:bcount;
  67. }
复制代码
48,49行可能是不需要的。uc->uc_link是本线程返回点的context。
54行本线程完成了退栈,转其他任务。再回来时使用的fiber栈。
你提些看法。

论坛徽章:
44
15-16赛季CBA联赛之浙江
日期:2021-10-11 02:03:59程序设计版块每日发帖之星
日期:2016-07-02 06:20:0015-16赛季CBA联赛之新疆
日期:2016-04-25 10:55:452016科比退役纪念章
日期:2016-04-23 00:51:2315-16赛季CBA联赛之山东
日期:2016-04-17 12:00:2815-16赛季CBA联赛之福建
日期:2016-04-12 15:21:2915-16赛季CBA联赛之辽宁
日期:2016-03-24 21:38:2715-16赛季CBA联赛之福建
日期:2016-03-18 12:13:4015-16赛季CBA联赛之佛山
日期:2016-02-05 00:55:2015-16赛季CBA联赛之佛山
日期:2016-02-04 21:11:3615-16赛季CBA联赛之天津
日期:2016-11-02 00:33:1215-16赛季CBA联赛之浙江
日期:2017-01-13 01:31:49
57 [报告]
发表于 2014-12-15 17:11 |只看该作者
本帖最后由 windoze 于 2014-12-15 17:14 编辑

回复 56# yulihua49

大致看了下,我还是觉得这么搞太麻烦了,一个SendNet随着调用点不同,语义也会发生变化,调用这个函数的程序怎么适应这种变化呢?

另外对你这段程序有个小问题,你根据条件动态创建了一个新context,这个新context要执行什么东西呢?新context什么时候切回当前的context?看起来你这段程序只是在做AIO,没法加入其它功能啊?

实际情况下绝大多数程序里stack都不会太大的,一般context里占内存的东西都是在heap里,stack上只有个小结构体而已,fiberized.io里default stack size只有几k,如果你打开了segmented stack就只有一个page(确切说是2个,但有一个是guard page,不占物理内存),实在犯不上折腾。
当然,如果你是在搞VxWorks之类的嵌入式,就当我没说…………

论坛徽章:
15
射手座
日期:2014-11-29 19:22:4915-16赛季CBA联赛之青岛
日期:2017-11-17 13:20:09黑曼巴
日期:2017-07-13 19:13:4715-16赛季CBA联赛之四川
日期:2017-02-07 21:08:572015年亚冠纪念徽章
日期:2015-11-06 12:31:58每日论坛发贴之星
日期:2015-08-04 06:20:00程序设计版块每日发帖之星
日期:2015-08-04 06:20:00程序设计版块每日发帖之星
日期:2015-07-12 22:20:002015亚冠之浦和红钻
日期:2015-07-08 10:10:132015亚冠之大阪钢巴
日期:2015-06-29 11:21:122015亚冠之广州恒大
日期:2015-05-22 21:55:412015年亚洲杯之伊朗
日期:2015-04-10 16:28:25
58 [报告]
发表于 2014-12-15 20:05 |只看该作者
本帖最后由 yulihua49 于 2014-12-15 20:37 编辑
windoze 发表于 2014-12-15 17:11
回复 56# yulihua49

大致看了下,我还是觉得这么搞太麻烦了,一个SendNet随着调用点不同,语义也会发生 ...

新的conext就是为了这个fd能够回来,在epoll_wait之后。那时可能是另一个线程了。
这只是一个底层的功能。其他功能都在主调程序处理。
调用时,如果是线程池,TCB为正整数,可以取出uc。否则,TCB <0,取uc==NULL,就不进行异步IO。

53行把fd加到epoll。
54行回到主调程序。回去后线程退栈,忘掉先前的do_work那些事,守护epoll_wait。许多线程没事都在此。epoll有事返回,取出TCB,如果该TCB没有fiber,就调用do_work。否则setcontext()该TCB的context,它就回到55行。
实际的应用逻辑是do_work干的,do_work可能会调用SendNet来发信息,它视SedNet为同步阻塞调用。
IO完成后恢复原线程栈到新的线程,然后返回原do_work。

论坛徽章:
44
15-16赛季CBA联赛之浙江
日期:2021-10-11 02:03:59程序设计版块每日发帖之星
日期:2016-07-02 06:20:0015-16赛季CBA联赛之新疆
日期:2016-04-25 10:55:452016科比退役纪念章
日期:2016-04-23 00:51:2315-16赛季CBA联赛之山东
日期:2016-04-17 12:00:2815-16赛季CBA联赛之福建
日期:2016-04-12 15:21:2915-16赛季CBA联赛之辽宁
日期:2016-03-24 21:38:2715-16赛季CBA联赛之福建
日期:2016-03-18 12:13:4015-16赛季CBA联赛之佛山
日期:2016-02-05 00:55:2015-16赛季CBA联赛之佛山
日期:2016-02-04 21:11:3615-16赛季CBA联赛之天津
日期:2016-11-02 00:33:1215-16赛季CBA联赛之浙江
日期:2017-01-13 01:31:49
59 [报告]
发表于 2014-12-16 10:46 |只看该作者
回复 58# yulihua49

这么做的话是不是需要一个全局的TCB表,每个连接/session/task一个TCB?如果你有多个连接/session/task,这个表是不是需要动态增长?是不是在里面还要加个读写锁?
PS. 没找到do_work,你是说do_event吗?

论坛徽章:
15
射手座
日期:2014-11-29 19:22:4915-16赛季CBA联赛之青岛
日期:2017-11-17 13:20:09黑曼巴
日期:2017-07-13 19:13:4715-16赛季CBA联赛之四川
日期:2017-02-07 21:08:572015年亚冠纪念徽章
日期:2015-11-06 12:31:58每日论坛发贴之星
日期:2015-08-04 06:20:00程序设计版块每日发帖之星
日期:2015-08-04 06:20:00程序设计版块每日发帖之星
日期:2015-07-12 22:20:002015亚冠之浦和红钻
日期:2015-07-08 10:10:132015亚冠之大阪钢巴
日期:2015-06-29 11:21:122015亚冠之广州恒大
日期:2015-05-22 21:55:412015年亚洲杯之伊朗
日期:2015-04-10 16:28:25
60 [报告]
发表于 2014-12-16 13:56 |只看该作者
本帖最后由 yulihua49 于 2014-12-16 14:09 编辑
windoze 发表于 2014-12-16 10:46
回复 58# yulihua49

这么做的话是不是需要一个全局的TCB表,每个连接/session/task一个TCB?如果你有多 ...

有一个全局的TCB表,根据配置文件生成一个数组,数千到数万的规模吧。
所有TCB活动时使用线程栈。
主调函数如下:
  1. static void *thread_work(void *param)
  2. {
  3. resource *rs=(resource *)param;
  4. int ret,fds;
  5. TCB *task;
  6. struct epoll_event event;

  7.         rs->tid=pthread_self();
  8.         while(1) {
  9.         int timeout=0;
  10.                 if(!rs->tc.uc_stack.ss_size) {
  11.                         rs->tc.uc_stack.ss_sp=malloc(4096);
  12.                         rs->tc.uc_stack.ss_size=4096;
  13.                         swapcontext(&rs->tc,&rs->tc); //==setjmp();
  14.                 } else mthr_showid_del(rs->tid);
  15. //从就绪队列取一个任务
  16.                 pthread_mutex_lock(&rpool.mut);
  17.                 while(!(task=rdy_get())) {
  18.                         if(rpool.flg >= tpool.rdy_num) break;
  19.                         rpool.flg++;
  20.                         ret=pthread_cond_wait(&rpool.cond,&rpool.mut); //没有任务,等待
  21.                         rpool.flg--;
  22.                 }
  23.                 pthread_mutex_unlock(&rpool.mut);
  24.                 if(!task) {
  25.                         fds = epoll_wait(g_epoll_fd, &event, 1 , -1);
  26.                         if(fds < 0){
  27.                                 ShowLog(1,"%s:epoll_wait err=%d,%s",__FUNCTION__,errno,strerror(errno));
  28.                                 sleep(30);
  29.                                 continue;
  30.                         }
  31.                         task = (TCB *)event.data.ptr;
  32.                         timeout=task->timeout;
  33.                         task->timeout=0;
  34.                         if(task->events == 0X10||task->fd==-1) {
  35.                                 ShowLog(1,"%s:task already timeout",__FUNCTION__);
  36.                                 continue;//已经进入超时状态
  37.                         }
  38.                         task->events=event.events;
  39.                 }
  40.                 rs->timestamp=now_usec();

  41.                 task->uc.uc_link=&rs->tc;
  42.                 if(task->uc.uc_stack.ss_size) {//fiber task
  43.                         set_showid(task->ctx);//Showid 应该在会话上下文结构里
  44.                         setcontext(&task->uc);  //== longjmp()
  45.                         continue;//no action,logic only
  46.                 }
  47.                 ret=do_work(task); //进行你的服务
  48.                 task->timestamp=now_usec();//task与rs的差就是应用任务执行时间
  49.                 switch(ret) {
  50.                 case -1:
  51.                         do_epoll(task,EPOLL_CTL_DEL,0);
  52.                         client_del(task);
  53.                 case THREAD_ESCAPE:
  54.                         break;
  55.                 default:
  56.                         if(!task->timeout) task->timeout=timeout; //timeout 没有被应用设置过
  57.                         if(do_epoll(task,EPOLL_CTL_MOD,0) && errno != EEXIST) {
  58.                                 ShowLog(1,"%s:cancel by server",__FUNCTION__);
  59.                                 client_del(task);
  60.                         }
  61.                         break;
  62.                 }
  63.                 mthr_showid_del(rs->tid);
  64.         }
  65.         ShowLog(1,"%s:tid=%lX canceled",__FUNCTION__,pthread_self());
  66.         mthr_showid_del(rs->tid);
  67.         rs->timestamp=now_usec();
  68.         rs->status=0;
  69.         rs->tid=0;
  70.         if(rs->tc.uc_stack.ss_sp) {
  71.                 free(rs->tc.uc_stack.ss_sp);
  72.                 rs->tc.uc_stack.ss_sp=NULL;
  73.                 rs->tc.uc_stack.ss_size=0;
  74.         }
  75.         return NULL;
  76. }
复制代码
15行定义了返回点,46行发送到fiber。
49行do_work,貌似就是业务逻辑了。实际上还是框架逻辑,由它根据客户端请求或事件状态再调用应用函数或应用rollback。
在do_work和应用中,都可能调用SendNet,RecvNet,这两个函数需要fiberized。
您需要登录后才可以回帖 登录 | 注册

本版积分规则 发表回复

  

北京盛拓优讯信息技术有限公司. 版权所有 京ICP备16024965号-6 北京市公安局海淀分局网监中心备案编号:11010802020122 niuxiaotong@pcpop.com 17352615567
未成年举报专区
中国互联网协会会员  联系我们:huangweiwei@itpub.net
感谢所有关心和支持过ChinaUnix的朋友们 转载本站内容请注明原作者名及出处

清除 Cookies - ChinaUnix - Archiver - WAP - TOP