免费注册 查看新帖 |

Chinaunix

  平台 论坛 博客 文库
最近访问板块 发新帖
查看: 3472 | 回复: 4
打印 上一主题 下一主题

[C++] 初学C++,发个线程池 [复制链接]

论坛徽章:
0
跳转到指定楼层
1 [收藏(0)] [报告]
发表于 2010-05-12 11:32 |只看该作者 |倒序浏览
本帖最后由 shtr 于 2010-05-12 11:40 编辑

小弟才学C++不久,为练手写的线程池,还望各位大牛不吝赐教,多多拍砖。
threadpool.h:
  1. /**
  2. * @file    threadpool.h
  3. * @brief    本文件用来实现一个不依赖第三方库与具体操作系统的线程池
  4. *
  5. * detail...
  6. *
  7. * @author    shtr
  8. * @version    1.0
  9. * @date    2009年12月6日
  10. *
  11. * @see        
  12. *
  13. * @par 版本记录:
  14. * <table border=1>
  15. *  <tr> <th>版本    <th>日期            <th>作者    <th>备注 </tr>
  16. *  <tr> <td>1.0    <td>2009年12月6日    <td>shtr    <td>创建 </tr>
  17. * </table>
  18. */
  19. #ifndef    _THREAD_POOL_
  20. #define    _THREAD_POOL_
  21. #ifdef WIN32
  22. #    include <windows.h>
  23. #    include <process.h>
  24. #else
  25. #    include <pthread.h>
  26. #    include <semaphore.h>
  27. #endif /* WIN32 */

  28. #include <vector>
  29. #include <queue>


  30. class lock_i
  31. {
  32. public:
  33.     virtual bool lock(void) = 0;
  34.     virtual void unlock(void) = 0;
  35. };

  36. /**
  37. * @class    class mutex_lock
  38. *
  39. * @brief    使用单值信号量封装的互斥锁
  40. *
  41. * detail...
  42. *
  43. * @author    shtr
  44. * @date    2009年12月6日
  45. *
  46. * @see        
  47. *
  48. * @par 备注:
  49. *
  50. */
  51. class mutex_lock
  52.     : public lock_i
  53. {
  54. public:
  55.     mutex_lock(void)
  56.     {
  57. #ifdef WIN32
  58.         lock_ = CreateSemaphore(0, 1, 1, 0);

  59. #else
  60.         sem_init(&lock_, 0, 1);
  61. #endif
  62.     }
  63.     ~mutex_lock(void)
  64.     {
  65. #ifdef WIN32
  66.         if ( lock_ )
  67.             CloseHandle(lock_), lock_ = 0;
  68. #else
  69.         sem_destroy(&lock_), lock_ = 0;
  70. #endif
  71.     }

  72.     void unlock(void)
  73.     {
  74. #ifdef WIN32
  75.         ReleaseSemaphore(lock_, 1, 0);
  76. #else
  77.         sem_post(&lock_);
  78. #endif
  79.     }

  80.     bool lock(void)
  81.     {
  82. #ifdef WIN32
  83.         DWORD ret = WaitForSingleObject(lock_, INFINITE);
  84.         return ret == WAIT_OBJECT_0;
  85. #else
  86.         return sem_wait(&lock_) == 0;
  87. #endif /* WIN32 */
  88.     }
  89. private:
  90. #ifdef WIN32
  91.     HANDLE                lock_;
  92. #else
  93.     sem_t               lock_;
  94. #endif /* WIN32 */
  95. };

  96. /**
  97. * @class    class scope_lock
  98. *
  99. * @brief    域锁
  100. *
  101. * 构造时请求锁,析构时释放锁
  102. *
  103. * @author    shtr
  104. * @date    2009年12月6日
  105. *
  106. * @see        
  107. *
  108. * @par 备注:
  109. *
  110. */
  111. class scope_lock
  112.     : public lock_i
  113. {
  114. private:
  115.     lock_i& lock_;

  116. public:
  117.     scope_lock(lock_i& lock)
  118.         : lock_(lock)
  119.     {

  120.         lock_.lock();
  121.     }
  122.     ~scope_lock()
  123.     {
  124.         lock_.unlock();
  125.     }

  126.     bool lock(void)
  127.     {
  128.         return lock_.lock();
  129.     }

  130.     void unlock(void)
  131.     {
  132.         return lock_.unlock();
  133.     }
  134. };

  135. /**
  136. * @class    class thr_sem
  137. *
  138. * @brief    同步信号量
  139. *
  140. * detail...
  141. *
  142. * @author    shtr
  143. * @date    2009年12月6日
  144. *
  145. * @see        
  146. *
  147. * @par 备注:
  148. *
  149. */
  150. class thr_sem
  151. {
  152. public:
  153.     thr_sem(void)
  154.     {
  155. #ifdef WIN32
  156.         sem_ = CreateSemaphore(0, 0, INT_MAX, 0);
  157. #else
  158.         sem_init(&sem_, 0, 0);
  159. #endif
  160.     }
  161.     ~thr_sem(void)
  162.     {
  163. #ifdef WIN32
  164.         if ( sem_ )
  165.             CloseHandle(sem_), sem_ = 0;
  166. #else
  167.         sem_destroy(&sem_), sem_ = 0;
  168. #endif
  169.     }

  170.     void notify_one(void)
  171.     {
  172. #ifdef WIN32
  173.         ReleaseSemaphore(sem_, 1, 0);
  174. #else
  175.         sem_post(&sem_);
  176. #endif /* WIN32 */
  177.     }

  178.     bool timed_wait(int msec)
  179.     {
  180. #ifdef WIN32
  181.         DWORD ret = WaitForSingleObject(sem_, msec);
  182.         return ret == WAIT_OBJECT_0;
  183. #else
  184.         timespec tv;
  185.         tv.tv_sec = time(NULL) + msec / 1000;
  186.         tv.tv_nsec = 0;
  187.         return sem_timedwait(&sem_, &tv) == 0;
  188. #endif /* WIN32 */
  189.     }

  190.     bool wait(void)
  191.     {
  192. #ifdef WIN32
  193.         DWORD ret = WaitForSingleObject(sem_, INFINITE);
  194.         return ret == WAIT_OBJECT_0;
  195. #else
  196.         int ret = sem_wait(&sem_);
  197.         if ( ret != 0)
  198.             return errno;
  199.         return ret;
  200. #endif
  201.     }

  202. private:
  203. #ifdef WIN32
  204.     HANDLE                sem_;
  205. #else
  206.     sem_t               sem_;
  207. #endif /* WIN32 */
  208. };


  209. class thr_ic
  210. {
  211. public:
  212.     virtual void svc(void) = 0;
  213. };

  214. /**
  215. * @class    class thr_impl
  216. *
  217. * @brief    线程实现接口,操作系统相关
  218. *
  219. * detail...
  220. *
  221. * @author    shtr
  222. * @date    2009年12月6日
  223. *
  224. * @see        
  225. *
  226. * @par 备注:
  227. *
  228. */
  229. class thr_impl
  230. {

  231. public:
  232.     virtual int start(thr_ic* thr_unit)
  233.     {
  234.         int thr_id = create_thread(thr_unit);

  235.         return thr_id;
  236.     }

  237.     virtual unsigned int create_thread(thr_ic* thr_unit) = 0;

  238.     virtual int join(unsigned int tid) = 0;

  239. };
  240. #ifdef WIN32
  241. class thr_impl_win : public thr_impl
  242. {
  243. public:
  244.     virtual unsigned int create_thread(thr_ic* thr_unit)
  245.     {
  246.         return(unsigned int)_beginthread(svc, 0, thr_unit);
  247.     }
  248.     virtual int join(unsigned int tid)
  249.     {
  250.         HANDLE hThread = (HANDLE) tid;
  251.         return WaitForSingleObject( hThread, INFINITE );
  252.     }
  253. private:
  254.     static void svc(void* args)
  255.     {
  256.         thr_ic* unit = (thr_ic *)args;
  257.         unit->svc();
  258.     }

  259. };

  260. typedef thr_impl_win thr_impl_os;
  261. #else
  262. class thr_impl_nix : public thr_impl
  263. {
  264. public:
  265.     virtual unsigned int create_thread(thr_ic* thr_unit)
  266.     {
  267.         pthread_t tid;
  268.         int ret = pthread_create(&tid, NULL, svc, thr_unit);
  269.         if (ret != 0)
  270.             return errno;
  271.         return tid;
  272.     }
  273.     virtual int join(unsigned int tid)
  274.     {
  275.         int ret = pthread_join(tid, NULL);
  276.         if (ret != 0)
  277.             return errno;
  278.         return ret;
  279.     }
  280. private:
  281.     static void* svc(void* args)
  282.     {
  283.         thr_ic* unit = (thr_ic *)args;
  284.         unit->svc();
  285.         return NULL;
  286.     }

  287. };

  288. typedef thr_impl_nix thr_impl_os;

  289. #endif /* WIN32 */



  290. /**
  291. * @class    class threadpool
  292. *
  293. * @brief    线程池管理类
  294. *
  295. * detail...
  296. *
  297. * @author    shtr
  298. * @date    2009年12月6日
  299. *
  300. * @see        
  301. *
  302. * @par 备注:
  303. *
  304. */
  305. template<class thr_impl = thr_impl_os, class sem_t = thr_sem>
  306. class threadpool
  307.     : public thr_ic
  308. {

  309. public:
  310.     typedef void (*TaskFunc)(void *);
  311.     typedef void* Arg;
  312.     class Task
  313.     {
  314.     public:
  315.         Task(TaskFunc func, Arg args)
  316.             : func_(func)
  317.             , args_(args)
  318.         {    }
  319.     public:
  320.         TaskFunc    func_;
  321.         Arg            args_;
  322.     };
  323.     typedef std::queue<Task> TaskQueue;

  324.     TaskQueue        tasks_;            // 任务队列
  325.     bool            thr_stop_;        // 终止标识

  326. public:
  327.     /* 工作线程 */
  328.     class worker
  329.         : public thr_ic
  330.     {
  331.     private:
  332.         threadpool* pool_;
  333.     public:
  334.         worker(threadpool* pool = NULL)
  335.             : pool_(pool)
  336.         {
  337.         }
  338.         void bind_pool(threadpool* pool)
  339.         {
  340.             pool_ = pool;
  341.         }

  342.         virtual void svc(void)
  343.         {
  344.             while (!pool_->thr_stop_)
  345.             {

  346.                 if (!pool_->sem_.timed_wait(200))
  347.                     continue;

  348.                 pool_->lock_.lock();
  349.                 Task task = pool_->tasks_.front();
  350.                 pool_->tasks_.pop();
  351.                 pool_->lock_.unlock();

  352.                 (*task.func_)(task.args_);
  353.             }
  354.         }
  355.     };

  356.     worker                worker_;    // 工作线程

  357. public:
  358.     threadpool()
  359.         : thrs_count_(0)
  360.         , thrs_id_(NULL)
  361.         , thr_stop_(false)
  362.     {    }

  363.     virtual void add_task(Task task)
  364.     {

  365.         lock_.lock();
  366.         tasks_.push(task);
  367.         lock_.unlock();
  368.         sem_.notify_one();
  369.     }

  370.     virtual int open(unsigned int thrs_count)
  371.     {
  372.         // 创建线程
  373.         thrs_count_ = thrs_count;

  374.         worker_.bind_pool(this);
  375.         this->start();

  376.         return 0;

  377.     }

  378.     void start(void)
  379.     {

  380.         for (unsigned int i = 0; i < thrs_count_; i ++)
  381.         {
  382.             int thr_id = thr_impl_.start(&worker_);

  383.             if ( thr_id > 0 )
  384.             {
  385.                 thrs_id_.push_back(thr_id);
  386.             }
  387.         }
  388.     }

  389.     void stop(void)
  390.     {
  391.         thr_stop_ = true;

  392.         join_all();
  393.     }


  394.     void svc(void)
  395.     {    }

  396. private:

  397.     void join_all(void)
  398.     {
  399.         for (std::vector<int>::const_iterator cit = thrs_id_.begin(); cit != thrs_id_.end(); cit++)
  400.         {
  401.             thr_impl_.join(*cit);
  402.         }
  403.     }

  404. private:
  405.     sem_t                sem_;            // 同步信号量
  406.     mutex_lock            lock_;            // 互斥锁
  407.     thr_impl            thr_impl_;        // 线程
  408.     unsigned int        thrs_count_;    // 线程计数
  409.     std::vector<int>    thrs_id_;

  410. };

  411. #endif /* _THREAD_POOL_ */
