免费注册 查看新帖 |

Chinaunix

  平台 论坛 博客 文库
最近访问板块 发新帖
楼主: codechurch

[C] 寻求建议,关于异步编程 [复制链接]

论坛徽章:
15
射手座
日期:2014-11-29 19:22:4915-16赛季CBA联赛之青岛
日期:2017-11-17 13:20:09黑曼巴
日期:2017-07-13 19:13:4715-16赛季CBA联赛之四川
日期:2017-02-07 21:08:572015年亚冠纪念徽章
日期:2015-11-06 12:31:58每日论坛发贴之星
日期:2015-08-04 06:20:00程序设计版块每日发帖之星
日期:2015-08-04 06:20:00程序设计版块每日发帖之星
日期:2015-07-12 22:20:002015亚冠之浦和红钻
日期:2015-07-08 10:10:132015亚冠之大阪钢巴
日期:2015-06-29 11:21:122015亚冠之广州恒大
日期:2015-05-22 21:55:412015年亚洲杯之伊朗
日期:2015-04-10 16:28:25
发表于 2014-12-17 15:32 |显示全部楼层
本帖最后由 yulihua49 于 2014-12-17 15:40 编辑
action08 发表于 2014-12-17 15:18
你好,这些接口都是自己手动包装实现的吗???

