免费注册 查看新帖 |

Chinaunix

  平台 论坛 博客 文库
最近访问板块 发新帖
查看: 2371 | 回复: 9
打印 上一主题 下一主题

[C] 线程池bug.高手帮忙修改一下 [复制链接]

论坛徽章:
0
跳转到指定楼层
1 [收藏(0)] [报告]
发表于 2007-12-18 12:56 |只看该作者 |倒序浏览
新手忽悠的一段C代码,东抄一点,西抄一点. 实现完了之后发现有bug. 线程池里面的线程被分配出去执行任务相应非常慢. 请高手指点一下.

1, threadpool.h

/* -------------------------------------------------------------------------
* threadpool.h - thread pool defs
* -------------------------------------------------------------------------
*/
/* $Id: threadpool.h,v 0.1 2007/12/11 17:15:33 sysit Exp $ */

#ifndef _THREADPOOL_H_
#define _THREADPOOL_H_

#include  <pthread.h>

#ifndef TRUE
#define TRUE 1
#define FALSE 0
#endif
#define BUSY_THRESHOLD 0.8
#define MANAGE_INTERVAL 1

typedef struct thread_work{
  void (*handler_routine)();
  void *arg;
  struct thread_work *next;
}thread_work_t;

typedef struct thread{
        pthread_t                thread_id;
        int                          is_busy;
        pthread_cond_t          thread_cond;
        pthread_mutex_t                thread_lock;
        thread_work_t                *th_work;
}thread_t;

typedef struct threadpool{
  int cur_threads;
  int min_threads;
  int max_threads;

  pthread_t manage_thread_id;
  thread_t  *threads_info;
  pthread_mutex_t tp_lock;
  pthread_cond_t  tp_cond;
  int shutdown;
} threadpool_t;

extern threadpool_t *threadpool_init(int min_threads, int max_threads);

extern int threadpool_dispatch(threadpool_t *pool, void  (*routine)(void *), void *arg);

extern int threadpool_destroy(threadpool_t *pool);

void print_tp_status(threadpool_t *pool);

#endif /* _THREADPOOL_H_ */

论坛徽章:
0
2 [报告]
发表于 2007-12-18 12:56 |只看该作者
2, threadpool.c

/* -------------------------------------------------------------------------
* threadpool.c - thread pool functions
* -------------------------------------------------------------------------
*/
/* $Id: threadpool.c,v 0.1 2007/12/11 17:15:33 sysit Exp $ */

#include        <stdio.h>
#include        <stdlib.h>
#include        <string.h>
#include        <pthread.h>
#include        <signal.h>
#include        <unistd.h>

#include "threadpool.h"

/* the private function */
void *threadpool_thread(void *tpool);
static int add_thread(threadpool_t *pool);
static int delete_thread(threadpool_t *pool);
static int get_thread_by_id(threadpool_t *pool, int id);
static int  get_tp_status(threadpool_t *pool);
static void *tp_manage_thread(void *pool);

