免费注册 查看新帖 |

Chinaunix

  平台 论坛 博客 文库
最近访问板块 发新帖
查看: 4839 | 回复: 5
打印 上一主题 下一主题

[C++] 写了一个c++线程安全的队列,各位老大帮忙参考一下 [复制链接]

论坛徽章:
0
跳转到指定楼层
1 [收藏(0)] [报告]
发表于 2006-11-29 11:12 |只看该作者 |倒序浏览
昨天求教了半天没人响应,自己弄了一个,具体有什么不足之处,各位帮忙参考一下:




  1. #ifdef WIN32
  2. #include <winbase.h>
  3. #else
  4. #include <pthread.h>
  5. #include <unistd.h>
  6. #endif

  7. #include <errno.h>
  8. #include <time.h>
  9. #include <sys/types.h>
  10. #include <queue>
  11. using namespace std;

  12. //这个是线程安全的队列,Synchronized类的实现在下面
  13. template <class Type>
  14. class SyncQueue:public Synchronized{
  15. public:
  16.         SyncQueue();
  17.         ~SyncQueue();
  18.         void push(const Type &);
  19.         Type pop();
  20. private:
  21.         queue<Type> *p_Queue;
  22. };


  23. template <class Type>
  24. SyncQueue<Type>::SyncQueue(){
  25.         p_Queue = new queue<Type>() ;
  26. }

  27. template <class Type>
  28. SyncQueue<Type>::~SyncQueue(){
  29.         delete p_Queue;
  30. }

  31. template <class Type>
  32. Type SyncQueue<Type>::pop(){
  33.         Type type;
  34.         lock();
  35.         while(p_Queue->empty()){
  36.                 try
  37.                 {
  38.                         wait();
  39.                 }
  40.                 catch(exception& ex)
  41.                 {
  42.                         cout<< ex.what()<<endl;
  43.                         throw;
  44.                 }               
  45.         }
  46.         type=p_Queue->front();
  47.         p_Queue->pop();
  48.         unlock();
  49.         return type;
  50. }

  51. template <class Type>
  52. void SyncQueue<Type>::push(const Type &type){
  53.         lock();
  54.         p_Queue->push(type);
  55.         try
  56.         {
  57.                 notify();
  58.         }
  59.         catch(exception& ex)
  60.         {
  61.                 cout<<ex.what()<<endl;
  62.                 throw;
  63.         }               
  64.         unlock();
  65. }

  66. //Synchronized类的实现,仿java的一些方法,我参考了别人的代码

  67. class Synchronized
  68. {
  69. public:
  70.         Synchronized();
  71.         ~Synchronized();


  72.         // Causes current thread to wait until another thread
  73.         // invokes the notify() method or the notifyAll()
  74.         // method for this object.
  75.         void                wait();


  76.         // Causes current thread to wait until either another
  77.         // thread invokes the notify() method or the notifyAll()
  78.         // method for this object, or a specified amount of time
  79.         // has elapsed.
  80.         // @param timeout
  81.         //    timeout in milliseconds.
  82.         // @param
  83.         //    return TRUE if timeout occured, FALSE otherwise.
  84.         bool                wait(unsigned long timeout);   
  85.         // Wakes up a single thread that is waiting on this
  86.         // object's monitor.
  87.         void                notify();
  88.         // Wakes up all threads that are waiting on this object's
  89.         // monitor.
  90.         void                notify_all();
  91.         // Enter a critical section.
  92.         void                lock();
  93.         // Try to enter a critical section.
  94.         // @return TRUE if the attempt was successful, FALSE otherwise.
  95.         bool                trylock();
  96.         // Leave a critical section.
  97.         void                unlock();

  98. private:
  99. #ifdef WIN32
  100.         char            numNotifies;
  101.         HANDLE          semEvent;
  102.         HANDLE          semMutex;
  103.         bool                        isLocked;
  104. #else
  105.         int        cond_timed_wait(const timespec*);
  106.         pthread_cond_t  cond;
  107.         pthread_mutex_t monitor;
  108. #endif
  109. };

  110. Synchronized::Synchronized()
  111. {
  112. #ifdef WIN32
  113.         // Semaphore initially auto signaled, auto reset mode, unnamed
  114.         semEvent = CreateEvent(0, false, false, 0);
  115.         // Semaphore initially unowned, unnamed
  116.         semMutex = CreateMutex(0, false, 0);
  117.         isLocked = false;
  118. #else
  119.         int result;

  120.         memset(&monitor, 0, sizeof(monitor));
  121.         result = pthread_mutex_init(&monitor, 0);
  122.         if(result)
  123.         {
  124.                 throw runtime_error("Synchronized mutex_init failed!");
  125.         }

  126.         memset(&cond, 0, sizeof(cond));
  127.         result = pthread_cond_init(&cond, 0);
  128.         if(result)
  129.         {
  130.                 throw runtime_error("Synchronized cond_init failed!");
  131.         }
  132. #endif
  133. }

  134. Synchronized::~Synchronized()
  135. {
  136. #ifdef WIN32
  137.         CloseHandle(semEvent);
  138.         CloseHandle(semMutex);
  139. #else
  140.         int result;
  141.         result = pthread_cond_destroy(&cond);
  142. //        if(result)
  143. //        {
  144. //                throw runtime_error("Synchronized cond_destroy failed!");
  145. //        }
  146.         result = pthread_mutex_destroy(&monitor);
  147. //        if(result)
  148. //        {
  149. //                throw runtime_error("Synchronized mutex_destroy failed!");
  150. //        }
  151. #endif
  152. }

  153. void Synchronized::wait()
  154. {
  155. #ifdef WIN32
  156.         wait(INFINITE);
  157. #else
  158.         cond_timed_wait(0);
  159. #endif
  160. }

  161. #ifndef WIN32
  162. int Synchronized::cond_timed_wait(const struct timespec *ts)
  163. {
  164.   int result;
  165.   if(ts)
  166.         result = pthread_cond_timedwait(&cond, &monitor, ts);
  167.   else
  168.         result = pthread_cond_wait(&cond, &monitor);
  169.   return result;
  170. }
  171. #endif

  172. bool Synchronized::wait(unsigned long timeout)
  173. {
  174.         bool timeoutOccurred = false;
  175. #ifdef WIN32
  176.         isLocked = false;
  177.         if(!ReleaseMutex(semMutex))
  178.         {
  179.                 throw runtime_error("Synchronized: releasing mutex failed");
  180.         }
  181.         int err;
  182.         err = WaitForSingleObject(semEvent, timeout);
  183.         switch(err)
  184.         {
  185.         case WAIT_TIMEOUT:
  186.                 throw runtime_error("Synchronized: timeout on wait");
  187.                 timeoutOccurred = true;
  188.                 break;
  189.         case WAIT_ABANDONED:
  190.                 throw runtime_error("Synchronized: waiting for event failed");
  191.                 break;
  192.         }
  193.         if(WaitForSingleObject (semMutex, INFINITE) != WAIT_OBJECT_0)
  194.         {
  195.                 throw runtime_error("Synchronized: waiting for mutex failed");
  196.         }
  197.         isLocked = true;
  198. #else
  199.         struct timespec ts;
  200.         struct timeval  tv;
  201.         gettimeofday(&tv, 0);
  202.         ts.tv_sec  = tv.tv_sec  + (int)timeout/1000;
  203.         ts.tv_nsec = (tv.tv_usec + (timeout %1000)*1000) * 1000;

  204.         int err;
  205.         if((err = cond_timed_wait(&ts)) > 0)
  206.         {
  207.                 switch(err)
  208.                 {
  209.                 case ETIMEDOUT:
  210.                   timeoutOccurred = true;
  211.                   break;
  212.                 default:
  213.                   throw runtime_error("Synchronized: wait with timeout returned!");
  214.                   break;
  215.                 }
  216.         }
  217. #endif
  218.         return timeoutOccurred;
  219. }

  220. void Synchronized::notify()
  221. {
  222. #ifdef WIN32
  223.         numNotifies = 1;
  224.         if(!SetEvent(semEvent))
  225.         {
  226.                 throw runtime_error("Synchronized: notify failed!");
  227.         }
  228. #else
  229.         int result;
  230.         result = pthread_cond_signal(&cond);
  231.         if(result)
  232.         {
  233.                 throw runtime_error("Synchronized: notify failed!");
  234.         }
  235. #endif
  236. }

  237. void Synchronized::notify_all()
  238. {
  239. #ifdef WIN32
  240.         numNotifies = (char)0x80;
  241.         while (numNotifies--)
  242.         {
  243.                 if(!SetEvent(semEvent))
  244.                 {
  245.                         throw runtime_error("Synchronized: notify_all failed!");
  246.                 }
  247.         }
  248. #else
  249.         int result;
  250.         result = pthread_cond_broadcast(&cond);
  251.         if(result)
  252.         {
  253.                 throw runtime_error("Synchronized: notify_all failed!");
  254.         }
  255. #endif
  256. }

  257. void Synchronized::lock()
  258. {
  259. #ifdef WIN32
  260.     if(WaitForSingleObject(semMutex, INFINITE) != WAIT_OBJECT_0)
  261.     {
  262.                 throw runtime_error("Synchronized: lock failed!");
  263.                 return;
  264.     }
  265.     if(isLocked)
  266.     {
  267.                 // This thread owns already the lock, but
  268.             // we do not like recursive locking. Thus
  269.             // release it immediately and print a warning!
  270.                 if(!ReleaseMutex(semMutex))
  271.                 {
  272.                         throw runtime_error("Synchronized: unlock failed!");
  273.                 }      
  274.                 throw runtime_error("Synchronized: recursive locking detected!");
  275.     }
  276.     isLocked = true;
  277. #else
  278.     pthread_mutex_lock(&monitor);
  279. #endif
  280. }

  281. void Synchronized::unlock()
  282. {
  283. #ifdef WIN32
  284.         isLocked = fasle;
  285.         if(!ReleaseMutex(semMutex))
  286.         {
  287.                 throw runtime_error("Synchronized: unlock failed!");
  288.                 return;
  289.         }
  290. #else
  291.         pthread_mutex_unlock(&monitor);
  292. #endif
  293. }

  294. bool Synchronized::trylock()
  295. {
  296. #ifdef WIN32
  297.         int status = WaitForSingleObject(semMutex, 0);
  298.         if(status != WAIT_OBJECT_0)
  299.         {
  300.                 throw runtime_error("Synchronized: try lock failed!");
  301.                 return false;
  302.         }
  303.         if(isLocked)
  304.         {
  305.                 if(!ReleaseMutex(semMutex))
  306.                 {
  307.                         throw runtime_error("Synchronized: unlock failed!");
  308.                 }
  309.                 return false;
  310.         }
  311.         else
  312.         {
  313.                 isLocked = true;
  314.         }
  315.         return true;
  316. #else
  317.         if(pthread_mutex_trylock(&monitor) == 0)
  318.                 return true;
  319.         else
  320.                 return false;
  321. #endif
  322. }


