- 论坛徽章:
- 15
|
- static void *thread_work(void *param)
- {
- resource *rs=(resource *)param;
- int ret,fds;
- TCB *task=NULL;
- struct epoll_event event;
- ShowLog(2,"%s:thread %lx start!",__FUNCTION__,pthread_self());
- getcontext(&rs->tc);
- if(task) pthread_mutex_unlock(&task->lock);
- while(1) {
- //从就绪队列取一个任务
- pthread_mutex_lock(&rpool.mut);
- while(!(task=rdy_get())) {
- if(rpool.flg >= tpool.rdy_num) break;
- rpool.flg++;
- ret=pthread_cond_wait(&rpool.cond,&rpool.mut); //没有任务,等待
- rpool.flg--;
- }
- pthread_mutex_unlock(&rpool.mut);
- if(task) {
- if(!task->AIO_flg && !task->call_back) {
- task->fd=task->conn.Socket;
- ShowLog(5,"%s:tid=%lx,TCB_no=%d from rdy_queue",__FUNCTION__,
- pthread_self(),task->sv.TCB_no);
- if(task->fd>=0) {
- do_epoll(task,0,0);
- }
- continue;
- }
- } else {
- fds = epoll_wait(g_epoll_fd, &event, 1 , -1);
- if(fds < 0){
- ShowLog(1,"%s:epoll_wait err=%d,%s",__FUNCTION__,errno,strerror(errno));
- usleep(30000000);
- continue;
- }
- task = (TCB *)event.data.ptr;
- if(task->events) {
- ShowLog(1,"%s:tid=%lx,TCB_no=%d,task->events=%08X,conflict!",__FUNCTION__,
- pthread_self(),task->sv.TCB_no,task->events);//发现惊群
- task=NULL;
- continue;//丢掉它
- }
- task->events=event.events;
- }
- rs->timestamp=now_usec();
- if(task->status>0) set_showid(task->ctx);//Showid 应该在会话上下文结构里
-
- if(task->AIO_flg) {//fiber task
- task->uc.uc_link=&rs->tc;
- rs->tc.uc_link=(ucontext_t *)task;
- ShowLog(5,"%s:tid=%lx,resume to TCB_no=%d",__FUNCTION__,pthread_self(),task->sv.TCB_no);
- pthread_mutex_lock(&task->lock);//防止其他线程提前闯入
- setcontext(&task->uc); //== longjmp()
- continue;//no action,logic only
- }
- if(task->uc.uc_stack.ss_size>0) {//call_back模式,抢入了,进入同步模式
- rs->tc.uc_link=NULL;
- ShowLog(5,"%s:tid %lx 抢入 SYNC",__FUNCTION__,pthread_self());
- do_work(task->sv.TCB_no);
- continue;
- }
- if(!rs->tc.uc_stack.ss_sp) {
- ShowLog(5,"%s:%lx create fiber for TCB_no=%d",__FUNCTION__,rs->tid,task->sv.TCB_no);
- task->uc.uc_stack.ss_sp=mmap(0, use_stack_size,
- PROT_READ | PROT_WRITE | PROT_EXEC,
- MAP_PRIVATE | MAP_ANON | MAP_GROWSDOWN, -1, 0);
- if(task->uc.uc_stack.ss_sp==MAP_FAILED) {
- task->uc.uc_stack.ss_sp=NULL;
- do_work(task->sv.TCB_no); //进行你的服务,不使用AIO
- continue;
- }
- } else {
- //ShowLog(5,"%s:%lx reuse fiber for TCB_no=%d",__FUNCTION__,rs->tid,task->sv.TCB_no);
- task->uc.uc_stack.ss_sp=rs->tc.uc_stack.ss_sp;
- rs->tc.uc_stack.ss_sp=NULL;
- rs->tc.uc_stack.ss_size=0;
- }
- task->uc.uc_stack.ss_size=use_stack_size;
- task->uc.uc_link=&rs->tc;
- rs->tc.uc_link=(ucontext_t *)task;
- makecontext(&task->uc,(void (*)())do_work,1,task->sv.TCB_no);
- ret=swapcontext(&rs->tc,&task->uc);
- if(ret<0) {
- ShowLog(1,"%s:swapcontext fault TCB_NO=%d,tid=%lx,errno=%d,%s",
- __FUNCTION__,task->sv.TCB_no,pthread_self(),ret,strerror(abs(ret)));
- rs->tc.uc_link=NULL;
- task->uc.uc_link=NULL;
- if(task->uc.uc_stack.ss_sp)
- munmap(task->uc.uc_stack.ss_sp,task->uc.uc_stack.ss_size);
- task->uc.uc_stack.ss_sp=NULL;
- task->uc.uc_stack.ss_size=0;
- do_work(task->sv.TCB_no);
- mthr_showid_del(rs->tid);
- continue;
- }
- if(!task) {
- ShowLog(1,"%s:aft swapcontext task is NULL",__FUNCTION__);
- continue;
- }
- if(!task->AIO_flg) {//service complate
- if(!rs->tc.uc_stack.ss_size) {//回收fiber stack
- //ShowLog(5,"%s:%lx release fiber from TCB_no=%d",__FUNCTION__,rs->tid,task->sv.TCB_no);
- rs->tc.uc_stack.ss_sp=task->uc.uc_stack.ss_sp;
- if(rs->tc.uc_stack.ss_sp)
- rs->tc.uc_stack.ss_size=use_stack_size;
- else rs->tc.uc_stack.ss_size=0;
- } else {
- ShowLog(5,"%s:%lx destroy fiber from TCB_no=%d",__FUNCTION__,rs->tid,task->sv.TCB_no);
- if(task->uc.uc_stack.ss_sp)
- munmap(task->uc.uc_stack.ss_sp,task->uc.uc_stack.ss_size);
- }
- task->uc.uc_stack.ss_sp=NULL;
- rs->tc.uc_link=NULL;
- task->uc.uc_link=NULL;
- task->uc.uc_stack.ss_size=0;//mark fiber cpmplate
- //ShowLog(5,"%s:TCB_no=%d,tid=%lx,timeout=%d,conn.timeout=%d",__FUNCTION__,task->sv.TCB_no,rs->tid,task->timeout,task->conn.timeout);
- } else {
- pthread_mutex_unlock(&task->lock);
- ShowLog(5,"%s:tid=%lx,fiber yield from TCB_no=%d",
- __FUNCTION__,pthread_self(),task->sv.TCB_no);
- }
- mthr_showid_del(rs->tid);
- }
- ShowLog(1,"%s:tid=%lx canceled",__FUNCTION__,pthread_self());
- mthr_showid_del(rs->tid);
- rs->timestamp=now_usec();
- rs->status=0;
- rs->tid=0;
- if(rs->tc.uc_stack.ss_sp) {
- munmap(rs->tc.uc_stack.ss_sp,rs->tc.uc_stack.ss_size);
- rs->tc.uc_stack.ss_sp=NULL;
- rs->tc.uc_stack.ss_size=0;
- }
- return NULL;
- }
复制代码
|
|