免费注册 查看新帖 |

Chinaunix

  平台 论坛 博客 文库
最近访问板块 发新帖
楼主: waruqi
打印 上一主题 下一主题

[C] tbox新增stackless协程支持 [复制链接]

论坛徽章:
1
程序设计版块每日发帖之星
日期:2015-11-08 06:20:00
11 [报告]
发表于 2016-12-20 11:32 |只看该作者
回复 8# yulihua49
我不会去用lockless机制构建生产者/消费者模型
而是通过channel或者协程lock,或者协程版本的信号量来处理,得不到资源,这个协程会被挂起,除非unlock(内部会resume协程),否者这个协程永远不会被调度到
我内部实现协程的 suspend/resume和yield是有区别的,等待lock的suspend模式,会把这个协程从调度链里面移除掉,不再被调度,除非unlock(resume),所以不存在cpu问题

而yield是让出当前协程,但是之后还是会被调度到,跟suspend还是有区别的,因此我在实现协程锁的时候,内部并没有用yield,而是直接suspend了。

论坛徽章:
2
综合交流区版块每日发帖之星
日期:2016-07-06 06:20:00综合交流区版块每日发帖之星
日期:2016-08-16 06:20:00
12 [报告]
发表于 2016-12-20 13:11 |只看该作者
楼主加油,我们都看好你哦

论坛徽章:
15
射手座
日期:2014-11-29 19:22:4915-16赛季CBA联赛之青岛
日期:2017-11-17 13:20:09黑曼巴
日期:2017-07-13 19:13:4715-16赛季CBA联赛之四川
日期:2017-02-07 21:08:572015年亚冠纪念徽章
日期:2015-11-06 12:31:58每日论坛发贴之星
日期:2015-08-04 06:20:00程序设计版块每日发帖之星
日期:2015-08-04 06:20:00程序设计版块每日发帖之星
日期:2015-07-12 22:20:002015亚冠之浦和红钻
日期:2015-07-08 10:10:132015亚冠之大阪钢巴
日期:2015-06-29 11:21:122015亚冠之广州恒大
日期:2015-05-22 21:55:412015年亚洲杯之伊朗
日期:2015-04-10 16:28:25
13 [报告]
发表于 2016-12-20 20:04 |只看该作者
本帖最后由 yulihua49 于 2016-12-20 20:12 编辑
waruqi 发表于 2016-12-20 11:25
而且我并没说必须要使用stackless协程,我的库里面stackfull和stackless都实现了,根据自己的实际需求,可 ...

这个多线程就好,何必协程?
sleep和挂起锁倒是可以考虑采用协程。
可以考虑实现:
coroutine_mutex_lock
coroutine_mutex_unlock
coroutine_cond_wait
coroutine_cond_signal
coroutine_sleep


论坛徽章:
1
程序设计版块每日发帖之星
日期:2015-11-08 06:20:00
14 [报告]
发表于 2016-12-21 20:31 |只看该作者
回复 13# yulihua49

多线程同步还得加lock同步,我还是比较喜欢用协程简单写写就好了,一个线程就搞定了,没必要去浪费线程资源。。
下面的那些接口,我都有实现。。

论坛徽章:
9
程序设计版块每日发帖之星
日期:2015-10-18 06:20:00程序设计版块每日发帖之星
日期:2015-11-01 06:20:00程序设计版块每日发帖之星
日期:2015-11-02 06:20:00每日论坛发贴之星
日期:2015-11-02 06:20:00程序设计版块每日发帖之星
日期:2015-11-03 06:20:00程序设计版块每日发帖之星
日期:2015-11-04 06:20:00程序设计版块每日发帖之星
日期:2015-11-06 06:20:00数据库技术版块每周发帖之星
日期:2015-12-02 15:02:47数据库技术版块每日发帖之星
日期:2015-12-08 06:20:00
15 [报告]
发表于 2016-12-24 01:17 |只看该作者
不错,支持,加油

论坛徽章:
15
射手座
日期:2014-11-29 19:22:4915-16赛季CBA联赛之青岛
日期:2017-11-17 13:20:09黑曼巴
日期:2017-07-13 19:13:4715-16赛季CBA联赛之四川
日期:2017-02-07 21:08:572015年亚冠纪念徽章
日期:2015-11-06 12:31:58每日论坛发贴之星
日期:2015-08-04 06:20:00程序设计版块每日发帖之星
日期:2015-08-04 06:20:00程序设计版块每日发帖之星
日期:2015-07-12 22:20:002015亚冠之浦和红钻
日期:2015-07-08 10:10:132015亚冠之大阪钢巴
日期:2015-06-29 11:21:122015亚冠之广州恒大
日期:2015-05-22 21:55:412015年亚洲杯之伊朗
日期:2015-04-10 16:28:25
16 [报告]
发表于 2017-01-21 15:59 |只看该作者
waruqi 发表于 2016-12-07 22:46
tbox之前提供的stackfull协程库,虽然切换效率已经非常高了,但是由于每个协程都需要维护一个独立的堆栈,  ...

