免费注册 查看新帖 |

Chinaunix

  平台 论坛 博客 文库
最近访问板块 发新帖
查看: 4104 | 回复: 1
打印 上一主题 下一主题

linux C语言实现线程池思想 [复制链接]

论坛徽章:
0
跳转到指定楼层
1 [收藏(0)] [报告]
发表于 2010-02-28 12:26 |只看该作者 |倒序浏览
自己个人根据线程池的思想写的小程序,老鸟 大虾些就不要笑话小弟了
#include <stdio.h>
#include <sys/stat.h>
#include <unistd.h>
#include <sys/types.h>
#include <semaphore.h>
#include <pthread.h>
#include <fcntl.h>
#include <stdlib.h>
#define SIZE 50

sem_t sem1,sem2,sem3;  //sem1:线程个数,sem2:要执行任务个数,sem3 排队任务总数

struct process                         //任务结构
{
        int (*process_fun)(const char  *); //任务函数
        char  process_arg[SIZE];           //任务参数
};
struct pthreadnode                     //线程结构
{
        struct process *pth_pt;            //线程任务
        pthread_t pth_id;                  //线程ID
        int pth_flag;                      //线程使用标志
        pthread_mutex_t pth_mutex;         //线程锁
};
struct lake                            //线程池
{
        struct process *lake_prohead;      //任务排队列表,这里申请为数组头结点
        int w;                             //w 后续任务插入位置,r为将要执行任务位置
        int r;
        int prono;                         //排队任务的个数,即lake_prohead数组大小
        struct pthreadnode *lake_pthhead;  //线程列表, 这里申请为数组头结点
        int lake_pthno;                    //线程个数
        pthread_t lake_id;                 //线程池 id
};
/*------------任务添加函数WHT---------*/
/*         T      为线程池指针
   process_fun    为任务函数
        process_arg    任务参数      */
void fun_add(struct lake *T,int (*process_fun)(const char *),const char *process_arg)
{
        (T->lake_prohead+T->w)->process_fun=process_fun;
        memcpy((T->lake_prohead+T->w)->process_arg,process_arg,SIZE);
        ++(T->w);         
        if(T->w == T->prono)
        {
                T->w=0;
        }        
}

/*------------线程清理函数WHT--------*/
void clean(void *p)
{
        pthread_mutex_unlock((pthread_mutex_t *)p);
}
/*------------线程执行函数WHT---------*/
void *pthread_fun(void *arg)
{
        struct pthreadnode *tmp=(struct pthreadnode *)arg;        
        pthread_cleanup_push(clean,&tmp->pth_mutex);
        while(1)
        {
                pthread_mutex_lock(&tmp->pth_mutex);  //阻塞,等待任务到来
                printf("<%x>  do %s\n",pthread_self(),tmp->pth_pt->process_arg);
                tmp->pth_pt->process_fun(tmp->pth_pt->process_arg);  //执行任务
               
                tmp->pth_flag=0;
                sem_post(&sem3);  //任务完成,可以空出一个等待任务栏
                sem_post(&sem1);  //线程使用完毕,空出一个线程
               
        }
        pthread_cleanup_pop(1);
        pthread_exit(0);
}
/*------------线程执行函数WHT---------*/
void *lake_goon(void *p)
{
        int i=0;
        struct lake *T=(struct lake *)p;
        while(1)
        {

                sem_wait(&sem1);  //等待线程是否有空的,无就等待
                 
                sem_wait(&sem2);  //等待是否有要执行的函数,无就等待
                                                                                       
                for(i=0;i<T->lake_pthno;++i)   //当有线程空&&有执行函数时 ,查找空闲线程
                {
                        if((T->lake_pthhead+i)->pth_flag == 0)
                        {
                                break;
                        }
                }
                        
                (T->lake_pthhead+i)->pth_flag=1;   //找到空闲线程并置1
                (T->lake_pthhead+i)->pth_pt=(T->lake_prohead+T->r); //线程任务 指向任务排队列表,r所在任务  
                ++ (T->r);                     //任务下个
                if(T->r == T->prono)                     
                {
                        T->r=0;        
                }
                pthread_mutex_unlock(&(T->lake_pthhead+i)->pth_mutex); //解锁线程,线程执行任务
        }        
        pthread_exit( (void*)0);
}

/*------ 线程池初始化-----WHT-----*/
/*------ T   线程池指针
         N1   创建线程个数
                 N2   等待任务最大个数WHT---*/
void lake_init(struct lake *T,const int N1,const int N2)
{

        int i=0;
        sem_init(&sem1,0,N1);    //线程个数为N1,因此信号量为N1,当无信号量就阻塞
        sem_init(&sem2,0,0);     //排队的任务,初始为0,有任务就+1
        sem_init(&sem3,0,N2);    //任务排队也有个数,初始为N2,当为0时,表示任务排队已满,要加入的任务继续等待
/*---------初始化 任务列表WHT-----*/
        T->lake_pthno=N1;
        T->lake_prohead=(struct process *)malloc(N2*sizeof(struct process));
        T->w=0;   
        T->r=0;
        T->prono=N2;
        for(i=0;i<T->prono;++i)
        {
                (T->lake_prohead+i)->process_fun=NULL;
                memset((T->lake_prohead+i)->process_arg,0,SIZE);
        }

/*---------初始化 线程列表WH~T-----*/
        T->lake_pthhead=(struct pthreadnode *)malloc(N1 *sizeof(struct pthreadnode));
        for(i=0 ; i<N1 ; ++i)
        {
                (T->lake_pthhead+i)->pth_flag=0;  //为0为线程空闲,为1为线程执行  
                (T->lake_pthhead+i)->pth_pt=NULL;
                pthread_mutex_init(&(T->lake_pthhead+i)->pth_mutex,0);
                /*---由于线程上锁,因此线程建立时会阻塞,不会执行下去---*/
                pthread_mutex_lock(&(T->lake_pthhead+i)->pth_mutex);
               
                /*---------创建N1个线程WH~T-----*/
                pthread_create(&(T->lake_pthhead+i)->pth_id,NULL,pthread_fun,T->lake_pthhead+i);
                 
        }        
                /*---------创建任务执行线程WH~T-----*/
        pthread_create(&T->lake_id,NULL,lake_goon,T);
}


