waruqi
Posted on 2016-12-20 11:32
Reply to 8# yulihua49
I wouldn't use a lockless mechanism to build a producer/consumer model.
Instead I handle it with a channel, a coroutine lock, or a coroutine version of a semaphore. If the resource isn't available, the coroutine is suspended; unless unlock is called (which internally resumes the coroutine), that coroutine will never be scheduled again.
In my internal coroutine implementation, suspend/resume and yield are different things. The suspend used while waiting on a lock removes the coroutine from the scheduling chain, so it is no longer scheduled until unlock (resume); there is no wasted-CPU problem.
yield, by contrast, gives up the current coroutine but it will still get scheduled later, which is not the same as suspend. That's why my coroutine lock doesn't use yield internally; it suspends directly.
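To make the suspend-based lock concrete, here is a minimal sketch of a coroutine mutex built on suspend/resume rather than yield. co_t, co_self(), co_suspend(), co_resume() and the co_queue_* helpers are placeholder names for whatever the runtime provides, not tbox's actual interface:

/* Sketch of a suspend-based coroutine mutex (single-threaded scheduler assumed,
 * so no atomics are needed). The primitives below are assumed, not tbox's API. */
typedef struct co_mutex {
    int        locked;    /* 0 = free, 1 = held */
    co_queue_t waiters;   /* coroutines suspended waiting for this mutex */
} co_mutex_t;

void co_mutex_lock(co_mutex_t *m) {
    if (!m->locked) {               /* uncontended: take it and keep running */
        m->locked = 1;
        return;
    }
    co_queue_push(&m->waiters, co_self());
    co_suspend();                   /* removed from the run queue entirely;
                                       only unlock() will resume us */
    /* when we wake up here, unlock() has already handed ownership to us */
}

void co_mutex_unlock(co_mutex_t *m) {
    co_t *next = co_queue_pop(&m->waiters);
    if (next)
        co_resume(next);            /* pass the lock straight to a waiter */
    else
        m->locked = 0;
}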
sditmaner
Posted on 2016-12-20 13:11
Keep it up, OP, we're all rooting for you.
yulihua49
Posted on 2016-12-20 20:04
Last edited by yulihua49 on 2016-12-20 20:12
waruqi posted on 2016-12-20 11:25
Besides, I never said you have to use stackless coroutines; my library implements both stackfull and stackless, and depending on your actual needs you can ...
For that, plain multithreading would do; why bother with coroutines?
sleep and blocking on locks, though, are worth doing with coroutines.
Worth considering implementing (usage sketch follows the list):
coroutine_mutex_lock
coroutine_mutex_unlock
coroutine_cond_wait
coroutine_cond_signal
coroutine_sleep
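A rough usage sketch of how those primitives would fit the producer/consumer case discussed above, assuming pthread-like semantics except that the coroutine is suspended instead of the thread blocking; queue_t, item_t and their helpers are invented for illustration:

/* Producer/consumer on top of the proposed coroutine_* primitives. */
static queue_t  q;
static co_mutex q_mut;
static co_cond  q_cond;

void producer(void) {
    for (;;) {
        item_t *it = make_item();
        coroutine_mutex_lock(&q_mut);
        queue_push(&q, it);
        coroutine_cond_signal(&q_cond);           /* resume one suspended consumer */
        coroutine_mutex_unlock(&q_mut);
    }
}

void consumer(void) {
    for (;;) {
        coroutine_mutex_lock(&q_mut);
        while (queue_empty(&q))
            coroutine_cond_wait(&q_cond, &q_mut); /* unlock + suspend, relock on wakeup */
        item_t *it = queue_pop(&q);
        coroutine_mutex_unlock(&q_mut);
        consume(it);
    }
}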
waruqi
Posted on 2016-12-21 20:31
Reply to 13# yulihua49
With multiple threads you still have to add locks for synchronization. I'd rather just write it simply with coroutines; a single thread handles it all, and there's no need to waste thread resources.
As for the interfaces listed there, I have implemented all of them.
wlmqgzm
Posted on 2016-12-24 01:17
Nice work, you have my support, keep it up.
yulihua49
Posted on 2017-01-21 15:59
waruqi posted on 2016-12-07 22:46
The stackfull coroutine library tbox provided before already switches very efficiently, but since every coroutine has to maintain its own independent stack, ...
Are tbox and tboox the same thing?
zylthinking
Posted on 2017-01-23 16:18
Last edited by zylthinking on 2017-01-23 17:38
yulihua49 posted on 2016-12-09 10:36
I discussed this problem with 猫 before. The restrictions stackless puts on usage are anything but small; your application can't avoid using third-party software, and how are they supposed to ...
How exactly is yours meant to be used?
coroutine(char* addr) {
    int ip = addr;
    char buf[];
    while(1) {
        int n = readfrom(*buf, ip);
        char c = buf;
        int n2 = readfrom(ip);
        write(buf, ip);
    }
}
Take a coroutine like this, where the switches happen inside readfrom and write: how do you allocate stacks only for active coroutines while still preserving data like ip, n, n2 and buf?
Note that it is also a while(1), meaning ip and buf have to stay accessible the whole time.
Yours looks more like a task pool: essentially the buffers and the registers are saved onto the task, and the coroutine that processes the task loads the registers from the task itself,
something like this:
thread(char* addr) {
    while(1) {
        fd = epoll_wait();
        if (null == (task = task_get(fd))) {
            task = new task();
            task.ip = proc;
            task.sp = malloc();
            bind_task_to(fd, task);
            coroutine_add_run_list(task);
        } else {
            coroutine_resume(task);
        }
    }
}

one thread in coroutine threads pool() {
    while(1) {
        mysp = sp;
        myip = ip;
        select one task from runlist;
        load registers from task;
        jmp to task.ip;
    }
}

proc(task) {
    char buf[];
    int n = readfrom(*buf, ip);  // yield when not ready
    char c = buf;
    write(c, ip);                // yield when not ready
    bind_task_to(fd, null);
    free(task);
    add_to_epoll_noblock(fd);
    sp = mysp;
    jmp myip;
}
But this doesn't actually simplify programming.
If you insist on giving it a meaning, you can only assume that a task here is a single action, and that completing the action requires several protocol exchanges, so handling the related protocol steps sequentially inside one function looks easier to understand than handling every protocol flat in an ordinary epoll loop.
But then such a task cannot have a session-level lifetime either; otherwise you end up with one task per connection and one private stack per task, which is just standard stackfull and defeats the supposed memory savings.
And if a task only represents one protocol exchange, it is really no different from an epoll loop with a task queue and no coroutines; at the very least, you could get the same effect without much effort.
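For comparison, the textbook stackless answer to "where do ip, n, n2 and buf live" is: nowhere on a stack. The former locals get hoisted into an explicit state struct and the coroutine becomes a resumable state machine (protothread style). A rough sketch with invented names, just to show the shape of the transform; readfrom/write follow the pseudocode above, and WOULD_BLOCK is assumed:

/* Stackless transform sketch: locals become struct fields and the function
 * re-enters at a saved state label. */
typedef struct conn_state {
    int  resume_at;        /* which case to continue from */
    int  ip;               /* former locals, hoisted off the stack */
    int  n, n2;
    char buf[4096];
} conn_state_t;

/* returns 0 = would block, call again when the fd is ready; it never "finishes"
 * because the original loop is while(1) */
int conn_step(conn_state_t *s) {
    switch (s->resume_at) {
    case 0:
    case 1:
        s->n = readfrom(s->buf, s->ip);
        if (s->n == WOULD_BLOCK) { s->resume_at = 1; return 0; }
        /* fall through */
    case 2:
        if (write(s->buf, s->ip) == WOULD_BLOCK) { s->resume_at = 2; return 0; }
        s->resume_at = 1;  /* back to the top of the while(1) */
        return 0;
    }
    return 0;
}

Which is exactly the restriction raised earlier in the thread: any third-party code that blocks internally cannot be rewritten this way, so a stackless coroutine can only suspend at points you control.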
yulihua49
Posted on 2017-01-23 20:13
Last edited by yulihua49 on 2017-01-23 20:21
zylthinking posted on 2017-01-23 16:18
How exactly is yours meant to be used?
coroutine(char* addr) {
After epoll_wait() returns, a stack is allocated for the task; then makecontext() sets the work function as the entry point, and swapcontext() transfers control into the work function. All sending and receiving happens inside the work function.
The actual process is far more complicated than that description, and it is certainly not effortless; debugging was extremely hard and took several months. But this part is the framework, and you only build it once; after that, every do_work() you write becomes very easy.
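For anyone who hasn't used the ucontext family before, the getcontext/makecontext/swapcontext pattern the framework below relies on looks like this in miniature (a standalone toy, not part of the framework):

/* Minimal getcontext/makecontext/swapcontext demo: a "scheduler" context and
 * one fiber that yields back once. */
#include <stdio.h>
#include <stdlib.h>
#include <ucontext.h>

static ucontext_t main_ctx, fiber_ctx;

static void fiber_fn(void) {
    printf("fiber: started\n");
    swapcontext(&fiber_ctx, &main_ctx);   /* yield back to the scheduler */
    printf("fiber: resumed, finishing\n");
    /* returning switches to fiber_ctx.uc_link automatically */
}

int main(void) {
    char *stack = malloc(64 * 1024);
    getcontext(&fiber_ctx);
    fiber_ctx.uc_stack.ss_sp   = stack;
    fiber_ctx.uc_stack.ss_size = 64 * 1024;
    fiber_ctx.uc_link          = &main_ctx;   /* where to go when fiber_fn returns */
    makecontext(&fiber_ctx, fiber_fn, 0);

    swapcontext(&main_ctx, &fiber_ctx);   /* run fiber until it yields */
    printf("main: fiber yielded, resuming it\n");
    swapcontext(&main_ctx, &fiber_ctx);   /* resume; fiber returns to us via uc_link */
    printf("main: fiber done\n");
    free(stack);
    return 0;
}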
static void *thread_work(void *param)
{
    resource *rs=(resource *)param;
    int ret,fds;
    TCB *task=NULL;
    struct epoll_event event;

    ShowLog(2,"%s:thread %lx start!",__FUNCTION__,pthread_self());
    getcontext(&rs->tc);
    if(task)pthread_mutex_unlock(&task->lock);
    while(1) {
        // take a task from the ready queue
        pthread_mutex_lock(&rpool.mut);
        while(!(task=rdy_get())) {
            if(rpool.flg >= tpool.rdy_num) break;
            rpool.flg++;
            ret=pthread_cond_wait(&rpool.cond,&rpool.mut); // no task yet, wait
            rpool.flg--;
        }
        pthread_mutex_unlock(&rpool.mut);
        if(task) {
            if(!task->AIO_flg && !task->call_back) {
                task->fd=task->conn.Socket;
                ShowLog(5,"%s:tid=%lx,TCB_no=%d from rdy_queue",__FUNCTION__,
                        pthread_self(),task->sv.TCB_no);
                if(task->fd>=0) {
                    do_epoll(task,0,0);
                }
                continue;
            }
        } else {
            fds = epoll_wait(g_epoll_fd, &event, 1 , -1);
            if(fds < 0){
                ShowLog(1,"%s:epoll_wait err=%d,%s",__FUNCTION__,errno,strerror(errno));
                usleep(30000000);
                continue;
            }
            task = (TCB *)event.data.ptr;
            if(task->events) {
                ShowLog(1,"%s:tid=%lx,TCB_no=%d,task->events=%08X,conflict!",__FUNCTION__,
                        pthread_self(),task->sv.TCB_no,task->events); // thundering herd detected
                task=NULL;
                continue; // drop it
            }
            task->events=event.events;
        }
        rs->timestamp=now_usec();
        if(task->status>0) set_showid(task->ctx); // Showid should live in the session context structure
        if(task->AIO_flg) { // fiber task
            task->uc.uc_link=&rs->tc;
            rs->tc.uc_link=(ucontext_t *)task;
            ShowLog(5,"%s:tid=%lx,resume to TCB_no=%d",__FUNCTION__,pthread_self(),task->sv.TCB_no);
            pthread_mutex_lock(&task->lock); // keep other threads from barging in early
            setcontext(&task->uc); // == longjmp()
            continue; // no action, logic only
        }
        if(task->uc.uc_stack.ss_size>0) { // call_back mode, another thread got in first: handle it synchronously
            rs->tc.uc_link=NULL;
            ShowLog(5,"%s:tid %lx preempted, SYNC",__FUNCTION__,pthread_self());
            do_work(task->sv.TCB_no);
            continue;
        }
        if(!rs->tc.uc_stack.ss_sp) {
            ShowLog(5,"%s:%lx create fiber for TCB_no=%d",__FUNCTION__,rs->tid,task->sv.TCB_no);
            task->uc.uc_stack.ss_sp=mmap(0, use_stack_size,
                    PROT_READ | PROT_WRITE | PROT_EXEC,
                    MAP_PRIVATE | MAP_ANON | MAP_GROWSDOWN, -1, 0);
            if(task->uc.uc_stack.ss_sp==MAP_FAILED) {
                task->uc.uc_stack.ss_sp=NULL;
                do_work(task->sv.TCB_no); // run the service without AIO
                continue;
            }
        } else {
            //ShowLog(5,"%s:%lx reuse fiber for TCB_no=%d",__FUNCTION__,rs->tid,task->sv.TCB_no);
            task->uc.uc_stack.ss_sp=rs->tc.uc_stack.ss_sp;
            rs->tc.uc_stack.ss_sp=NULL;
            rs->tc.uc_stack.ss_size=0;
        }
        task->uc.uc_stack.ss_size=use_stack_size;
        task->uc.uc_link=&rs->tc;
        rs->tc.uc_link=(ucontext_t *)task;
        makecontext(&task->uc,(void (*)())do_work,1,task->sv.TCB_no);
        ret=swapcontext(&rs->tc,&task->uc);
        if(ret<0) {
            ShowLog(1,"%s:swapcontext fault TCB_NO=%d,tid=%lx,errno=%d,%s",
                    __FUNCTION__,task->sv.TCB_no,pthread_self(),ret,strerror(abs(ret)));
            rs->tc.uc_link=NULL;
            task->uc.uc_link=NULL;
            if(task->uc.uc_stack.ss_sp)
                munmap(task->uc.uc_stack.ss_sp,task->uc.uc_stack.ss_size);
            task->uc.uc_stack.ss_sp=NULL;
            task->uc.uc_stack.ss_size=0;
            do_work(task->sv.TCB_no);
            mthr_showid_del(rs->tid);
            continue;
        }
        if(!task) {
            ShowLog(1,"%s:aft swapcontext task is NULL",__FUNCTION__);
            continue;
        }
        if(!task->AIO_flg) { // service complete
            if(!rs->tc.uc_stack.ss_size) { // keep the fiber stack around for reuse
                //ShowLog(5,"%s:%lx release fiber from TCB_no=%d",__FUNCTION__,rs->tid,task->sv.TCB_no);
                rs->tc.uc_stack.ss_sp=task->uc.uc_stack.ss_sp;
                if(rs->tc.uc_stack.ss_sp)
                    rs->tc.uc_stack.ss_size=use_stack_size;
                else rs->tc.uc_stack.ss_size=0;
            } else {
                ShowLog(5,"%s:%lx destroy fiber from TCB_no=%d",__FUNCTION__,rs->tid,task->sv.TCB_no);
                if(task->uc.uc_stack.ss_sp)
                    munmap(task->uc.uc_stack.ss_sp,task->uc.uc_stack.ss_size);
            }
            task->uc.uc_stack.ss_sp=NULL;
            rs->tc.uc_link=NULL;
            task->uc.uc_link=NULL;
            task->uc.uc_stack.ss_size=0; // mark fiber complete
            //ShowLog(5,"%s:TCB_no=%d,tid=%lx,timeout=%d,conn.timeout=%d",__FUNCTION__,task->sv.TCB_no,rs->tid,task->timeout,task->conn.timeout);
        } else {
            pthread_mutex_unlock(&task->lock);
            ShowLog(5,"%s:tid=%lx,fiber yield from TCB_no=%d",
                    __FUNCTION__,pthread_self(),task->sv.TCB_no);
        }
        mthr_showid_del(rs->tid);
    }
    ShowLog(1,"%s:tid=%lx canceled",__FUNCTION__,pthread_self());
    mthr_showid_del(rs->tid);
    rs->timestamp=now_usec();
    rs->status=0;
    rs->tid=0;
    if(rs->tc.uc_stack.ss_sp) {
        munmap(rs->tc.uc_stack.ss_sp,rs->tc.uc_stack.ss_size);
        rs->tc.uc_stack.ss_sp=NULL;
        rs->tc.uc_stack.ss_size=0;
    }
    return NULL;
}
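The function above is only the scheduler/resume side; the yield that happens inside do_work's I/O calls is not posted. As a standalone miniature of the same idea (invented names, not this framework's code): one fiber that swaps back to the scheduler on EAGAIN and is resumed when epoll reports the fd readable.

/* Miniature of "fiber yields on EAGAIN, epoll resumes it", reduced to one fd. */
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <ucontext.h>
#include <unistd.h>

static ucontext_t sched_ctx, fiber_ctx;
static int g_fd, g_done;

static ssize_t fiber_read(int fd, void *buf, size_t len) {
    for (;;) {
        ssize_t n = read(fd, buf, len);
        if (n >= 0 || errno != EAGAIN)
            return n;
        swapcontext(&fiber_ctx, &sched_ctx);   /* yield: wait until readable */
    }
}

static void fiber_main(void) {
    char buf[256];
    ssize_t n = fiber_read(g_fd, buf, sizeof buf - 1);
    if (n > 0) {
        buf[n] = 0;
        printf("fiber got: %s", buf);
    }
    g_done = 1;                                /* returning jumps to uc_link */
}

int main(void) {
    g_fd = STDIN_FILENO;
    fcntl(g_fd, F_SETFL, fcntl(g_fd, F_GETFL) | O_NONBLOCK);

    char *stack = malloc(64 * 1024);
    getcontext(&fiber_ctx);
    fiber_ctx.uc_stack.ss_sp   = stack;
    fiber_ctx.uc_stack.ss_size = 64 * 1024;
    fiber_ctx.uc_link          = &sched_ctx;
    makecontext(&fiber_ctx, fiber_main, 0);

    int ep = epoll_create1(0);
    struct epoll_event ev = { .events = EPOLLIN };
    epoll_ctl(ep, EPOLL_CTL_ADD, g_fd, &ev);

    swapcontext(&sched_ctx, &fiber_ctx);       /* run the fiber until it yields */
    while (!g_done) {
        struct epoll_event out;
        if (epoll_wait(ep, &out, 1, -1) > 0)
            swapcontext(&sched_ctx, &fiber_ctx);   /* resume the fiber */
    }
    free(stack);
    return 0;
}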
yulihua49
Posted on 2017-01-23 20:23
Last edited by yulihua49 on 2017-01-23 20:24
Reply to 17# zylthinking. How did my reply go missing?