- 论坛徽章:
- 0
|
本帖最后由 shtr 于 2010-05-12 11:40 编辑
小弟才学C++不久,为练手写的线程池,还望各位大牛不吝赐教,多多拍砖。
threadpool.h:- /**
- * @file threadpool.h
- * @brief 本文件用来实现一个不依赖第三方库与具体操作系统的线程池
- *
- * detail...
- *
- * @author shtr
- * @version 1.0
- * @date 2009年12月6日
- *
- * @see
- *
- * @par 版本记录:
- * <table border=1>
- * <tr> <th>版本 <th>日期 <th>作者 <th>备注 </tr>
- * <tr> <td>1.0 <td>2009年12月6日 <td>shtr <td>创建 </tr>
- * </table>
- */
- #ifndef _THREAD_POOL_
- #define _THREAD_POOL_
- #ifdef WIN32
- # include <windows.h>
- # include <process.h>
- #else
- # include <pthread.h>
- # include <semaphore.h>
- #endif /* WIN32 */
- #include <vector>
- #include <queue>
- class lock_i
- {
- public:
- virtual bool lock(void) = 0;
- virtual void unlock(void) = 0;
- };
- /**
- * @class class mutex_lock
- *
- * @brief 使用单值信号量封装的互斥锁
- *
- * detail...
- *
- * @author shtr
- * @date 2009年12月6日
- *
- * @see
- *
- * @par 备注:
- *
- */
- class mutex_lock
- : public lock_i
- {
- public:
- mutex_lock(void)
- {
- #ifdef WIN32
- lock_ = CreateSemaphore(0, 1, 1, 0);
- #else
- sem_init(&lock_, 0, 1);
- #endif
- }
- ~mutex_lock(void)
- {
- #ifdef WIN32
- if ( lock_ )
- CloseHandle(lock_), lock_ = 0;
- #else
- sem_destroy(&lock_), lock_ = 0;
- #endif
- }
- void unlock(void)
- {
- #ifdef WIN32
- ReleaseSemaphore(lock_, 1, 0);
- #else
- sem_post(&lock_);
- #endif
- }
- bool lock(void)
- {
- #ifdef WIN32
- DWORD ret = WaitForSingleObject(lock_, INFINITE);
- return ret == WAIT_OBJECT_0;
- #else
- return sem_wait(&lock_) == 0;
- #endif /* WIN32 */
- }
- private:
- #ifdef WIN32
- HANDLE lock_;
- #else
- sem_t lock_;
- #endif /* WIN32 */
- };
- /**
- * @class class scope_lock
- *
- * @brief 域锁
- *
- * 构造时请求锁,析构时释放锁
- *
- * @author shtr
- * @date 2009年12月6日
- *
- * @see
- *
- * @par 备注:
- *
- */
- class scope_lock
- : public lock_i
- {
- private:
- lock_i& lock_;
- public:
- scope_lock(lock_i& lock)
- : lock_(lock)
- {
- lock_.lock();
- }
- ~scope_lock()
- {
- lock_.unlock();
- }
- bool lock(void)
- {
- return lock_.lock();
- }
- void unlock(void)
- {
- return lock_.unlock();
- }
- };
- /**
- * @class class thr_sem
- *
- * @brief 同步信号量
- *
- * detail...
- *
- * @author shtr
- * @date 2009年12月6日
- *
- * @see
- *
- * @par 备注:
- *
- */
- class thr_sem
- {
- public:
- thr_sem(void)
- {
- #ifdef WIN32
- sem_ = CreateSemaphore(0, 0, INT_MAX, 0);
- #else
- sem_init(&sem_, 0, 0);
- #endif
- }
- ~thr_sem(void)
- {
- #ifdef WIN32
- if ( sem_ )
- CloseHandle(sem_), sem_ = 0;
- #else
- sem_destroy(&sem_), sem_ = 0;
- #endif
- }
- void notify_one(void)
- {
- #ifdef WIN32
- ReleaseSemaphore(sem_, 1, 0);
- #else
- sem_post(&sem_);
- #endif /* WIN32 */
- }
- bool timed_wait(int msec)
- {
- #ifdef WIN32
- DWORD ret = WaitForSingleObject(sem_, msec);
- return ret == WAIT_OBJECT_0;
- #else
- timespec tv;
- tv.tv_sec = time(NULL) + msec / 1000;
- tv.tv_nsec = 0;
- return sem_timedwait(&sem_, &tv) == 0;
- #endif /* WIN32 */
- }
- bool wait(void)
- {
- #ifdef WIN32
- DWORD ret = WaitForSingleObject(sem_, INFINITE);
- return ret == WAIT_OBJECT_0;
- #else
- int ret = sem_wait(&sem_);
- if ( ret != 0)
- return errno;
- return ret;
- #endif
- }
- private:
- #ifdef WIN32
- HANDLE sem_;
- #else
- sem_t sem_;
- #endif /* WIN32 */
- };
- class thr_ic
- {
- public:
- virtual void svc(void) = 0;
- };
- /**
- * @class class thr_impl
- *
- * @brief 线程实现接口,操作系统相关
- *
- * detail...
- *
- * @author shtr
- * @date 2009年12月6日
- *
- * @see
- *
- * @par 备注:
- *
- */
- class thr_impl
- {
- public:
- virtual int start(thr_ic* thr_unit)
- {
- int thr_id = create_thread(thr_unit);
- return thr_id;
- }
- virtual unsigned int create_thread(thr_ic* thr_unit) = 0;
- virtual int join(unsigned int tid) = 0;
- };
- #ifdef WIN32
- class thr_impl_win : public thr_impl
- {
- public:
- virtual unsigned int create_thread(thr_ic* thr_unit)
- {
- return(unsigned int)_beginthread(svc, 0, thr_unit);
- }
- virtual int join(unsigned int tid)
- {
- HANDLE hThread = (HANDLE) tid;
- return WaitForSingleObject( hThread, INFINITE );
- }
- private:
- static void svc(void* args)
- {
- thr_ic* unit = (thr_ic *)args;
- unit->svc();
- }
- };
- typedef thr_impl_win thr_impl_os;
- #else
- class thr_impl_nix : public thr_impl
- {
- public:
- virtual unsigned int create_thread(thr_ic* thr_unit)
- {
- pthread_t tid;
- int ret = pthread_create(&tid, NULL, svc, thr_unit);
- if (ret != 0)
- return errno;
- return tid;
- }
- virtual int join(unsigned int tid)
- {
- int ret = pthread_join(tid, NULL);
- if (ret != 0)
- return errno;
- return ret;
- }
- private:
- static void* svc(void* args)
- {
- thr_ic* unit = (thr_ic *)args;
- unit->svc();
- return NULL;
- }
- };
- typedef thr_impl_nix thr_impl_os;
- #endif /* WIN32 */
- /**
- * @class class threadpool
- *
- * @brief 线程池管理类
- *
- * detail...
- *
- * @author shtr
- * @date 2009年12月6日
- *
- * @see
- *
- * @par 备注:
- *
- */
- template<class thr_impl = thr_impl_os, class sem_t = thr_sem>
- class threadpool
- : public thr_ic
- {
- public:
- typedef void (*TaskFunc)(void *);
- typedef void* Arg;
- class Task
- {
- public:
- Task(TaskFunc func, Arg args)
- : func_(func)
- , args_(args)
- { }
- public:
- TaskFunc func_;
- Arg args_;
- };
- typedef std::queue<Task> TaskQueue;
- TaskQueue tasks_; // 任务队列
- bool thr_stop_; // 终止标识
- public:
- /* 工作线程 */
- class worker
- : public thr_ic
- {
- private:
- threadpool* pool_;
- public:
- worker(threadpool* pool = NULL)
- : pool_(pool)
- {
- }
- void bind_pool(threadpool* pool)
- {
- pool_ = pool;
- }
- virtual void svc(void)
- {
- while (!pool_->thr_stop_)
- {
- if (!pool_->sem_.timed_wait(200))
- continue;
- pool_->lock_.lock();
- Task task = pool_->tasks_.front();
- pool_->tasks_.pop();
- pool_->lock_.unlock();
- (*task.func_)(task.args_);
- }
- }
- };
- worker worker_; // 工作线程
- public:
- threadpool()
- : thrs_count_(0)
- , thrs_id_(NULL)
- , thr_stop_(false)
- { }
- virtual void add_task(Task task)
- {
- lock_.lock();
- tasks_.push(task);
- lock_.unlock();
- sem_.notify_one();
- }
- virtual int open(unsigned int thrs_count)
- {
- // 创建线程
- thrs_count_ = thrs_count;
- worker_.bind_pool(this);
- this->start();
- return 0;
- }
- void start(void)
- {
- for (unsigned int i = 0; i < thrs_count_; i ++)
- {
- int thr_id = thr_impl_.start(&worker_);
- if ( thr_id > 0 )
- {
- thrs_id_.push_back(thr_id);
- }
- }
- }
- void stop(void)
- {
- thr_stop_ = true;
- join_all();
- }
- void svc(void)
- { }
- private:
- void join_all(void)
- {
- for (std::vector<int>::const_iterator cit = thrs_id_.begin(); cit != thrs_id_.end(); cit++)
- {
- thr_impl_.join(*cit);
- }
- }
- private:
- sem_t sem_; // 同步信号量
- mutex_lock lock_; // 互斥锁
- thr_impl thr_impl_; // 线程
- unsigned int thrs_count_; // 线程计数
- std::vector<int> thrs_id_;
- };
- #endif /* _THREAD_POOL_ */
复制代码 测试:- #include "threadpool.h"
- void func(void* args)
- {
- printf("I'm %s.\n", (char *)args);
- }
- int _tmain(int argc, _TCHAR* argv[])
- {
-
- threadpool<> pool;
-
- pool.add_task(threadpool<>::Task(func, "1"));
- pool.add_task(threadpool<>::Task(func, "2"));
- pool.add_task(threadpool<>::Task(func, "3"));
- pool.add_task(threadpool<>::Task(func, "4"));
- pool.add_task(threadpool<>::Task(func, "5"));
- pool.add_task(threadpool<>::Task(func, "6"));
- pool.add_task(threadpool<>::Task(func, "7"));
-
- pool.open(20);
- Sleep(2000);
- pool.add_task(threadpool<>::Task(func, "40"));
- pool.add_task(threadpool<>::Task(func, "50"));
- pool.add_task(threadpool<>::Task(func, "60"));
- pool.add_task(threadpool<>::Task(func, "70"));
-
- Sleep(2000);
- pool.stop();
-
- return 0;
- }
复制代码 |
|