复制代码

[ 本帖最后由 lipeng_927 于 2006-11-29 11:17 编辑 ]

论坛徽章:
0
2 [报告]
发表于 2006-11-29 12:48 |只看该作者

怎么没人理啊

论坛徽章:
0
3 [报告]
发表于 2006-11-29 12:52 |只看该作者
这种东西不难吧?就是线程同步

论坛徽章:
38
2017金鸡报晓
日期:2017-02-08 10:39:4215-16赛季CBA联赛之深圳
日期:2023-02-16 14:39:0220周年集字徽章-年
日期:2022-08-31 14:25:28黑曼巴
日期:2022-08-17 18:57:0919周年集字徽章-年
日期:2022-04-25 13:02:5920周年集字徽章-20	
日期:2022-03-29 11:10:4620周年集字徽章-年
日期:2022-03-14 22:35:1820周年集字徽章-周	
日期:2022-03-09 12:51:3220周年集字徽章-年
日期:2022-02-10 13:13:4420周年集字徽章-周	
日期:2022-02-03 12:09:4420周年集字徽章-20	
日期:2022-01-25 20:14:2720周年集字徽章-周	
日期:2022-01-13 15:12:33
4 [报告]
发表于 2006-11-29 12:58 |只看该作者
用std::queue加个锁或许性能更好些。

论坛徽章:
0
5 [报告]
发表于 2006-11-29 13:05 |只看该作者
我用的就是queue。。。。。

论坛徽章:
0
6 [报告]
发表于 2006-11-29 13:06 |只看该作者
我还是跑吧,找别人商量去。。。。哈哈
您需要登录后才可以回帖 登录 | 注册

本版积分规则 发表回复

  

北京盛拓优讯信息技术有限公司. 版权所有 京ICP备16024965号-6 北京市公安局海淀分局网监中心备案编号:11010802020122 niuxiaotong@pcpop.com 17352615567
未成年举报专区
中国互联网协会会员  联系我们:huangweiwei@itpub.net
感谢所有关心和支持过ChinaUnix的朋友们 转载本站内容请注明原作者名及出处

清除 Cookies - ChinaUnix - Archiver - WAP - TOP