threadpool_t *threadpool_init(int min_threads, int max_threads)
{
    int i, rtn;
    threadpool_t *pool;

    /* make the thread pool structure */
    if((pool = (struct threadpool *)malloc(sizeof(struct threadpool))) == NULL)
    {
        printf("Unable to malloc() thread pool!\n";
        return NULL;
    }
    memset(pool, 0, sizeof(struct threadpool));

    /* set the desired thread pool values */
    pool->cur_threads = min_threads + 5;
    pool->min_threads = min_threads;
    pool->max_threads = max_threads;

                /* create manager threads */
                rtn = pthread_create(&pool->manage_thread_id, NULL, tp_manage_thread, pool);
          if(0 != rtn){
                        printf("tp_init: creat manage thread failed\n";
                        return FALSE;
                }
                printf("tp_init: creat manage thread %d\n", (int)pool->manage_thread_id);

    /* create the threadpool mutexs and cond vars */
    if((rtn = pthread_mutex_init(&(pool->tp_lock),NULL)) != 0) {
        printf("pthread_mutex_init %s",strerror(rtn));
        return NULL;
    }

    if((rtn = pthread_cond_init(&(pool->tp_cond),NULL)) != 0) {
        printf("pthread_cond_init %s",strerror(rtn));
        return NULL;
    }
    /* initialize the shutdown*/

    pool->shutdown = 0;

    /* create an array to hold a ptr to the worker threads */
    if((pool->threads_info = (thread_t *)malloc(sizeof(thread_t)
                    *max_threads)) == NULL)
    {
        printf("Unable to malloc() thread info array\n";
        return NULL;
    }

    /* create the individual worker threads */
                for(i=0;i<pool->cur_threads;i++){
                        pthread_cond_init(&pool->threads_info.thread_cond, NULL);
                        pthread_mutex_init(&pool->threads_info.thread_lock, NULL);
                        pool->threads_info.is_busy = 0;

                        rtn = pthread_create(&pool->threads_info.thread_id, NULL, threadpool_thread, (void*)pool);
                        if(0 != rtn){
                                printf("tp_init: creat work thread failed\n";
                                return FALSE;
                        }
                        printf("tp_init: creat work thread %d\n", (int)pool->threads_info.thread_id);
                }
               
    return pool;
}

int threadpool_dispatch(threadpool_t *pool, void (*routine)(void *), void *arg)
{
        int i;
        thread_work_t *workp;

        if (pool->shutdown){
                printf("threadpool_dispatch: the threadpool shutdown now!\n";
                return -1;
        }

        /* control max threads */
         if (pool->cur_threads >= pool->max_threads){
                 printf("threadpool_dispatch: threadpool busy,dispatch failed\n";
         }

        /*
         *        while (pool->cur_threads == pool->max_threads){
         *                if((rtn = pthread_cond_wait(&pool->tp_cond,
         *                                        &pool->tp_lock) ) != 0) {
         *                printf("threadpool_dispatch: pthread cond wait failure\n";
         *                return -1;
         *                }
         *        }
         */

        /* allocate the work structure */
        if((workp = (thread_work_t *)malloc(sizeof(thread_work_t))) == NULL) {
                printf("unable to create work struct\n";
                return -1;
        }

        workp->handler_routine = routine;
        workp->arg = arg;
        workp->next = NULL;

        for(i=0;i<pool->cur_threads;i++){
                pthread_mutex_lock(&pool->threads_info.thread_lock);
                if(!pool->threads_info.is_busy){
                        pool->threads_info.is_busy = 1;

                        pool->threads_info.th_work = workp;

                        printf("threadpool_dispatch: informing idle working thread %d, thread id is %d\n", i, (int)pool->threads_info.thread_id);
                        pthread_mutex_unlock(&pool->threads_info.thread_lock);
                        pthread_cond_signal(&pool->threads_info.thread_cond);

                        return 0;
                } else {
                        pthread_mutex_unlock(&pool->threads_info.thread_lock);               
                }
        }

        /* there is no idle threads, let's create one */
        pthread_mutex_lock(&pool->tp_lock);
        if( add_thread(pool) ){
                i = pool->cur_threads - 1;
                pool->threads_info.th_work = workp;
        }
        pthread_mutex_unlock(&pool->tp_lock);

        printf("threadpool_dispatch: informing idle working thread %d, thread id is %d\n", i, (int)pool->threads_info.thread_id);
        pthread_cond_signal(&pool->threads_info.thread_cond);

        return 0;
}

int threadpool_destroy(threadpool_t *pool)
{
        int i, rtn;

        /* set the shutdown flag */
        pool->shutdown = 1;

        pthread_join(pool->manage_thread_id,NULL);
        kill(pool->manage_thread_id, SIGKILL);
        printf("threadpool_destory: kill manage thread %d\n", (int)pool->manage_thread_id);
        /* wait for workers to exit */
        for(i = 0; i < pool->cur_threads; i++) {
                if((rtn = pthread_cond_broadcast(&pool->threads_info.thread_cond)) != 0) {
                        printf("threadpool_destory: pthread_cond_broadcast %d\n",rtn);
                        return -1;
                }

                if((rtn = pthread_join(pool->threads_info.thread_id,NULL)) != 0) {
                        printf("threadpool_destory: pthread_join %d\n",rtn);
                        return -1;
                }
        }

        for(i=0;i<pool->cur_threads;i++){
                kill(pool->threads_info.thread_id, SIGKILL);
                pthread_mutex_destroy(&pool->threads_info.thread_lock);
                pthread_cond_destroy(&pool->threads_info.thread_cond);
                printf("threadpool_destory: kill work thread %d\n", (int)pool->threads_info.thread_id);
        }

        pthread_mutex_destroy(&pool->tp_lock);
        pthread_cond_destroy(&pool->tp_cond);
        /* clean up memory */
        free(pool->threads_info);
        free(pool);

        return 0;
}

void *threadpool_thread(void *tpool)
{
        thread_work_t *my_work = NULL;
        threadpool_t *pool = (struct threadpool *)tpool;
        pthread_t curid;
        int nseq;

        curid = pthread_self();

        nseq = get_thread_by_id(pool, curid);
        if(nseq < 0)
                return NULL;
        printf("entering working thread %d, thread id is %d\n", nseq, (int)curid);

        for(; /* go forever */
        {
                pthread_mutex_lock(&pool->threads_info[nseq].thread_lock);
                pthread_cond_wait(&pool->threads_info[nseq].thread_cond, &pool->threads_info[nseq].thread_lock);
                pthread_mutex_unlock(&pool->threads_info[nseq].thread_lock);               
                if (pool->shutdown == 1)
                        break;

                printf("%d thread do work!\n", (int)pthread_self());
                my_work = pool->threads_info[nseq].th_work;

                /* perform the work */
                (*(my_work->handler_routine))(my_work->arg);
                free(my_work);

                pthread_mutex_lock(&pool->threads_info[nseq].thread_lock);               
                pool->threads_info[nseq].th_work = NULL;
                pool->threads_info[nseq].is_busy = 0;
                pthread_mutex_unlock(&pool->threads_info[nseq].thread_lock);

                printf("%d do work over\n", (int)pthread_self());

        }
        return(NULL);
}

static int add_thread(threadpool_t *pool){
        int rtn;
        thread_t *new_thread;

        if( pool->max_threads <= pool->cur_threads )
                return FALSE;

        new_thread = &pool->threads_info[pool->cur_threads];

        pthread_cond_init(&new_thread->thread_cond, NULL);
        pthread_mutex_init(&new_thread->thread_lock, NULL);

        new_thread->is_busy = 1;

        pool->cur_threads++;

        rtn = pthread_create(&new_thread->thread_id, NULL, threadpool_thread, pool);
        if(0 != rtn){
                free(new_thread);
                return FALSE;
        }
        printf("add_thread: creat work thread %d\n", (int)pool->threads_info[pool->cur_threads-1].thread_id);

        return TRUE;
}

static int delete_thread(threadpool_t *pool){
        if(pool->cur_threads <= pool->min_threads) return FALSE;

        if(pool->threads_info[pool->cur_threads-1].is_busy) return FALSE;

        kill(pool->threads_info[pool->cur_threads-1].thread_id, SIGKILL);
        pthread_mutex_destroy(&pool->threads_info[pool->cur_threads-1].thread_lock);
        pthread_cond_destroy(&pool->threads_info[pool->cur_threads-1].thread_cond);
        printf("delete_thread: delete idle thread %d\n", (int)pool->threads_info[pool->cur_threads-1].thread_id);

        pool->cur_threads--;

        return TRUE;
}

int get_thread_by_id(threadpool_t *pool, int id){
        int i;

        for(i=0;i<pool->cur_threads;i++){
                if(id == pool->threads_info.thread_id)
                        return i;
        }

        return -1;
}

static int  get_tp_status(threadpool_t *pool){
        float busy_num = 0.0;
        int i;

        for(i=0;i<pool->cur_threads;i++){
                if(pool->threads_info.is_busy)
                        busy_num++;
        }

        if(busy_num/(pool->cur_threads) < BUSY_THRESHOLD)
                return 0;
        else
                return 1;       
}

static void *tp_manage_thread(void *threadpool){
        threadpool_t *pool = (threadpool_t *)threadpool;

        sleep(MANAGE_INTERVAL);

        do{
                if( get_tp_status(pool) == 0 ){
                        do{
                                if( !delete_thread(pool) )
                                        break;
                        }while(TRUE);
                }

                sleep(MANAGE_INTERVAL);
        }while(TRUE);
}

void print_tp_status(threadpool_t *pool){
        float busy_num = 0.0;
        int i;

        for(i=0;i<pool->cur_threads;i++){
                if(pool->threads_info.is_busy)
                        busy_num++;
        }

        printf("Thread pool status: total threads %d in pool; %d threads busy; usage rate is %f .\n",(int)pool->cur_threads,(int)busy_num,busy_num/(pool->cur_threads));
        return ;       
}

论坛徽章:
0
3 [报告]
发表于 2007-12-18 12:57 |只看该作者
3, testpool.c

#include        <stdio.h>
#include        <stdlib.h>
#include        <string.h>
#include        <signal.h>
#include        <unistd.h>
#include         "threadpool.h"

void dowork(void *arg)
{
        int ptr=(int)arg;

        sleep(1);
        printf("hello world! %d\n",ptr);
}

int main(int argc, char *argv[])
{
        int i;
        threadpool_t *pool;

        pool=threadpool_init(10,100);

        for(i=1;i<50;i++)
        {
                threadpool_dispatch(pool,dowork,(void *) i);
                print_tp_status(pool);
        }
        sleep(3);
        print_tp_status(pool);
        threadpool_destroy(pool);

        return 0;
}

论坛徽章:
0
4 [报告]
发表于 2007-12-18 12:58 |只看该作者
4, Makefile

#--------------------------------------------------------------------

#--------------------------------------------------------------------

CC = gcc
CPP = g++
AR = ar cru
CFLAGS = -Wall -D_REENTRANT -D_GNU_SOURCE -g
SOFLAGS = -shared -fPIC
LDFLAGS = -lpthread

LINKER = $(CC)
LINT = lint -c
RM = /bin/rm -f

#--------------------------------------------------------------------
LIBOBJS = threadpool.o

TARGET = testpool

#--------------------------------------------------------------------

all: $(TARGET)

testpool: threadpool.o testpool.o
        $(LINKER) $(LDFLAGS) $^ $(LIBS) -o $@

clean:
        $(RM) *.o core core.* $(TARGET)

#--------------------------------------------------------------------

# make rule
%.o : %.c
        $(CC) $(CFLAGS) -c $^ -o $@

论坛徽章:
1
2015年辞旧岁徽章
日期:2015-03-03 16:54:15
5 [报告]
发表于 2007-12-18 12:59 |只看该作者
原帖由 sysit 于 2007-12-18 12:56 发表
新手忽悠的一段C代码,东抄一点,西抄一点. 实现完了之后发现有bug.

论坛徽章:
0
6 [报告]
发表于 2007-12-18 13:00 |只看该作者
threadpool.tar.gz (2.93 KB, 下载次数: 42)
附件打包传上来了.

现在运行的情况是, testpool执行后僵死,退不出来. pstack看大多数线程都是挂起的.

请高手指教,谢谢

论坛徽章:
0
7 [报告]
发表于 2007-12-18 13:03 |只看该作者
B4 flw, 新手发东西应该鼓励,不应该打击.

偶又不是真正的程序员,只不过手头缺工具用,所有想整点东西用用. 不抄搞不定的.

论坛徽章:
1
2015年辞旧岁徽章
日期:2015-03-03 16:54:15
8 [报告]
发表于 2007-12-18 13:10 |只看该作者
原帖由 sysit 于 2007-12-18 13:03 发表
B4 flw, 新手发东西应该鼓励,不应该打击.

偶又不是真正的程序员,只不过手头缺工具用,所有想整点东西用用. 不抄搞不定的.

抄了还是没搞定。
我的意思是说,你应该拿出你薪水的一半去找个兼职来作,这样你还可以白赚 50%。

论坛徽章:
0
9 [报告]
发表于 2007-12-18 13:46 |只看该作者
版主, 给点建议吗, 不要只记得打击啊, 呵呵.

论坛徽章:
0
10 [报告]
发表于 2007-12-18 13:52 |只看该作者
这个原型是htun里面的threadpool, 精华区有人发过那段C代码. 但这个pool池大小不是动态调整的. 我参考了一些C++的实现,去掉了它的queue, 改用动态调整pool大小. 但就搞不定线程被分配出去执行任务后回收很慢的问题.
您需要登录后才可以回帖 登录 | 注册

本版积分规则 发表回复

  

北京盛拓优讯信息技术有限公司. 版权所有 京ICP备16024965号-6 北京市公安局海淀分局网监中心备案编号:11010802020122 niuxiaotong@pcpop.com 17352615567
未成年举报专区
中国互联网协会会员  联系我们:huangweiwei@itpub.net
感谢所有关心和支持过ChinaUnix的朋友们 转载本站内容请注明原作者名及出处

清除 Cookies - ChinaUnix - Archiver - WAP - TOP