- 论坛徽章:
- 0
|
我自己写的一个队列,放在 C++ 版里没人看,你看看能不能用得上。
#ifdef WIN32
#include <windows.h>
#include <winbase.h>
#else
#include <pthread.h>
#include <sys/time.h>
#include <unistd.h>
#endif
#include <errno.h>
#include <sys/types.h>
#include <time.h>
#include <cstring>
#include <exception>
#include <iostream>
#include <queue>
#include <stdexcept>
using namespace std;
- //这个是线程安全的队列,Synchronized类的实现在下面
- template <class Type>
- class SyncQueue:public Synchronized{
- public:
- SyncQueue();
- ~SyncQueue();
- void push(const Type &);
- Type pop();
- private:
- queue<Type> *p_Queue;
- };
- template <class Type>
- SyncQueue<Type>::SyncQueue(){
- p_Queue = new queue<Type>() ;
- }
- template <class Type>
- SyncQueue<Type>::~SyncQueue(){
- delete p_Queue;
- }
- template <class Type>
- Type SyncQueue<Type>::pop(){
- Type type;
- lock();
- while(p_Queue->empty()){
- try
- {
- wait();
- }
- catch(exception& ex)
- {
- cout<< ex.what()<<endl;
- throw;
- }
- }
- type=p_Queue->front();
- p_Queue->pop();
- unlock();
- return type;
- }
- template <class Type>
- void SyncQueue<Type>::push(const Type &type){
- lock();
- p_Queue->push(type);
- try
- {
- notify();
- }
- catch(exception& ex)
- {
- cout<<ex.what()<<endl;
- throw;
- }
- unlock();
- }
// Monitor class modelled on Java's synchronized/wait/notify methods
// (adapted from someone else's code, per the original author).
// On WIN32 it is built from an event handle and a mutex handle; elsewhere it
// uses a pthread mutex and condition variable.
class Synchronized
{
public:
    Synchronized();
    // Virtual because the class is used as a public base class, so deleting a
    // derived object through a Synchronized pointer must run the derived
    // destructor.
    virtual ~Synchronized();

    // Causes current thread to wait until another thread
    // invokes the notify() method or the notify_all()
    // method for this object.
    void wait();

    // Causes current thread to wait until either another
    // thread invokes the notify() method or the notify_all()
    // method for this object, or a specified amount of time
    // has elapsed.
    // @param timeout
    //     timeout in milliseconds.
    // @return
    //     TRUE if timeout occurred, FALSE otherwise.
    bool wait(unsigned long timeout);

    // Wakes up a single thread that is waiting on this
    // object's monitor.
    void notify();

    // Wakes up all threads that are waiting on this object's
    // monitor.
    void notify_all();

    // Enter a critical section.
    void lock();

    // Try to enter a critical section.
    // @return TRUE if the attempt was successful, FALSE otherwise.
    bool trylock();

