/* Forum-post residue: "Forum badges: 0". Listing 2: threadpool.c */
/* -------------------------------------------------------------------------
* threadpool.c - thread pool functions
* -------------------------------------------------------------------------
*/
/* $Id: threadpool.c,v 0.1 2007/12/11 17:15:33 sysit Exp $ */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <signal.h>
#include <unistd.h>
#include "threadpool.h"
/*
 * Forward declarations for functions defined later in this file.
 * threadpool_thread is the start routine handed to pthread_create for
 * worker threads; tp_manage_thread is the start routine for the manager
 * thread. The remaining helpers are file-local.
 */
void *threadpool_thread(void *tpool);
static int add_thread(threadpool_t *pool);
static int delete_thread(threadpool_t *pool);
static int get_thread_by_id(threadpool_t *pool, int id);
static int get_tp_status(threadpool_t *pool);
static void *tp_manage_thread(void *pool);
/*
 * threadpool_init - allocate a pool and start its worker and manager threads.
 *
 * min_threads: lower bound kept in pool->min_threads.
 * max_threads: number of worker slots allocated in pool->threads_info.
 *
 * The pool starts with min_threads + 5 workers, capped at max_threads so the
 * start-up loop can never write past the slot array.
 *
 * Returns the new pool, or NULL on invalid limits or on any failure.
 */
threadpool_t *threadpool_init(int min_threads, int max_threads)
{
    threadpool_t *pool;
    int i, rtn;

    if (min_threads < 0 || max_threads <= 0 || min_threads > max_threads) {
        printf("tp_init: invalid thread limits\n");
        return NULL;
    }

    /* calloc so every field, including the slot pointer, starts at zero */
    pool = calloc(1, sizeof *pool);
    if (pool == NULL) {
        printf("Unable to malloc() thread pool!\n");
        return NULL;
    }

    /* set the desired thread pool values */
    pool->min_threads = min_threads;
    pool->max_threads = max_threads;
    /* written this way so min_threads + 5 cannot overflow or exceed the array */
    pool->cur_threads = (max_threads - min_threads > 5) ? min_threads + 5
                                                        : max_threads;
    pool->shutdown = 0;

    /* create the threadpool mutex and cond var before any thread can use them */
    if ((rtn = pthread_mutex_init(&pool->tp_lock, NULL)) != 0) {
        printf("pthread_mutex_init %s", strerror(rtn));
        goto fail_pool;
    }
    if ((rtn = pthread_cond_init(&pool->tp_cond, NULL)) != 0) {
        printf("pthread_cond_init %s", strerror(rtn));
        goto fail_lock;
    }

    /* zeroed slot array: unused slots have is_busy == 0 and th_work == NULL */
    pool->threads_info = calloc((size_t)max_threads, sizeof *pool->threads_info);
    if (pool->threads_info == NULL) {
        printf("Unable to malloc() thread info array\n");
        goto fail_cond;
    }

    /* create the individual worker threads */
    for (i = 0; i < pool->cur_threads; i++) {
        pthread_cond_init(&pool->threads_info[i].thread_cond, NULL);
        pthread_mutex_init(&pool->threads_info[i].thread_lock, NULL);
        pool->threads_info[i].is_busy = 0;
        pool->threads_info[i].th_work = NULL;

        rtn = pthread_create(&pool->threads_info[i].thread_id, NULL,
                             threadpool_thread, (void *)pool);
        if (rtn != 0) {
            printf("tp_init: creat work thread failed\n");
            pthread_mutex_destroy(&pool->threads_info[i].thread_lock);
            pthread_cond_destroy(&pool->threads_info[i].thread_cond);
            if (i == 0)
                goto fail_array;   /* nothing is running yet: full cleanup */
            /*
             * Workers 0..i-1 are already running and still reference the
             * pool, so its memory is deliberately not freed here.
             */
            return NULL;
        }
        printf("tp_init: creat work thread %d\n",
               (int)pool->threads_info[i].thread_id);
    }

    /*
     * Start the manager last: it reads cur_threads and the slot array, so
     * both must be fully set up before it can run.
     */
    rtn = pthread_create(&pool->manage_thread_id, NULL, tp_manage_thread, pool);
    if (rtn != 0) {
        printf("tp_init: creat manage thread failed\n");
        /* running workers still reference the pool; see the note above */
        return NULL;
    }
    printf("tp_init: creat manage thread %d\n", (int)pool->manage_thread_id);

    return pool;

fail_array:
    free(pool->threads_info);
fail_cond:
    pthread_cond_destroy(&pool->tp_cond);
fail_lock:
    pthread_mutex_destroy(&pool->tp_lock);
fail_pool:
    free(pool);
    return NULL;
}
/*
 * threadpool_dispatch - hand one job to the pool.
 *
 * pool:    the pool to use.
 * routine: function the worker will call.
 * arg:     argument passed to routine.
 *
 * The job goes to the first idle worker; if none is idle, add_thread() is
 * asked for a new worker. The work item allocated here is freed by the
 * worker after the routine returns.
 *
 * Returns 0 when the job was handed to a worker, -1 when the pool is shut
 * down, an argument is NULL, allocation fails, or no worker is available.
 */