复制代码
测试:
  1. #include "threadpool.h"

  2. void func(void* args)
  3. {
  4.         printf("I'm %s.\n", (char *)args);
  5. }

  6. int _tmain(int argc, _TCHAR* argv[])
  7. {
  8.        
  9.         threadpool<> pool;
  10.    
  11.         pool.add_task(threadpool<>::Task(func, "1"));
  12.         pool.add_task(threadpool<>::Task(func, "2"));
  13.         pool.add_task(threadpool<>::Task(func, "3"));
  14.         pool.add_task(threadpool<>::Task(func, "4"));
  15.         pool.add_task(threadpool<>::Task(func, "5"));
  16.         pool.add_task(threadpool<>::Task(func, "6"));
  17.         pool.add_task(threadpool<>::Task(func, "7"));
  18.        
  19.     pool.open(20);

  20.         Sleep(2000);

  21.         pool.add_task(threadpool<>::Task(func, "40"));
  22.         pool.add_task(threadpool<>::Task(func, "50"));
  23.         pool.add_task(threadpool<>::Task(func, "60"));
  24.         pool.add_task(threadpool<>::Task(func, "70"));
  25.        
  26.         Sleep(2000);

  27.         pool.stop();

  28.        

  29.         return 0;
  30. }
复制代码

