static void *thread_work(void *param) { resource *rs=(resource *)param; int ret,fds; TCB *task=NULL; struct epoll_event event;
ShowLog(2,"%s:thread %lx start!",__FUNCTION__,pthread_self()); getcontext(&rs->tc); if(task) pthread_mutex_unlock(&task->lock);
while(1) { //从就绪队列取一个任务 pthread_mutex_lock(&rpool.mut); while(!(task=rdy_get())) { if(rpool.flg >= tpool.rdy_num) break; rpool.flg++; ret=pthread_cond_wait(&rpool.cond,&rpool.mut); //没有任务,等待 rpool.flg--; } pthread_mutex_unlock(&rpool.mut); if(task) { if(!task->AIO_flg && !task->call_back) { task->fd=task->conn.Socket; ShowLog(5,"%s:tid=%lx,TCB_no=%d from rdy_queue",__FUNCTION__, pthread_self(),task->sv.TCB_no); if(task->fd>=0) { do_epoll(task,0,0); } continue; } } else { fds = epoll_wait(g_epoll_fd, &event, 1 , -1); if(fds < 0){ ShowLog(1,"%s:epoll_wait err=%d,%s",__FUNCTION__,errno,strerror(errno)); usleep(30000000); continue; } task = (TCB *)event.data.ptr; if(task->events) { ShowLog(1,"%s:tid=%lx,TCB_no=%d,task->events=%08X,conflict!",__FUNCTION__, pthread_self(),task->sv.TCB_no,task->events);//发现惊群 task=NULL; continue;//丢掉它 } task->events=event.events; } rs->timestamp=now_usec(); if(task->status>0) set_showid(task->ctx);//Showid 应该在会话上下文结构里 if(task->AIO_flg) {//fiber task task->uc.uc_link=&rs->tc; rs->tc.uc_link=(ucontext_t *)task; ShowLog(5,"%s:tid=%lx,resume to TCB_no=%d",__FUNCTION__,pthread_self(),task->sv.TCB_no); pthread_mutex_lock(&task->lock);//防止其他线程提前闯入 setcontext(&task->uc); //== longjmp() continue;//no action,logic only } if(task->uc.uc_stack.ss_size>0) {//call_back模式,抢入了,进入同步模式 rs->tc.uc_link=NULL; ShowLog(5,"%s:tid %lx 抢入 SYNC",__FUNCTION__,pthread_self()); do_work(task->sv.TCB_no); continue; } if(!rs->tc.uc_stack.ss_sp) { ShowLog(5,"%s:%lx create fiber for TCB_no=%d",__FUNCTION__,rs->tid,task->sv.TCB_no); task->uc.uc_stack.ss_sp=mmap(0, use_stack_size, PROT_READ | PROT_WRITE | PROT_EXEC, MAP_PRIVATE | MAP_ANON | MAP_GROWSDOWN, -1, 0); if(task->uc.uc_stack.ss_sp==MAP_FAILED) { task->uc.uc_stack.ss_sp=NULL; do_work(task->sv.TCB_no); //进行你的服务,不使用AIO continue; } } else { //ShowLog(5,"%s:%lx reuse fiber for TCB_no=%d",__FUNCTION__,rs->tid,task->sv.TCB_no); task->uc.uc_stack.ss_sp=rs->tc.uc_stack.ss_sp; rs->tc.uc_stack.ss_sp=NULL; rs->tc.uc_stack.ss_size=0; } task->uc.uc_stack.ss_size=use_stack_size; task->uc.uc_link=&rs->tc; rs->tc.uc_link=(ucontext_t *)task; makecontext(&task->uc,(void (*)())do_work,1,task->sv.TCB_no);
ret=swapcontext(&rs->tc,&task->uc); if(ret<0) { ShowLog(1,"%s:swapcontext fault TCB_NO=%d,tid=%lx,errno=%d,%s", __FUNCTION__,task->sv.TCB_no,pthread_self(),ret,strerror(abs(ret))); rs->tc.uc_link=NULL; task->uc.uc_link=NULL; if(task->uc.uc_stack.ss_sp) munmap(task->uc.uc_stack.ss_sp,task->uc.uc_stack.ss_size); task->uc.uc_stack.ss_sp=NULL; task->uc.uc_stack.ss_size=0; do_work(task->sv.TCB_no); mthr_showid_del(rs->tid); continue; } if(!task) { ShowLog(1,"%s:aft swapcontext task is NULL",__FUNCTION__); continue; } if(!task->AIO_flg) {//service complate if(!rs->tc.uc_stack.ss_size) {//回收fiber stack //ShowLog(5,"%s:%lx release fiber from TCB_no=%d",__FUNCTION__,rs->tid,task->sv.TCB_no); rs->tc.uc_stack.ss_sp=task->uc.uc_stack.ss_sp; if(rs->tc.uc_stack.ss_sp) rs->tc.uc_stack.ss_size=use_stack_size; else rs->tc.uc_stack.ss_size=0; } else { ShowLog(5,"%s:%lx destroy fiber from TCB_no=%d",__FUNCTION__,rs->tid,task->sv.TCB_no); if(task->uc.uc_stack.ss_sp) munmap(task->uc.uc_stack.ss_sp,task->uc.uc_stack.ss_size); } task->uc.uc_stack.ss_sp=NULL; rs->tc.uc_link=NULL; task->uc.uc_link=NULL; task->uc.uc_stack.ss_size=0;//mark fiber cpmplate //ShowLog(5,"%s:TCB_no=%d,tid=%lx,timeout=%d,conn.timeout=%d",__FUNCTION__,task->sv.TCB_no,rs->tid,task->timeout,task->conn.timeout); } else { pthread_mutex_unlock(&task->lock);
ShowLog(5,"%s:tid=%lx,fiber yield from TCB_no=%d", __FUNCTION__,pthread_self(),task->sv.TCB_no); } mthr_showid_del(rs->tid); } ShowLog(1,"%s:tid=%lx canceled",__FUNCTION__,pthread_self()); mthr_showid_del(rs->tid); rs->timestamp=now_usec(); rs->status=0; rs->tid=0; if(rs->tc.uc_stack.ss_sp) { munmap(rs->tc.uc_stack.ss_sp,rs->tc.uc_stack.ss_size); rs->tc.uc_stack.ss_sp=NULL; rs->tc.uc_stack.ss_size=0; } return NULL; }
|