不是。
用ucontext,自己补了两个asm小函数(4楼),不知对不对。
调度程序和异步IO程序见4楼给你的链接。
重发异步IO的程序:
  1. /**********************************************
  2. * @(#) TCP SERVER Tools                      *
  3. * to suport Fiberized.IO
  4. **********************************************/
  5. #include <sys/socket.h>
  6. #include <sys/time.h>
  7. #include <unistd.h>
  8. #include <fcntl.h>

  9. #include "reg.h"

  10. #if __WORDSIZE == 64
  11. #define MAX_STACK 0X200000
  12. #else
  13. #define MAX_STACK 0X100000
  14. #endif

  15. #ifndef MIN
  16. #define MIN(a,b) ((a)<(b))?(a):(b)
  17. #endif

  18. extern char *set_sp(char *new_stack);
  19. extern char *restore_sp(char *to,char *from,char *bp,unsigned long size);
  20. //timeout for second
  21. int RecvNet(int socket,char *buf,int n,int timeout,int TCB_no)
  22. {
  23. int bcount=0,br,ret;
  24. int i;
  25. int fflag;
  26. ucontext_t *tc=NULL,*uc=get_uc(TCB_no);
  27. unsigned long begin_stack,save_stack_size=0;

  28.         if(socket<0) return SYSERR;
  29.         if(!buf && n<0) return 0;
  30.         fflag=fcntl(socket,F_GETFL,0);
  31.         if(uc) {
  32.                 tc=uc->uc_link;
  33.                 fcntl(socket,F_SETFL,fflag|O_NONBLOCK); //异步操作
  34.         } else {
  35.                 struct timeval tmout;
  36.                 tmout.tv_sec=timeout;
  37.                 tmout.tv_usec=0;
  38.                 ret=setsockopt(socket,SOL_SOCKET,SO_RCVTIMEO,(char *)&tmout,sizeof(tmout));
  39.                 if(ret) {
  40.                         ShowLog(5,"%s:setsockopt err=%d,%s",__FUNCTION__,
  41.                                 errno,strerror(errno));
  42.                 }
  43.         }

  44.         *buf=0;
  45.         br=0;

  46.         while(bcount<n){
  47.                 if((br=read(socket,buf,n-bcount))>0){
  48.                         bcount+=br;
  49.                         buf+=br;
  50.                         continue;
  51.                 }
  52.                 if(errno==EAGAIN) return TIMEOUTERR;
  53.                 if(br<0){
  54.                     if(errno!=ECONNRESET)
  55.                         ShowLog(1,"%s:br=%d,err=%d,%s",__FUNCTION__,br,errno,strerror(errno));
  56.                     break;
  57.                 }
  58. //ShowLog(5,"RecvNet:read br=0,errno=%d,%s",errno,strerror(errno));
  59.                 if(bcount < n && uc) { //切换任务
  60.                       if(!uc->uc_stack.ss_size) {
  61. //计算所需的栈帧
  62.                         swapcontext(uc,uc);
  63. #if __WORDSIZE == 64
  64.                         begin_stack=tc->uc_mcontext.gregs[REG_RSP];
  65.                         save_stack_size=uc->uc_mcontext.gregs[REG_RSP]-begin_stack;
  66. #else
  67.                         if(uc->uc_mcontext.gregs[REG_SS] == uc->uc_link->uc_mcontext.gregs[REG_SS]) {
  68.                                 begin_stack=tc->uc_mcontext.gregs[REG_ESP];
  69.                                 save_stack_size=uc->uc_mcontext.gregs[REG_ESP]-begin_stack;
  70.                         }
  71. #endif
  72.                         if(save_stack_size > MAX_STACK) save_stack_size=0;
  73.                         if(save_stack_size==0) {        //如果需要保存的栈帧太大,就不要异步了
  74.                                 uc=NULL;
  75.                                 fcntl(socket,F_SETFL,fflag);
  76.                                 continue;
  77.                         }
  78. //创建fiber
  79.                         uc->uc_stack.ss_size=save_stack_size+2048;
  80.                         uc->uc_stack.ss_sp=malloc(uc->uc_stack.ss_size);
  81.                         if(!uc->uc_stack.ss_sp) {
  82.                                 uc->uc_stack.ss_size=0;
  83.                                 uc=NULL;
  84.                                 fcntl(socket,F_SETFL,fflag);
  85.                                 continue;
  86.                         }
  87. //保存线程栈帧          memcpy(uc->uc_stack.ss_sp-save_stack_size,begin_stack-save_stack_size,save_stack_size);
  88. //将实际的rsp,rbp也调过来  这需要一段asm
  89.                         set_sp(uc->uc_stack.ss_sp-save_stack_size);
  90.                       }
  91.                         swapcontext(uc,uc);//在do_event之前,设定好栈
  92.                         if(tc != uc->uc_link) //do_event后被别线程抢入了
  93.                             continue;
  94.                         i=do_event(TCB_no,socket,0);//EPOOLIN
  95.                         swapcontext(uc,uc->uc_link); // thread escape
  96.                 }
  97.         }
  98.         if(uc) {
  99.                 if(uc->uc_stack.ss_sp) {// 销毁fiber
  100. //恢复线程栈帧 需要一点ASM
  101. //到新的uc->uc_link.uc_mcontext.gregs[REG_RSP];
  102. #if __WORDSIZE == 64
  103.                         memcpy(restore_sp(uc->uc_link->uc_mcontext.gregs[REG_RSP],
  104.                                         uc->uc_stack.ss_sp,uc->uc_link->uc_mcontext.gregs[REG_RBP],
  105.                                         save_stack_size),
  106.                                 uc->uc_stack.ss_sp-save_stack_size,save_stack_size);
  107. #else
  108.                         memcpy(restore_sp(uc->uc_link->uc_mcontext.gregs[REG_ESP],
  109.                                         uc->uc_stack.ss_sp,uc->uc_link->uc_mcontext.gregs[REG_EBP],
  110.                                         save_stack_size),
  111.                                 uc->uc_stack.ss_sp-save_stack_size,save_stack_size);
  112. #endif
  113. //需要恢复其他寄存器吗?
  114.                         free(uc->uc_stack.ss_sp);
  115.                         uc->uc_stack.ss_size=0;
  116.                 }
  117.                 fcntl(socket,F_SETFL,fflag);
  118.         }
  119.         return bcount==0?-1:bcount;
  120. }

  121. int SendNet(int socket,char *buf,int n,int MTU,int TCB_no)
  122. {
  123. int bcount,br;
  124. int sz,i=0;
  125. int fflag;
  126. size_t SendSize;
  127. ucontext_t *tc=NULL,*uc=get_uc(TCB_no);
  128. unsigned long begin_stack,save_stack_size=0;

  129.         if(socket<0) return SYSERR;
  130.         fflag=fcntl(socket,F_GETFL,0);
  131.         if(uc) {
  132.                 tc=uc->uc_link;
  133.                 fcntl(socket,F_SETFL,fflag|O_NONBLOCK); //异步操作
  134.         }
  135.         bcount=0;
  136.         br=0;
  137.         if(MTU>500) SendSize=MTU;
  138.         else SendSize=n;
  139.         while(bcount<n){
  140.                 sz=MIN(n-bcount,SendSize);
  141.                 if((br=write(socket,buf,sz))>0){
  142.                         bcount+=br;
  143.                         buf+=br;
  144.                         continue;
  145.                 }
  146.                 if(br<0) break;
  147.                 if(bcount < n && uc) { //切换任务
  148.                     if(!uc->uc_stack.ss_size) {
  149. //计算所需的栈帧
  150.                         swapcontext(uc,uc);
  151. #if __WORDSIZE == 64
  152.                         begin_stack=tc->uc_mcontext.gregs[REG_RSP];
  153.                         save_stack_size=uc->uc_mcontext.gregs[REG_RSP]-begin_stack;
  154. #else
  155.                         if(uc->uc_mcontext.gregs[REG_SS] == uc->uc_link->uc_mcontext.gregs[REG_SS]) {
  156.                                 begin_stack=tc->uc_mcontext.gregs[REG_ESP];
  157.                                 save_stack_size=uc->uc_mcontext.gregs[REG_ESP]-begin_stack;
  158.                         }
  159. #endif
  160.                         if(save_stack_size > MAX_STACK) save_stack_size=0;
  161.                         if(save_stack_size==0) {        //如果需要保存的栈帧太大,就不要异步了
  162.                                 uc=NULL;
  163.                                 fcntl(socket,F_SETFL,fflag);
  164.                                 continue;
  165.                         }
  166. //创建fiber
  167.                         uc->uc_stack.ss_size=save_stack_size+2048;
  168.                         uc->uc_stack.ss_sp=malloc(uc->uc_stack.ss_size);
  169. //保存线程栈帧          memcpy(uc->uc_stack.ss_sp-save_stack_size,begin_stack-save_stack_size,save_stack_size);
  170. //将实际的rsp,rbp也调过来  这需要一段asm
  171.                         set_sp(uc->uc_stack.ss_sp-save_stack_size);
  172.                     }
  173.                     swapcontext(uc,uc);//在do_event之前,设定好栈
  174.                     if(tc != uc->uc_link) //do_event后被别线程抢入了
  175.                         continue;
  176.                     i=do_event(TCB_no,socket,1); //do_epoll EPOLLOUT
  177.                     swapcontext(uc,tc); // thread escape
  178.                 }
  179.         }
  180.         if(uc) {
  181.                 if(uc->uc_stack.ss_sp) {// 销毁fiber
  182. //恢复线程栈帧 需要一点ASM
  183. //到新的uc->uc_link.uc_mcontext.gregs[REG_RSP];
  184. #if __WORDSIZE == 64
  185.                         memcpy(restore_sp(uc->uc_link->uc_mcontext.gregs[REG_RSP],
  186.                                         uc->uc_stack.ss_sp,uc->uc_link->uc_mcontext.gregs[REG_RBP],
  187.                                         save_stack_size),
  188.                                 uc->uc_stack.ss_sp-save_stack_size,save_stack_size);
  189. #else
  190.                         memcpy(restore_sp(uc->uc_link->uc_mcontext.gregs[REG_ESP],
  191.                                         uc->uc_stack.ss_sp,uc->uc_link->uc_mcontext.gregs[REG_EBP],
  192.                                         save_stack_size),
  193.                                 uc->uc_stack.ss_sp-save_stack_size,save_stack_size);
  194. #endif
  195. //需要恢复其他寄存器吗?
  196.                         free(uc->uc_stack.ss_sp);
  197.                         uc->uc_stack.ss_size=0;
  198.                 }
  199.                 fcntl(socket,F_SETFL,fflag);
  200.         }
  201.         return bcount==0?-1:bcount;
  202. }

  203.      