int threadpool_dispatch(threadpool_t *pool, void (*routine)(void *), void *arg)
{
    thread_work_t *workp;
    thread_t *slot;
    int i;

    if (pool == NULL || routine == NULL)
        return -1;

    if (pool->shutdown) {
        printf("threadpool_dispatch: the threadpool shutdown now!\n");
        return -1;
    }

    /* allocate the work structure */
    workp = malloc(sizeof *workp);
    if (workp == NULL) {
        printf("unable to create work struct\n");
        return -1;
    }
    workp->handler_routine = routine;
    workp->arg = arg;
    workp->next = NULL;

    /* tp_lock keeps cur_threads stable while the slots are scanned */
    pthread_mutex_lock(&pool->tp_lock);

    for (i = 0; i < pool->cur_threads; i++) {
        slot = &pool->threads_info[i];
        pthread_mutex_lock(&slot->thread_lock);
        if (!slot->is_busy) {
            slot->is_busy = 1;
            slot->th_work = workp;
            printf("threadpool_dispatch: informing idle working thread %d, thread id is %d\n", i, (int)slot->thread_id);
            pthread_cond_signal(&slot->thread_cond);
            pthread_mutex_unlock(&slot->thread_lock);
            pthread_mutex_unlock(&pool->tp_lock);
            return 0;
        }
        pthread_mutex_unlock(&slot->thread_lock);
    }

    /* there is no idle thread, let's create one */
    if (!add_thread(pool)) {
        /* at max_threads (or creation failed): report it, do not index past the array */
        pthread_mutex_unlock(&pool->tp_lock);
        printf("threadpool_dispatch: threadpool busy,dispatch failed\n");
        free(workp);
        return -1;
    }

    /* add_thread() appended the new worker as the last active slot */
    i = pool->cur_threads - 1;
    slot = &pool->threads_info[i];
    pthread_mutex_lock(&slot->thread_lock);
    slot->is_busy = 1;
    slot->th_work = workp;
    printf("threadpool_dispatch: informing idle working thread %d, thread id is %d\n", i, (int)slot->thread_id);
    pthread_cond_signal(&slot->thread_cond);
    pthread_mutex_unlock(&slot->thread_lock);

    pthread_mutex_unlock(&pool->tp_lock);
    return 0;
}
/*
 * threadpool_destroy - stop the manager and the workers, then free the pool.
 *
 * Sets pool->shutdown, cancels and joins the manager thread, wakes and
 * joins every active worker, destroys the mutexes and condition variables
 * and frees the slot array and the pool itself.
 *
 * Returns 0 on success, -1 if pool is NULL or any broadcast/join reported
 * an error (cleanup still runs to the end in that case).
 *
 * NOTE(review): the caller must make sure no other thread is still calling
 * into the pool; nothing here guards against that.
 */