tbox跟tboox是一回事吗?

论坛徽章:
11
未羊
日期:2013-12-16 12:45:4615-16赛季CBA联赛之青岛
日期:2016-04-11 19:17:4715-16赛季CBA联赛之广夏
日期:2016-04-06 16:34:012015亚冠之卡尔希纳萨夫
日期:2015-11-10 10:04:522015亚冠之大阪钢巴
日期:2015-07-30 18:29:402015亚冠之城南
日期:2015-06-15 17:56:392015亚冠之卡尔希纳萨夫
日期:2015-05-15 15:19:272015亚冠之山东鲁能
日期:2015-05-14 12:38:13金牛座
日期:2014-12-04 15:34:06子鼠
日期:2014-10-16 13:40:4715-16赛季CBA联赛之八一
日期:2016-07-22 09:41:40
17 [报告]
发表于 2017-01-23 16:18 |只看该作者
本帖最后由 zylthinking 于 2017-01-23 17:38 编辑
yulihua49 发表于 2016-12-09 10:36
以前跟猫讨论过这个问题。stackless对使用的限制不是一般的大,你的应用不可能不使用第三方软件。人家如何 ...

你这个是怎么一个用法啊

corotine(char* addr) {
   int ip = addr;
    char buf[];
    while(1) {
        int n = readfrom(*buf, ip);
        char c = buf[9];
        int n2 = readfrom(ip);

        write(buf, ip);
    }
}

比如这样的协程, 切换发生在 readfrom 和 write 内部; 你如何做到 只给活动协程分配栈 的同时还能保存 ip, n, n2, buf  这些数据?
注意它还是个 while(1);  意思是 ip, buf 应该始终可访问;
你的是类似任务池似得吧, 本质是将缓冲, 各种寄存器保存到任务上, 处理这个任务的协程从任务本身加载寄存器,
类似这种:

thread(char* addr) {
    while(1) {
         fd = epoll_wait();        
         if (null == (task = task_get(fd))) {
            task = new task();
            task.ip = proc;
            task.sp = malloc();
            bind_task_to(fd, task);
            corutine_add_run_list(task);
         } else {
            corutine_resume(task);
        }
    }
}

one theead in corotine threads pool()
{
    while(1) {
        mysp = sp;
        myip = ip;

        select one task from runlist;
        load register from task
        jmp to task.ip
    }
}


proc(task) {        
        char buf[];

        int n = readfrom(*buf, ip); // yield when not ready
        char c = buf[9];
        write(c, ip);  // yield when not ready

        bind_task_to(fd, null);        
        free(task);
        add_to_epoll_noblock(fd);
        sp = mysp;
        jmp myip;
}

但这个其实并没有简化编程;  
如果硬要给他一个意义,
只能假设这里 task 是一个动作, 而是为了达到这个动作需要进行多次协议交换, 因此相关协议在一个函数内顺序处理, 比普通 epoll 所有协议平坦处理看上去要容易理解;
但这个 task 也不能有一个 session 级别的生命期; 否则, 就是每个连接一个 task, 每个  task 一个私有栈,  那也就是标准的 taskfull, 也就达不到所谓节省内存的目的了

如果 task 只代表一个协议呢, 那其实和不用协程的带任务队列的 epoll 没区别,  至少, 不用花多大力气, 就能做到同样效果

论坛徽章:
15
射手座
日期:2014-11-29 19:22:4915-16赛季CBA联赛之青岛
日期:2017-11-17 13:20:09黑曼巴
日期:2017-07-13 19:13:4715-16赛季CBA联赛之四川
日期:2017-02-07 21:08:572015年亚冠纪念徽章
日期:2015-11-06 12:31:58每日论坛发贴之星
日期:2015-08-04 06:20:00程序设计版块每日发帖之星
日期:2015-08-04 06:20:00程序设计版块每日发帖之星
日期:2015-07-12 22:20:002015亚冠之浦和红钻
日期:2015-07-08 10:10:132015亚冠之大阪钢巴
日期:2015-06-29 11:21:122015亚冠之广州恒大
日期:2015-05-22 21:55:412015年亚洲杯之伊朗
日期:2015-04-10 16:28:25
18 [报告]
发表于 2017-01-23 20:13 |只看该作者
本帖最后由 yulihua49 于 2017-01-23 20:21 编辑
zylthinking 发表于 2017-01-23 16:18
你这个是怎么一个用法啊