复制代码

论坛徽章:
0
发表于 2014-12-17 15:39 |显示全部楼层
回复 9# action08

是啊。

   

论坛徽章:
0
发表于 2014-12-17 15:43 |显示全部楼层
回复 6# windoze

如果是用 windows 的iocp 或者 linux aio 那种回调机制,我就在回调函数里直接执行了。此时,应该比用线程池快一点,虽然系统回调的线程,可能是在系统的线程池里。


   

论坛徽章:
15
射手座
日期:2014-11-29 19:22:4915-16赛季CBA联赛之青岛
日期:2017-11-17 13:20:09黑曼巴
日期:2017-07-13 19:13:4715-16赛季CBA联赛之四川
日期:2017-02-07 21:08:572015年亚冠纪念徽章
日期:2015-11-06 12:31:58每日论坛发贴之星
日期:2015-08-04 06:20:00程序设计版块每日发帖之星
日期:2015-08-04 06:20:00程序设计版块每日发帖之星
日期:2015-07-12 22:20:002015亚冠之浦和红钻
日期:2015-07-08 10:10:132015亚冠之大阪钢巴
日期:2015-06-29 11:21:122015亚冠之广州恒大
日期:2015-05-22 21:55:412015年亚洲杯之伊朗
日期:2015-04-10 16:28:25
发表于 2014-12-17 15:48 |显示全部楼层
本帖最后由 yulihua49 于 2014-12-17 15:50 编辑
codechurch 发表于 2014-12-17 15:39
回复 9# action08