    // Leave a critical section.
    void unlock();

private:
    // Not copyable: a member-wise copy would duplicate the OS handles /
    // pthread objects, which the destructor would then release twice.
    // Declared but intentionally never defined.
    Synchronized(const Synchronized &);
    Synchronized &operator=(const Synchronized &);

#ifdef WIN32
    char numNotifies;   // written by notify() / notify_all()
    HANDLE semEvent;    // event used to signal waiting threads
    HANDLE semMutex;    // mutex guarding the critical section
    bool isLocked;      // set while the monitor is held; used to detect recursive locking
#else
    // Waits on `cond`; uses a timed wait when the argument is non-null.
    int cond_timed_wait(const timespec *);
    pthread_cond_t cond;
    pthread_mutex_t monitor;
#endif
};
- Synchronized::Synchronized()
- {
- #ifdef WIN32
- // Semaphore initially auto signaled, auto reset mode, unnamed
- semEvent = CreateEvent(0, false, false, 0);
- // Semaphore initially unowned, unnamed
- semMutex = CreateMutex(0, false, 0);
- isLocked = false;
- #else
- int result;
- memset(&monitor, 0, sizeof(monitor));
- result = pthread_mutex_init(&monitor, 0);
- if(result)
- {
- throw runtime_error("Synchronized mutex_init failed!");
- }
- memset(&cond, 0, sizeof(cond));
- result = pthread_cond_init(&cond, 0);
- if(result)
- {
- throw runtime_error("Synchronized cond_init failed!");
- }
- #endif
- }
- Synchronized::~Synchronized()
- {
- #ifdef WIN32
- CloseHandle(semEvent);
- CloseHandle(semMutex);
- #else
- int result;
- result = pthread_cond_destroy(&cond);
- // if(result)
- // {
- // throw runtime_error("Synchronized cond_destroy failed!");
- // }
- result = pthread_mutex_destroy(&monitor);
- // if(result)
- // {
- // throw runtime_error("Synchronized mutex_destroy failed!");
- // }
- #endif
- }
- void Synchronized::wait()
- {
- #ifdef WIN32
- wait(INFINITE);
- #else
- cond_timed_wait(0);
- #endif
- }
- #ifndef WIN32
- int Synchronized::cond_timed_wait(const struct timespec *ts)
- {
- int result;
- if(ts)
- result = pthread_cond_timedwait(&cond, &monitor, ts);
- else
- result = pthread_cond_wait(&cond, &monitor);
- return result;
- }
- #endif
- bool Synchronized::wait(unsigned long timeout)
- {
- bool timeoutOccurred = false;
- #ifdef WIN32
- isLocked = false;
- if(!ReleaseMutex(semMutex))
- {
- throw runtime_error("Synchronized: releasing mutex failed");
- }
- int err;
- err = WaitForSingleObject(semEvent, timeout);
- switch(err)
- {
- case WAIT_TIMEOUT:
- throw runtime_error("Synchronized: timeout on wait");
- timeoutOccurred = true;
- break;
- case WAIT_ABANDONED:
- throw runtime_error("Synchronized: waiting for event failed");
- break;
- }
- if(WaitForSingleObject (semMutex, INFINITE) != WAIT_OBJECT_0)
- {
- throw runtime_error("Synchronized: waiting for mutex failed");
- }
- isLocked = true;
- #else
- struct timespec ts;
- struct timeval tv;
- gettimeofday(&tv, 0);
- ts.tv_sec = tv.tv_sec + (int)timeout/1000;
- ts.tv_nsec = (tv.tv_usec + (timeout %1000)*1000) * 1000;
- int err;
- if((err = cond_timed_wait(&ts)) > 0)
- {
- switch(err)
- {
- case ETIMEDOUT:
- timeoutOccurred = true;
- break;
- default:
- throw runtime_error("Synchronized: wait with timeout returned!");
- break;
- }
- }
- #endif
- return timeoutOccurred;
- }
- void Synchronized::notify()
- {
- #ifdef WIN32
- numNotifies = 1;
- if(!SetEvent(semEvent))
- {
- throw runtime_error("Synchronized: notify failed!");
- }
- #else
- int result;
- result = pthread_cond_signal(&cond);
- if(result)
- {
- throw runtime_error("Synchronized: notify failed!");
- }
- #endif
- }
- void Synchronized::notify_all()
- {
- #ifdef WIN32
- numNotifies = (char)0x80;
- while (numNotifies--)
- {
- if(!SetEvent(semEvent))
- {
- throw runtime_error("Synchronized: notify_all failed!");
- }
- }
- #else
- int result;
- result = pthread_cond_broadcast(&cond);
- if(result)
- {
- throw runtime_error("Synchronized: notify_all failed!");
- }
- #endif
- }
- void Synchronized::lock()
- {
- #ifdef WIN32
- if(WaitForSingleObject(semMutex, INFINITE) != WAIT_OBJECT_0)
- {
- throw runtime_error("Synchronized: lock failed!");
- return;
- }
- if(isLocked)
- {
- // This thread owns already the lock, but
- // we do not like recursive locking. Thus
- // release it immediately and print a warning!
- if(!ReleaseMutex(semMutex))
- {
- throw runtime_error("Synchronized: unlock failed!");
- }
- throw runtime_error("Synchronized: recursive locking detected!");
- }
- isLocked = true;
- #else
- pthread_mutex_lock(&monitor);
- #endif
- }
- void Synchronized::unlock()
- {
- #ifdef WIN32
- isLocked = fasle;
- if(!ReleaseMutex(semMutex))
- {
- throw runtime_error("Synchronized: unlock failed!");
- return;
- }
- #else
- pthread_mutex_unlock(&monitor);
- #endif
- }
- bool Synchronized::trylock()
- {
- #ifdef WIN32
- int status = WaitForSingleObject(semMutex, 0);
- if(status != WAIT_OBJECT_0)
- {
- throw runtime_error("Synchronized: try lock failed!");
- return false;
- }
- if(isLocked)
- {
- if(!ReleaseMutex(semMutex))
- {
- throw runtime_error("Synchronized: unlock failed!");
- }
- return false;
- }
- else
- {
- isLocked = true;
- }
- return true;
- #else
- if(pthread_mutex_trylock(&monitor) == 0)
- return true;
- else
- return false;
- #endif
- }
复制代码 |
|