/*--- 等待任务添加函数WH~T--*/
void * process_test(void *p)
{
        int fd=0;  //文件描述符
        int rc=0;  //读取字符个数
        int i=0;
        char buf[SIZE],tmp;  //零时存放区
        struct lake *T=(struct lake *) p;
        
        if((fd=open("a1.txt",O_RDONLY,0666))<0)  //打开文件,这里任务都放在a1.txt文件中
        {
                perror("open \n");
                exit(1);
        }

        while(1)
        {        
                i=0;
                while(1)         //读取一行命令,作为参数
                {
                        rc=read(fd,&tmp,1);
                        if(rc > 0)
                        {
                                if(tmp == 10)
                                {
                                        buf[i]='\0';
                                        break;
                                }
                                else
                                {
                                        buf[i]=tmp;                                                
                                }
                                ++i;                                                                        
                        }
                        else
                        {
                                break;
                        }
                }
                if(i>0)        //当有数据时
                {
                        sem_wait(&sem3);   //添加任务时,如果没有信号量,表示排队的已满,等待        
                        fun_add(T,system,buf);  //添加命令,这里函数我用system
                        sem_post(&sem2);   //添加完成 ,等待任务+1
                }
                else
                {
                        break;
                }               
               
        }
        pthread_exit(0);
}

int main()
{
        struct lake T;
        pthread_t id;
        lake_init(&T,3,5);   //线程的创建,建立2个线程,4个任务等待位置
                 
        pthread_create(&id,NULL,process_test,&T); //创建添加任务列表的线程
        
        pthread_join(id,NULL);
        pthread_join(T.lake_id,NULL);

        return 0;
}
W*H*T===================================================
a1.text 文件内容为
./01
./02
./03
./04
./05
./06
./07
V===================================================
01.c 程序为
void* printf1(void *p)
{
        sleep(10);
        printf("11111111111111111111111\n");
return (void*)0;        
}
int main(int argc, char *argv[])
{
        printf1(NULL);
        return 0;
}
02.c 程序为~~W*H*T=~~~~~~~~~~~~~~~~~~
void* printf2(void *p)
{
        sleep(10);
        printf("22222222222222222222\n");        
return (void*)0;
}
int main(int argc, char *argv[])
{
        printf2(NULL);
        return 0;
}
03.c 程序为~~W*H*T=~~~~~~~~~~~~~~~~~~~~~~~~
void* printf3(void *p)
{
        sleep(3);
        printf("333333333333333333333\n");        
return (void*)0;
}
int main(int argc, char *argv[])
{
        printf3(NULL);
        return 0;
}
04.c 程序为~~~W*H*T=~~~~~~~~~~~~~~~~~~~~~~~~
void* printf4(void *p)
{
        sleep(2);
        printf("444444444444444444444\n");        
                return (void*)0;
}

int main(int argc, char *argv[])
{
        printf4(NULL);
        return 0;
}
05.c 程序为~~W*H*T=~~~~~~~~~~~~~~~~~~~~~~~
void* printf5(void *p)
{
        sleep(1);
        printf("55555555555555555555555\n");
return (void*)0;        
}
int main(int argc, char *argv[])
{
        printf5(NULL);
        return 0;
}

06.c 程序为~~W*H*T=~~~~~~~~~~~~~~~~~~~~~~~~~~
void* printf6(void *p)
{
        sleep(10);
        printf("6666666666666666666666\n");        
return (void*)0;
}
int main(int argc, char *argv[])
{
        printf6(NULL);
        return 0;
}
07.c 程序为~~~~W*H*T=~~~~~~~~~~~~~~~~~~~~~~~
void* printf7(void *p)
{
        sleep(2);
        printf("777777777777777777777\n");        
return (void*)0;
}
int main(int argc, char *argv[])
{
        printf7(NULL);
        return 0;
}
==========W*H*T==============================

运行结果为
<4083acc0>  do ./01
<4103ac40>  do ./02
<4183abc0>  do ./03
333333333333333333333
<4183abc0>  do ./04
444444444444444444444
<4183abc0>  do ./05
55555555555555555555555
<4183abc0>  do ./06
11111111111111111111111
<4083acc0>  do ./07
22222222222222222222
777777777777777777777
6666666666666666666666

========W*H*T============================

论坛徽章:
0
2 [报告]
发表于 2010-03-11 16:53 |只看该作者
哇,好长啊,我决定学习了。
您需要登录后才可以回帖 登录 | 注册

本版积分规则 发表回复

  

北京盛拓优讯信息技术有限公司. 版权所有 京ICP备16024965号-6 北京市公安局海淀分局网监中心备案编号:11010802020122 niuxiaotong@pcpop.com 17352615567
未成年举报专区
中国互联网协会会员  联系我们:huangweiwei@itpub.net
感谢所有关心和支持过ChinaUnix的朋友们 转载本站内容请注明原作者名及出处

清除 Cookies - ChinaUnix - Archiver - WAP - TOP