是啊。


我的思路是:
基本中间件框架还是线程池模式,只在socket IO时异步,就是前边的两个函数。
只有在IO不能完成时生成fiber,释放线程。
完成后销毁fiber。
这样就不需要很多的fiber,很多的应用栈(栈的大小,你不知道留多少,尽可能大一些,很浪费的)。
我的方案,一是只有IO完不成才需要fiber。二是所需的栈大小是已知的,就是线程栈底,就是应用栈顶到当前rsp这一段内存。

论坛徽章:
44
15-16赛季CBA联赛之浙江
日期:2021-10-11 02:03:59程序设计版块每日发帖之星
日期:2016-07-02 06:20:0015-16赛季CBA联赛之新疆
日期:2016-04-25 10:55:452016科比退役纪念章
日期:2016-04-23 00:51:2315-16赛季CBA联赛之山东
日期:2016-04-17 12:00:2815-16赛季CBA联赛之福建
日期:2016-04-12 15:21:2915-16赛季CBA联赛之辽宁
日期:2016-03-24 21:38:2715-16赛季CBA联赛之福建
日期:2016-03-18 12:13:4015-16赛季CBA联赛之佛山
日期:2016-02-05 00:55:2015-16赛季CBA联赛之佛山
日期:2016-02-04 21:11:3615-16赛季CBA联赛之天津
日期:2016-11-02 00:33:1215-16赛季CBA联赛之浙江
日期:2017-01-13 01:31:49
发表于 2014-12-17 15:52 |显示全部楼层
回复 10# yulihua49

按我的理解大致描述一下你的程序流程,你看看对不对:

一个线程池做epoll_wait
有事件发生时,获得事件的thread得到fd,查找对应的TCB,不存在则创建一个
如果TCB里有一个挂起的context,则切换到这个context,
否则按照标准流程处理TCB,比如建立会话处理请求之类
在处理过程中,如果遇到阻塞操作,则为当前TCB保存一个现场context并挂起,然后该thread切换出去继续epoll_wait

看起来这个流程没什么问题,但有几点没搞清楚:
1、101行的do_event和下一行的swapcontext之间可能有race condition,因为你有一个thread pool在做epoll_wait,所以do_event把fd注册到epoll里之后,如果IO event立即/已经发生,这个fd可能立刻被其它thread获得,而这边的swapcontext可能还没来得及做,感觉可能会有问题。
2、如果这些TCB/session相互之间需要同步怎么办?如果你把一个获得某个锁的context挂起可能会导致其它thread阻塞甚至死锁
3、如果一个socket需要全双工操作怎么办?比如HTTP2的server,既要随时读client,又可能随时做server push,而这两个方向上的操作是完全异步的,按照你的设计TCB应该与连接一一对应,而好像又不可能给一个TCB建立两个context,所以你没办法在同一个连接上同时做两个阻塞操作。

论坛徽章:
15
射手座
日期:2014-11-29 19:22:4915-16赛季CBA联赛之青岛
日期:2017-11-17 13:20:09黑曼巴
日期:2017-07-13 19:13:4715-16赛季CBA联赛之四川
日期:2017-02-07 21:08:572015年亚冠纪念徽章
日期:2015-11-06 12:31:58每日论坛发贴之星
日期:2015-08-04 06:20:00程序设计版块每日发帖之星
日期:2015-08-04 06:20:00程序设计版块每日发帖之星
日期:2015-07-12 22:20:002015亚冠之浦和红钻
日期:2015-07-08 10:10:132015亚冠之大阪钢巴
日期:2015-06-29 11:21:122015亚冠之广州恒大
日期:2015-05-22 21:55:412015年亚洲杯之伊朗
日期:2015-04-10 16:28:25
发表于 2014-12-17 16:01 |显示全部楼层
本帖最后由 yulihua49 于 2014-12-17 16:24 编辑
windoze 发表于 2014-12-17 15:52
回复 10# yulihua49

按我的理解大致描述一下你的程序流程,你看看对不对:

TCB是一个预置的TCB池,其中之一被加入到epoll的event-》data中。
1.这就是11楼98-100行的作用。98行已经设置好context了,其他线程抢入时,直接进行io即可,尽管当前线程还未完全退出。
2.TCB之间,目前没有session,如果需要的话,可以用消息队列什么的,目前消息队列没有纳入异步。获取连接池的操作比较特殊,这就是主调程序里,那个rdy队列的作用。
3.目前我的交易中间件没有全双工操作。我还不知道epoll如何处理全双工操作。以SendNet为例,现状是,epoll使用ONESHOT,读的那个fd此时是不响应的,响应的这个fd一定是写的。

论坛徽章:
0
发表于 2014-12-17 16:08 |显示全部楼层
回复 14# yulihua49

我的想法是,所有将所有客户业务,每个业务就是一个fiber。

因为无论如何,机器上总是有  N 个实际线程 和 M 个业务。

如果N个在实际线程的业务没有fiber,其实也不会优化多少,因为 N 是小值, M远大于N。而且,反而会造成 fiber create和destroy的性能损耗。


   

论坛徽章:
15
射手座
日期:2014-11-29 19:22:4915-16赛季CBA联赛之青岛
日期:2017-11-17 13:20:09黑曼巴
日期:2017-07-13 19:13:4715-16赛季CBA联赛之四川
日期:2017-02-07 21:08:572015年亚冠纪念徽章
日期:2015-11-06 12:31:58每日论坛发贴之星
日期:2015-08-04 06:20:00程序设计版块每日发帖之星
日期:2015-08-04 06:20:00程序设计版块每日发帖之星
日期:2015-07-12 22:20:002015亚冠之浦和红钻
日期:2015-07-08 10:10:132015亚冠之大阪钢巴
日期:2015-06-29 11:21:122015亚冠之广州恒大
日期:2015-05-22 21:55:412015年亚洲杯之伊朗
日期:2015-04-10 16:28:25
发表于 2014-12-17 16:10 |显示全部楼层
本帖最后由 yulihua49 于 2014-12-17 16:14 编辑

回复 17# codechurch
fiber栈不是在M里的吗?那会很多的啊。
我的方案,就是所有TCB(M个),不含栈,用线程栈(N个)。
创建、销毁fiber,就是malloc/free + 两个memcpy。


   

论坛徽章:
0
发表于 2014-12-17 16:13 |显示全部楼层
回复 18# yulihua49

我计算过了:

    栈在windows上最小是16K。

    128K 个fiber (130000客户) 占内存 2G。

   


我认为在x64上,这不是问题。

而且在x86上可以多进程负载一下。



   

论坛徽章:
15
射手座
日期:2014-11-29 19:22:4915-16赛季CBA联赛之青岛
日期:2017-11-17 13:20:09黑曼巴
日期:2017-07-13 19:13:4715-16赛季CBA联赛之四川
日期:2017-02-07 21:08:572015年亚冠纪念徽章
日期:2015-11-06 12:31:58每日论坛发贴之星
日期:2015-08-04 06:20:00程序设计版块每日发帖之星
日期:2015-08-04 06:20:00程序设计版块每日发帖之星
日期:2015-07-12 22:20:002015亚冠之浦和红钻
日期:2015-07-08 10:10:132015亚冠之大阪钢巴
日期:2015-06-29 11:21:122015亚冠之广州恒大
日期:2015-05-22 21:55:412015年亚洲杯之伊朗
日期:2015-04-10 16:28:25
发表于 2014-12-17 16:16 |显示全部楼层
本帖最后由 yulihua49 于 2014-12-17 16:40 编辑
codechurch 发表于 2014-12-17 16:13
回复 18# yulihua49

我计算过了:

你不知道应用,它怎么用内存,递归什么的。所以线程栈,系统缺省值,32位是1M,64位是2M。
我要是做fiber,可能也要照这个数留,或者配置文件定一下。

当然现在大内存也没有什么太严重的经济问题。

我之所以用线程池模式,一个是避免大量线程,另一个是避免大量耗内存。这样,我可以通过调整线程栈尺寸,来适应大内存需求的应用。
我见过的,quicklz,我用的压缩器,栈需求是530K。
我自己的,创建hash表,栈需求与“桶”一样大。
您需要登录后才可以回帖 登录 | 注册

本版积分规则 发表回复

  

北京盛拓优讯信息技术有限公司. 版权所有 京ICP备16024965号-6 北京市公安局海淀分局网监中心备案编号:11010802020122 niuxiaotong@pcpop.com 17352615567
未成年举报专区
中国互联网协会会员  联系我们:huangweiwei@itpub.net
感谢所有关心和支持过ChinaUnix的朋友们 转载本站内容请注明原作者名及出处

清除 Cookies - ChinaUnix - Archiver - WAP - TOP