int threadpool_destroy(threadpool_t *pool)
{
    int i, rtn;
    int status = 0;

    if (pool == NULL)
        return -1;

    /* set the shutdown flag */
    pool->shutdown = 1;

    /*
     * The manager may be asleep in an endless loop, so a plain join could
     * block forever. Request cancellation (sleep() is a cancellation
     * point) and then join. kill() is not usable here: it takes a process
     * id, not a pthread_t.
     */
    pthread_cancel(pool->manage_thread_id);
    if ((rtn = pthread_join(pool->manage_thread_id, NULL)) != 0) {
        printf("threadpool_destory: pthread_join %d\n", rtn);
        status = -1;
    }
    printf("threadpool_destory: kill manage thread %d\n", (int)pool->manage_thread_id);

    /* wake each worker so it can see the shutdown flag, then wait for it */
    for (i = 0; i < pool->cur_threads; i++) {
        thread_t *slot = &pool->threads_info[i];

        /* broadcast under the slot lock so a worker about to wait cannot miss it */
        pthread_mutex_lock(&slot->thread_lock);
        rtn = pthread_cond_broadcast(&slot->thread_cond);
        pthread_mutex_unlock(&slot->thread_lock);
        if (rtn != 0) {
            printf("threadpool_destory: pthread_cond_broadcast %d\n", rtn);
            status = -1;
        }

        if ((rtn = pthread_join(slot->thread_id, NULL)) != 0) {
            printf("threadpool_destory: pthread_join %d\n", rtn);
            status = -1;
        }

        pthread_mutex_destroy(&slot->thread_lock);
        pthread_cond_destroy(&slot->thread_cond);
        printf("threadpool_destory: kill work thread %d\n", (int)slot->thread_id);
    }

    pthread_mutex_destroy(&pool->tp_lock);
    pthread_cond_destroy(&pool->tp_cond);

    /* clean up memory */
    free(pool->threads_info);
    free(pool);
    return status;
}
void *threadpool_thread(void *tpool)
{
thread_work_t *my_work = NULL;
threadpool_t *pool = (struct threadpool *)tpool;
pthread_t curid;
int nseq;
curid = pthread_self();
nseq = get_thread_by_id(pool, curid);
if(nseq < 0)
return NULL;
printf("entering working thread %d, thread id is %d\n", nseq, (int)curid);
for(; /* go forever */
{
pthread_mutex_lock(&pool->threads_info[nseq].thread_lock);
pthread_cond_wait(&pool->threads_info[nseq].thread_cond, &pool->threads_info[nseq].thread_lock);
pthread_mutex_unlock(&pool->threads_info[nseq].thread_lock);
if (pool->shutdown == 1)
break;
printf("%d thread do work!\n", (int)pthread_self());
my_work = pool->threads_info[nseq].th_work;
/* perform the work */
(*(my_work->handler_routine))(my_work->arg);
free(my_work);
pthread_mutex_lock(&pool->threads_info[nseq].thread_lock);
pool->threads_info[nseq].th_work = NULL;
pool->threads_info[nseq].is_busy = 0;
pthread_mutex_unlock(&pool->threads_info[nseq].thread_lock);
printf("%d do work over\n", (int)pthread_self());
}
return(NULL);
}
/*
 * add_thread - start one more worker in the next free slot.
 *
 * The new slot is marked busy before the thread starts, so the caller can
 * hand it a job without another thread claiming it first. The caller in
 * this file holds pool->tp_lock around the call.
 *
 * Returns TRUE on success, FALSE if the pool is already at max_threads or
 * the slot/thread could not be created; on failure cur_threads is left
 * unchanged.
 */
static int add_thread(threadpool_t *pool)
{
    thread_t *new_thread;
    int rtn;

    if (pool->max_threads <= pool->cur_threads)
        return FALSE;

    new_thread = &pool->threads_info[pool->cur_threads];

    if (pthread_cond_init(&new_thread->thread_cond, NULL) != 0)
        return FALSE;
    if (pthread_mutex_init(&new_thread->thread_lock, NULL) != 0) {
        pthread_cond_destroy(&new_thread->thread_cond);
        return FALSE;
    }
    new_thread->is_busy = 1;
    new_thread->th_work = NULL;   /* no job yet; the caller assigns one */

    /* count the slot first: the new thread searches slots below cur_threads */
    pool->cur_threads++;

    rtn = pthread_create(&new_thread->thread_id, NULL, threadpool_thread, pool);
    if (rtn != 0) {
        /*
         * Roll back. The slot lives inside the threads_info array, so it
         * must not be passed to free().
         */
        pool->cur_threads--;
        new_thread->is_busy = 0;
        pthread_mutex_destroy(&new_thread->thread_lock);
        pthread_cond_destroy(&new_thread->thread_cond);
        return FALSE;
    }

    printf("add_thread: creat work thread %d\n", (int)new_thread->thread_id);
    return TRUE;
}
/*
 * delete_thread - retire the last worker if it is idle.
 *
 * Returns FALSE without changing anything when the pool is already at
 * min_threads or the last worker is busy; otherwise cancels and joins that
 * worker, destroys its mutex and condition variable, decrements
 * cur_threads and returns TRUE.
 */
static int delete_thread(threadpool_t *pool)
{
    thread_t *last;

    if (pool->cur_threads <= pool->min_threads)
        return FALSE;

    last = &pool->threads_info[pool->cur_threads - 1];

    pthread_mutex_lock(&last->thread_lock);
    if (last->is_busy) {
        pthread_mutex_unlock(&last->thread_lock);
        return FALSE;
    }
    /* mark the slot taken so no job is handed to a thread that is going away */
    last->is_busy = 1;
    /*
     * kill() takes a process id, not a pthread_t, so it cannot stop a
     * single thread. Cancel the idle worker instead; pthread_cond_wait is
     * a cancellation point.
     */
    pthread_cancel(last->thread_id);
    pthread_mutex_unlock(&last->thread_lock);

    /* wait until the thread is really gone before tearing down its slot */
    pthread_join(last->thread_id, NULL);

    /*
     * NOTE(review): a thread cancelled inside pthread_cond_wait re-acquires
     * the mutex before it terminates; unless the worker releases it in a
     * cleanup handler, pthread_mutex_destroy may report EBUSY here.
     */
    pthread_mutex_destroy(&last->thread_lock);
    pthread_cond_destroy(&last->thread_cond);
    printf("delete_thread: delete idle thread %d\n", (int)last->thread_id);

    pool->cur_threads--;
    return TRUE;
}
/*
 * get_thread_by_id - find the slot index of a worker.
 *
 * id: a thread id that the caller has already narrowed to int.
 *
 * Each stored thread_id is narrowed the same way before the comparison, so
 * both sides are consistent; comparing the int against the full pthread_t
 * would never match where pthread_t is wider than int.
 *
 * Returns the index of the first matching slot below cur_threads, or -1.
 *
 * NOTE(review): narrowing can make two different thread ids compare equal;
 * pthread_equal on full pthread_t values is the reliable test.
 */
int get_thread_by_id(threadpool_t *pool, int id)
{
    int i;

    for (i = 0; i < pool->cur_threads; i++) {
        if (id == (int)pool->threads_info[i].thread_id)
            return i;
    }
    return -1;
}
/*
 * get_tp_status - report whether the pool is lightly or heavily used.
 *
 * Counts the slots below cur_threads whose is_busy flag is set and compares
 * the busy fraction with BUSY_THRESHOLD.
 *
 * Returns 0 when the fraction is below the threshold (or the pool has no
 * threads), 1 otherwise.
 */
static int get_tp_status(threadpool_t *pool)
{
    int busy_num = 0;
    int i;

    /* no threads: nothing is busy, and this avoids dividing by zero */
    if (pool->cur_threads <= 0)
        return 0;

    for (i = 0; i < pool->cur_threads; i++) {
        if (pool->threads_info[i].is_busy)
            busy_num++;
    }

    if ((float)busy_num / (pool->cur_threads) < BUSY_THRESHOLD)
        return 0;
    return 1;
}
static void *tp_manage_thread(void *threadpool){
threadpool_t *pool = (threadpool_t *)threadpool;
sleep(MANAGE_INTERVAL);
do{
if( get_tp_status(pool) == 0 ){
do{
if( !delete_thread(pool) )
break;
}while(TRUE);
}
sleep(MANAGE_INTERVAL);
}while(TRUE);
}
/*
 * print_tp_status - print the pool's thread count, busy count and usage rate.
 *
 * Counts the slots below cur_threads whose is_busy flag is set and prints
 * one status line to stdout. The usage rate is reported as 0 when the pool
 * has no threads.
 */
void print_tp_status(threadpool_t *pool)
{
    int busy_num = 0;
    float usage = 0.0f;
    int i;

    for (i = 0; i < pool->cur_threads; i++) {
        if (pool->threads_info[i].is_busy)
            busy_num++;
    }

    /* guard the division: an empty pool has no meaningful ratio */
    if (pool->cur_threads > 0)
        usage = (float)busy_num / (pool->cur_threads);

    printf("Thread pool status: total threads %d in pool; %d threads busy; usage rate is %f .\n",(int)pool->cur_threads,busy_num,usage);
}
/* end of threadpool.c */