corotine(char* addr) {

在epoll_wait()之后为task分配栈,之后由makecontext()配置工作函数入口,由swapcontext转移到工作函数。所有的收发都在工作函数里。
实际的过程比说的复杂的多,当然是不省事,而且调试极为困难,搞了好几个月。但是这部分是框架,只弄一次,以后你所有的do_work()都会非常省事。

  1. static void *thread_work(void *param)
  2. {
  3. resource *rs=(resource *)param;
  4. int ret,fds;
  5. TCB *task=NULL;
  6. struct epoll_event event;

  7.         ShowLog(2,"%s:thread %lx start!",__FUNCTION__,pthread_self());
  8.         getcontext(&rs->tc);
  9.         if(task)  pthread_mutex_unlock(&task->lock);

  10.         while(1) {
  11. //从就绪队列取一个任务
  12.                 pthread_mutex_lock(&rpool.mut);
  13.                 while(!(task=rdy_get())) {
  14.                         if(rpool.flg >= tpool.rdy_num) break;
  15.                         rpool.flg++;
  16.                         ret=pthread_cond_wait(&rpool.cond,&rpool.mut); //没有任务,等待
  17.                         rpool.flg--;
  18.                 }
  19.                 pthread_mutex_unlock(&rpool.mut);
  20.                 if(task) {
  21.                         if(!task->AIO_flg && !task->call_back) {
  22.                                 task->fd=task->conn.Socket;
  23.                                 ShowLog(5,"%s:tid=%lx,TCB_no=%d from rdy_queue",__FUNCTION__,
  24.                                         pthread_self(),task->sv.TCB_no);
  25.                                 if(task->fd>=0) {
  26.                                         do_epoll(task,0,0);
  27.                                 }
  28.                                 continue;
  29.                         }
  30.                 } else  {
  31.                         fds = epoll_wait(g_epoll_fd, &event, 1 , -1);
  32.                         if(fds < 0){
  33.                                         ShowLog(1,"%s:epoll_wait err=%d,%s",__FUNCTION__,errno,strerror(errno));
  34.                                 usleep(30000000);
  35.                                 continue;
  36.                                 }
  37.                          task = (TCB *)event.data.ptr;
  38.                         if(task->events) {
  39.                             ShowLog(1,"%s:tid=%lx,TCB_no=%d,task->events=%08X,conflict!",__FUNCTION__,
  40.                                     pthread_self(),task->sv.TCB_no,task->events);//发现惊群
  41.                             task=NULL;
  42.                             continue;//丢掉它
  43.                         }
  44.                         task->events=event.events;
  45.                 }
  46.                 rs->timestamp=now_usec();
  47.                 if(task->status>0) set_showid(task->ctx);//Showid 应该在会话上下文结构里
  48.                
  49.                 if(task->AIO_flg) {//fiber task
  50.                     task->uc.uc_link=&rs->tc;
  51.                     rs->tc.uc_link=(ucontext_t *)task;
  52. ShowLog(5,"%s:tid=%lx,resume to TCB_no=%d",__FUNCTION__,pthread_self(),task->sv.TCB_no);
  53.                         pthread_mutex_lock(&task->lock);//防止其他线程提前闯入
  54.                         setcontext(&task->uc);        //== longjmp()
  55.                         continue;//no action,logic only
  56.                 }
  57.                 if(task->uc.uc_stack.ss_size>0) {//call_back模式,抢入了,进入同步模式
  58.             rs->tc.uc_link=NULL;
  59. ShowLog(5,"%s:tid %lx 抢入 SYNC",__FUNCTION__,pthread_self());
  60.                         do_work(task->sv.TCB_no);
  61.                         continue;
  62.                 }
  63.                 if(!rs->tc.uc_stack.ss_sp) {
  64. ShowLog(5,"%s:%lx create fiber for TCB_no=%d",__FUNCTION__,rs->tid,task->sv.TCB_no);
  65.                         task->uc.uc_stack.ss_sp=mmap(0, use_stack_size,
  66.                                 PROT_READ | PROT_WRITE | PROT_EXEC,
  67.                                 MAP_PRIVATE | MAP_ANON | MAP_GROWSDOWN, -1, 0);
  68.                         if(task->uc.uc_stack.ss_sp==MAP_FAILED) {
  69.                                 task->uc.uc_stack.ss_sp=NULL;
  70.                                 do_work(task->sv.TCB_no); //进行你的服务,不使用AIO
  71.                                 continue;
  72.                         }
  73.                 } else {
  74. //ShowLog(5,"%s:%lx reuse fiber for TCB_no=%d",__FUNCTION__,rs->tid,task->sv.TCB_no);
  75.                         task->uc.uc_stack.ss_sp=rs->tc.uc_stack.ss_sp;
  76.                         rs->tc.uc_stack.ss_sp=NULL;
  77.                         rs->tc.uc_stack.ss_size=0;
  78.                 }
  79.                 task->uc.uc_stack.ss_size=use_stack_size;
  80.                 task->uc.uc_link=&rs->tc;
  81.                 rs->tc.uc_link=(ucontext_t *)task;
  82.                 makecontext(&task->uc,(void (*)())do_work,1,task->sv.TCB_no);

  83.                 ret=swapcontext(&rs->tc,&task->uc);
  84.                 if(ret<0) {
  85.                         ShowLog(1,"%s:swapcontext fault TCB_NO=%d,tid=%lx,errno=%d,%s",
  86.                                 __FUNCTION__,task->sv.TCB_no,pthread_self(),ret,strerror(abs(ret)));
  87.                         rs->tc.uc_link=NULL;
  88.                         task->uc.uc_link=NULL;
  89.                         if(task->uc.uc_stack.ss_sp)
  90.                                 munmap(task->uc.uc_stack.ss_sp,task->uc.uc_stack.ss_size);
  91.                         task->uc.uc_stack.ss_sp=NULL;
  92.                         task->uc.uc_stack.ss_size=0;
  93.                         do_work(task->sv.TCB_no);
  94.                         mthr_showid_del(rs->tid);
  95.                         continue;
  96.                 }
  97.                 if(!task) {
  98.                         ShowLog(1,"%s:aft swapcontext task is NULL",__FUNCTION__);
  99.                         continue;
  100.                 }
  101.                 if(!task->AIO_flg) {//service complate
  102.                         if(!rs->tc.uc_stack.ss_size) {//回收fiber stack
  103. //ShowLog(5,"%s:%lx release fiber from TCB_no=%d",__FUNCTION__,rs->tid,task->sv.TCB_no);
  104.                                 rs->tc.uc_stack.ss_sp=task->uc.uc_stack.ss_sp;
  105.                                 if(rs->tc.uc_stack.ss_sp)
  106.                                          rs->tc.uc_stack.ss_size=use_stack_size;
  107.                                 else rs->tc.uc_stack.ss_size=0;
  108.                         } else {
  109. ShowLog(5,"%s:%lx destroy fiber from TCB_no=%d",__FUNCTION__,rs->tid,task->sv.TCB_no);
  110.                                 if(task->uc.uc_stack.ss_sp)
  111.                                         munmap(task->uc.uc_stack.ss_sp,task->uc.uc_stack.ss_size);
  112.                         }
  113.                         task->uc.uc_stack.ss_sp=NULL;
  114.                         rs->tc.uc_link=NULL;
  115.                         task->uc.uc_link=NULL;
  116.                         task->uc.uc_stack.ss_size=0;//mark fiber cpmplate
  117. //ShowLog(5,"%s:TCB_no=%d,tid=%lx,timeout=%d,conn.timeout=%d",__FUNCTION__,task->sv.TCB_no,rs->tid,task->timeout,task->conn.timeout);
  118.                 } else {
  119.                         pthread_mutex_unlock(&task->lock);

  120. ShowLog(5,"%s:tid=%lx,fiber yield from TCB_no=%d",
  121.                         __FUNCTION__,pthread_self(),task->sv.TCB_no);
  122.                 }
  123.                 mthr_showid_del(rs->tid);
  124.         }
  125.         ShowLog(1,"%s:tid=%lx canceled",__FUNCTION__,pthread_self());
  126.         mthr_showid_del(rs->tid);
  127.         rs->timestamp=now_usec();
  128.         rs->status=0;
  129.         rs->tid=0;
  130.         if(rs->tc.uc_stack.ss_sp) {
  131.                 munmap(rs->tc.uc_stack.ss_sp,rs->tc.uc_stack.ss_size);
  132.                 rs->tc.uc_stack.ss_sp=NULL;
  133.                 rs->tc.uc_stack.ss_size=0;
  134.         }
  135.         return NULL;
  136. }
复制代码


论坛徽章:
15
射手座
日期:2014-11-29 19:22:4915-16赛季CBA联赛之青岛
日期:2017-11-17 13:20:09黑曼巴
日期:2017-07-13 19:13:4715-16赛季CBA联赛之四川
日期:2017-02-07 21:08:572015年亚冠纪念徽章
日期:2015-11-06 12:31:58每日论坛发贴之星
日期:2015-08-04 06:20:00程序设计版块每日发帖之星
日期:2015-08-04 06:20:00程序设计版块每日发帖之星
日期:2015-07-12 22:20:002015亚冠之浦和红钻
日期:2015-07-08 10:10:132015亚冠之大阪钢巴
日期:2015-06-29 11:21:122015亚冠之广州恒大
日期:2015-05-22 21:55:412015年亚洲杯之伊朗
日期:2015-04-10 16:28:25
19 [报告]
发表于 2017-01-23 20:23 |只看该作者
本帖最后由 yulihua49 于 2017-01-23 20:24 编辑

回复 17# zylthinking 怎么发了个回复没了?
  1. static void *thread_work(void *param)
  2. {
  3. resource *rs=(resource *)param;
  4. int ret,fds;
  5. TCB *task=NULL;
  6. struct epoll_event event;

  7.         ShowLog(2,"%s:thread %lx start!",__FUNCTION__,pthread_self());
  8.         getcontext(&rs->tc);
  9.         if(task)  pthread_mutex_unlock(&task->lock);

  10.         while(1) {
  11. //从就绪队列取一个任务
  12.                 pthread_mutex_lock(&rpool.mut);
  13.                 while(!(task=rdy_get())) {
  14.                         if(rpool.flg >= tpool.rdy_num) break;
  15.                         rpool.flg++;
  16.                         ret=pthread_cond_wait(&rpool.cond,&rpool.mut); //没有任务,等待
  17.                         rpool.flg--;
  18.                 }
  19.                 pthread_mutex_unlock(&rpool.mut);
  20.                 if(task) {
  21.                         if(!task->AIO_flg && !task->call_back) {
  22.                                 task->fd=task->conn.Socket;
  23.                                 ShowLog(5,"%s:tid=%lx,TCB_no=%d from rdy_queue",__FUNCTION__,
  24.                                         pthread_self(),task->sv.TCB_no);
  25.                                 if(task->fd>=0) {
  26.                                         do_epoll(task,0,0);
  27.                                 }
  28.                                 continue;
  29.                         }
  30.                 } else  {
  31.                         fds = epoll_wait(g_epoll_fd, &event, 1 , -1);
  32.                         if(fds < 0){
  33.                                         ShowLog(1,"%s:epoll_wait err=%d,%s",__FUNCTION__,errno,strerror(errno));
  34.                                 usleep(30000000);
  35.                                 continue;
  36.                                 }
  37.                          task = (TCB *)event.data.ptr;
  38.                         if(task->events) {
  39.                             ShowLog(1,"%s:tid=%lx,TCB_no=%d,task->events=%08X,conflict!",__FUNCTION__,
  40.                                     pthread_self(),task->sv.TCB_no,task->events);//发现惊群
  41.                             task=NULL;
  42.                             continue;//丢掉它
  43.                         }
  44.                         task->events=event.events;
  45.                 }
  46.                 rs->timestamp=now_usec();
  47.                 if(task->status>0) set_showid(task->ctx);//Showid 应该在会话上下文结构里
  48.                
  49.                 if(task->AIO_flg) {//fiber task
  50.                     task->uc.uc_link=&rs->tc;
  51.                     rs->tc.uc_link=(ucontext_t *)task;
  52. ShowLog(5,"%s:tid=%lx,resume to TCB_no=%d",__FUNCTION__,pthread_self(),task->sv.TCB_no);
  53.                         pthread_mutex_lock(&task->lock);//防止其他线程提前闯入
  54.                         setcontext(&task->uc);        //== longjmp()
  55.                         continue;//no action,logic only
  56.                 }
  57.                 if(task->uc.uc_stack.ss_size>0) {//call_back模式,抢入了,进入同步模式
  58.             rs->tc.uc_link=NULL;
  59. ShowLog(5,"%s:tid %lx 抢入 SYNC",__FUNCTION__,pthread_self());
  60.                         do_work(task->sv.TCB_no);
  61.                         continue;
  62.                 }
  63.                 if(!rs->tc.uc_stack.ss_sp) {
  64. ShowLog(5,"%s:%lx create fiber for TCB_no=%d",__FUNCTION__,rs->tid,task->sv.TCB_no);
  65.                         task->uc.uc_stack.ss_sp=mmap(0, use_stack_size,
  66.                                 PROT_READ | PROT_WRITE | PROT_EXEC,
  67.                                 MAP_PRIVATE | MAP_ANON | MAP_GROWSDOWN, -1, 0);
  68.                         if(task->uc.uc_stack.ss_sp==MAP_FAILED) {
  69.                                 task->uc.uc_stack.ss_sp=NULL;
  70.                                 do_work(task->sv.TCB_no); //进行你的服务,不使用AIO
  71.                                 continue;
  72.                         }
  73.                 } else {
  74. //ShowLog(5,"%s:%lx reuse fiber for TCB_no=%d",__FUNCTION__,rs->tid,task->sv.TCB_no);
  75.                         task->uc.uc_stack.ss_sp=rs->tc.uc_stack.ss_sp;
  76.                         rs->tc.uc_stack.ss_sp=NULL;
  77.                         rs->tc.uc_stack.ss_size=0;
  78.                 }
  79.                 task->uc.uc_stack.ss_size=use_stack_size;
  80.                 task->uc.uc_link=&rs->tc;
  81.                 rs->tc.uc_link=(ucontext_t *)task;
  82.                 makecontext(&task->uc,(void (*)())do_work,1,task->sv.TCB_no);

  83.                 ret=swapcontext(&rs->tc,&task->uc);
  84.                 if(ret<0) {
  85.                         ShowLog(1,"%s:swapcontext fault TCB_NO=%d,tid=%lx,errno=%d,%s",
  86.                                 __FUNCTION__,task->sv.TCB_no,pthread_self(),ret,strerror(abs(ret)));
  87.                         rs->tc.uc_link=NULL;
  88.                         task->uc.uc_link=NULL;
  89.                         if(task->uc.uc_stack.ss_sp)
  90.                                 munmap(task->uc.uc_stack.ss_sp,task->uc.uc_stack.ss_size);
  91.                         task->uc.uc_stack.ss_sp=NULL;
  92.                         task->uc.uc_stack.ss_size=0;
  93.                         do_work(task->sv.TCB_no);
  94.                         mthr_showid_del(rs->tid);
  95.                         continue;
  96.                 }
  97.                 if(!task) {
  98.                         ShowLog(1,"%s:aft swapcontext task is NULL",__FUNCTION__);
  99.                         continue;
  100.                 }
  101.                 if(!task->AIO_flg) {//service complate
  102.                         if(!rs->tc.uc_stack.ss_size) {//回收fiber stack
  103. //ShowLog(5,"%s:%lx release fiber from TCB_no=%d",__FUNCTION__,rs->tid,task->sv.TCB_no);
  104.                                 rs->tc.uc_stack.ss_sp=task->uc.uc_stack.ss_sp;
  105.                                 if(rs->tc.uc_stack.ss_sp)
  106.                                          rs->tc.uc_stack.ss_size=use_stack_size;
  107.                                 else rs->tc.uc_stack.ss_size=0;
  108.                         } else {
  109. ShowLog(5,"%s:%lx destroy fiber from TCB_no=%d",__FUNCTION__,rs->tid,task->sv.TCB_no);
  110.                                 if(task->uc.uc_stack.ss_sp)
  111.                                         munmap(task->uc.uc_stack.ss_sp,task->uc.uc_stack.ss_size);
  112.                         }
  113.                         task->uc.uc_stack.ss_sp=NULL;
  114.                         rs->tc.uc_link=NULL;
  115.                         task->uc.uc_link=NULL;
  116.                         task->uc.uc_stack.ss_size=0;//mark fiber cpmplate
  117. //ShowLog(5,"%s:TCB_no=%d,tid=%lx,timeout=%d,conn.timeout=%d",__FUNCTION__,task->sv.TCB_no,rs->tid,task->timeout,task->conn.timeout);
  118.                 } else {
  119.                         pthread_mutex_unlock(&task->lock);

  120. ShowLog(5,"%s:tid=%lx,fiber yield from TCB_no=%d",
  121.                         __FUNCTION__,pthread_self(),task->sv.TCB_no);
  122.                 }
  123.                 mthr_showid_del(rs->tid);
  124.         }
  125.         ShowLog(1,"%s:tid=%lx canceled",__FUNCTION__,pthread_self());
  126.         mthr_showid_del(rs->tid);
  127.         rs->timestamp=now_usec();
  128.         rs->status=0;
  129.         rs->tid=0;
  130.         if(rs->tc.uc_stack.ss_sp) {
  131.                 munmap(rs->tc.uc_stack.ss_sp,rs->tc.uc_stack.ss_size);
  132.                 rs->tc.uc_stack.ss_sp=NULL;
  133.                 rs->tc.uc_stack.ss_size=0;
  134.         }
  135.         return NULL;
  136. }
复制代码





论坛徽章:
15
射手座
日期:2014-11-29 19:22:4915-16赛季CBA联赛之青岛
日期:2017-11-17 13:20:09黑曼巴
日期:2017-07-13 19:13:4715-16赛季CBA联赛之四川
日期:2017-02-07 21:08:572015年亚冠纪念徽章
日期:2015-11-06 12:31:58每日论坛发贴之星
日期:2015-08-04 06:20:00程序设计版块每日发帖之星
日期:2015-08-04 06:20:00程序设计版块每日发帖之星
日期:2015-07-12 22:20:002015亚冠之浦和红钻
日期:2015-07-08 10:10:132015亚冠之大阪钢巴
日期:2015-06-29 11:21:122015亚冠之广州恒大
日期:2015-05-22 21:55:412015年亚洲杯之伊朗
日期:2015-04-10 16:28:25
20 [报告]
发表于 2017-01-23 20:25 |只看该作者
  1. static void *thread_work(void *param)
  2. {
  3. resource *rs=(resource *)param;
  4. int ret,fds;
  5. TCB *task=NULL;
  6. struct epoll_event event;

  7.         ShowLog(2,"%s:thread %lx start!",__FUNCTION__,pthread_self());
  8.         getcontext(&rs->tc);
  9.         if(task)  pthread_mutex_unlock(&task->lock);

  10.         while(1) {
  11. //从就绪队列取一个任务
  12.                 pthread_mutex_lock(&rpool.mut);
  13.                 while(!(task=rdy_get())) {
  14.                         if(rpool.flg >= tpool.rdy_num) break;
  15.                         rpool.flg++;
  16.                         ret=pthread_cond_wait(&rpool.cond,&rpool.mut); //没有任务,等待
  17.                          rpool.flg--;
  18.                 }
  19.                 pthread_mutex_unlock(&rpool.mut);
  20.                 if(task) {
  21.                         if(!task->AIO_flg && !task->call_back) {
  22.                                 task->fd=task->conn.Socket;
  23.                                 ShowLog(5,"%s:tid=%lx,TCB_no=%d from rdy_queue",__FUNCTION__,
  24.                                         pthread_self(),task->sv.TCB_no);
  25.                                 if(task->fd>=0) {
  26.                                         do_epoll(task,0,0);
  27.                                 }
  28.                                 continue;
  29.                         }
  30.                 } else  {
  31.                         fds = epoll_wait(g_epoll_fd, &event, 1 , -1);
  32.                         if(fds < 0){
  33.                                         ShowLog(1,"%s:epoll_wait err=%d,%s",__FUNCTION__,errno,strerror(errno));
  34.                                 usleep(30000000);
  35.                                 continue;
  36.                                 }
  37.                          task = (TCB *)event.data.ptr;
  38.                         if(task->events) {
  39.                             ShowLog(1,"%s:tid=%lx,TCB_no=%d,task->events=%08X,conflict!",__FUNCTION__,
  40.                                     pthread_self(),task->sv.TCB_no,task->events);//发现惊群
  41.                             task=NULL;
  42.                             continue;//丢掉它
  43.                         }
  44.                         task->events=event.events;
  45.                 }
  46.                 rs->timestamp=now_usec();
  47.                 if(task->status>0) set_showid(task->ctx);//Showid 应该在会话上下文结构里
  48.                
  49.                 if(task->AIO_flg) {//fiber task
  50.                     task->uc.uc_link=&rs->tc;
  51.                     rs->tc.uc_link=(ucontext_t *)task;
  52. ShowLog(5,"%s:tid=%lx,resume to TCB_no=%d",__FUNCTION__,pthread_self(),task->sv.TCB_no);
  53.                         pthread_mutex_lock(&task->lock);//防止其他线程提前闯入
  54.                         setcontext(&task->uc);        //== longjmp()
  55.                         continue;//no action,logic only
  56.                 }
  57.                 if(task->uc.uc_stack.ss_size>0) {//call_back模式,抢入了,进入同步模式
  58.             rs->tc.uc_link=NULL;
  59. ShowLog(5,"%s:tid %lx 抢入 SYNC",__FUNCTION__,pthread_self());
  60.                         do_work(task->sv.TCB_no);
  61.                         continue;
  62.                 }
  63.                 if(!rs->tc.uc_stack.ss_sp) {
  64. ShowLog(5,"%s:%lx create fiber for TCB_no=%d",__FUNCTION__,rs->tid,task->sv.TCB_no);
  65.                         task->uc.uc_stack.ss_sp=mmap(0, use_stack_size,
  66.                                 PROT_READ | PROT_WRITE | PROT_EXEC,
  67.                                 MAP_PRIVATE | MAP_ANON | MAP_GROWSDOWN, -1, 0);
  68.                         if(task->uc.uc_stack.ss_sp==MAP_FAILED) {
  69.                                 task->uc.uc_stack.ss_sp=NULL;
  70.                                 do_work(task->sv.TCB_no); //进行你的服务,不使用AIO
  71.                                 continue;
  72.                         }
  73.                 } else {
  74. //ShowLog(5,"%s:%lx reuse fiber for TCB_no=%d",__FUNCTION__,rs->tid,task->sv.TCB_no);
  75.                         task->uc.uc_stack.ss_sp=rs->tc.uc_stack.ss_sp;
  76.                         rs->tc.uc_stack.ss_sp=NULL;
  77.                         rs->tc.uc_stack.ss_size=0;
  78.                 }
  79.                 task->uc.uc_stack.ss_size=use_stack_size;
  80.                 task->uc.uc_link=&rs->tc;
  81.                 rs->tc.uc_link=(ucontext_t *)task;
  82.                 makecontext(&task->uc,(void (*)())do_work,1,task->sv.TCB_no);

  83.                 ret=swapcontext(&rs->tc,&task->uc);
  84.                 if(ret<0) {
  85.                         ShowLog(1,"%s:swapcontext fault TCB_NO=%d,tid=%lx,errno=%d,%s",
  86.                                 __FUNCTION__,task->sv.TCB_no,pthread_self(),ret,strerror(abs(ret)));
  87.                         rs->tc.uc_link=NULL;
  88.                         task->uc.uc_link=NULL;
  89.                         if(task->uc.uc_stack.ss_sp)
  90.                                 munmap(task->uc.uc_stack.ss_sp,task->uc.uc_stack.ss_size);
  91.                         task->uc.uc_stack.ss_sp=NULL;
  92.                         task->uc.uc_stack.ss_size=0;
  93.                         do_work(task->sv.TCB_no);
  94.                         mthr_showid_del(rs->tid);
  95.                         continue;
  96.                 }
  97.                 if(!task) {
  98.                         ShowLog(1,"%s:aft swapcontext task is NULL",__FUNCTION__);
  99.                         continue;
  100.                 }
  101.                 if(!task->AIO_flg) {//service complate
  102.                         if(!rs->tc.uc_stack.ss_size) {//回收fiber stack
  103. //ShowLog(5,"%s:%lx release fiber from TCB_no=%d",__FUNCTION__,rs->tid,task->sv.TCB_no);
  104.                                 rs->tc.uc_stack.ss_sp=task->uc.uc_stack.ss_sp;
  105.                                 if(rs->tc.uc_stack.ss_sp)
  106.                                          rs->tc.uc_stack.ss_size=use_stack_size;
  107.                                 else rs->tc.uc_stack.ss_size=0;
  108.                         } else {
  109. ShowLog(5,"%s:%lx destroy fiber from TCB_no=%d",__FUNCTION__,rs->tid,task->sv.TCB_no);
  110.                                 if(task->uc.uc_stack.ss_sp)
  111.                                         munmap(task->uc.uc_stack.ss_sp,task->uc.uc_stack.ss_size);
  112.                         }
  113.                         task->uc.uc_stack.ss_sp=NULL;
  114.                         rs->tc.uc_link=NULL;
  115.                         task->uc.uc_link=NULL;
  116.                         task->uc.uc_stack.ss_size=0;//mark fiber cpmplate
  117. //ShowLog(5,"%s:TCB_no=%d,tid=%lx,timeout=%d,conn.timeout=%d",__FUNCTION__,task->sv.TCB_no,rs->tid,task->timeout,task->conn.timeout);
  118.                 } else {
  119.                         pthread_mutex_unlock(&task->lock);

  120. ShowLog(5,"%s:tid=%lx,fiber yield from TCB_no=%d",
  121.                         __FUNCTION__,pthread_self(),task->sv.TCB_no);
  122.                 }
  123.                 mthr_showid_del(rs->tid);
  124.         }
  125.         ShowLog(1,"%s:tid=%lx canceled",__FUNCTION__,pthread_self());
  126.         mthr_showid_del(rs->tid);
  127.         rs->timestamp=now_usec();
  128.         rs->status=0;
  129.         rs->tid=0;
  130.         if(rs->tc.uc_stack.ss_sp) {
  131.                 munmap(rs->tc.uc_stack.ss_sp,rs->tc.uc_stack.ss_size);
  132.                 rs->tc.uc_stack.ss_sp=NULL;
  133.                 rs->tc.uc_stack.ss_size=0;
  134.         }
  135.         return NULL;
  136. }
复制代码

您需要登录后才可以回帖 登录 | 注册

本版积分规则 发表回复

  

北京盛拓优讯信息技术有限公司. 版权所有 京ICP备16024965号-6 北京市公安局海淀分局网监中心备案编号:11010802020122 niuxiaotong@pcpop.com 17352615567
未成年举报专区
中国互联网协会会员  联系我们:huangweiwei@itpub.net
感谢所有关心和支持过ChinaUnix的朋友们 转载本站内容请注明原作者名及出处

清除 Cookies - ChinaUnix - Archiver - WAP - TOP