论坛徽章:
1
黑曼巴
日期:2020-02-27 22:54:26
2 [报告]
发表于 2010-05-12 11:52 |只看该作者
提示: 作者被禁止或删除 内容自动屏蔽

论坛徽章:
0
3 [报告]
发表于 2010-05-12 12:45 |只看该作者
楼主有java的编程风格
c/unix 发表于 2010-05-12 11:52



    哪里?

论坛徽章:
0
4 [报告]
发表于 2010-05-12 15:32 |只看该作者
来支持一下,学习一下!!

论坛徽章:
0
5 [报告]
发表于 2010-05-12 16:53 |只看该作者
给楼主一点建议:
threadpool::Task这个类不够通用(原因是:typedef void (*TaskFunc)(void *)),这样仅支持C风格的带一个参数的函数,而把C++中的仿函数以及类成员函数排除在外了,这无疑是很大的一个硬伤,如需改进,参考《C++设计新思维》第5章“泛化仿函数”。
您需要登录后才可以回帖 登录 | 注册

本版积分规则 发表回复

  

北京盛拓优讯信息技术有限公司. 版权所有 京ICP备16024965号-6 北京市公安局海淀分局网监中心备案编号:11010802020122 niuxiaotong@pcpop.com 17352615567
未成年举报专区
中国互联网协会会员  联系我们:huangweiwei@itpub.net
感谢所有关心和支持过ChinaUnix的朋友们 转载本站内容请注明原作者名及出处

清除 Cookies - ChinaUnix - Archiver - WAP - TOP