- 论坛徽章:
- 0
|
作者:
呆若木鸡
本例示范Linux信号量的基本用法。该范例使用了两个线程分别对一个公用队列进行入队和出队操作,并用信号量进行控制,当队列空时出队操作可以被阻塞,当队列满时入队操作可以被阻塞。
主要用到的信号量函数有:
sem_init:初始化信号量sem_t,初始化的时候可以指定信号量的初始值,以及是否可以在多进程间共享。
sem_wait:一直阻塞等待直到信号量>0。
sem_timedwait:阻塞等待若干时间直到信号量>0。
sem_post:使信号量加1。
sem_destroy:释放信号量。和sem_init对应。
关于各函数的具体参数请用man查看。如man sem_init可查看该函数的帮助。
下面看具体的代码:
![]()
//--------------------------msgdequeue.h开始-------------------------------------
![]()
//实现可控队列
![]()
#ifndef MSGDEQUEUE_H
![]()
#define MSGDEQUEUE_H
![]()
#include "tmutex.h"
![]()
#include iostream>
![]()
#include errno.h>
![]()
#include time.h>
![]()
#include semaphore.h>
![]()
#include deque>
![]()
using namespace std;
![]()
![]()
templatetypename T,typename MUTEX_TYPE = ThreadMutex>
![]()
class CMessageDequeue
![]()
![]()
{
![]()
public:
![]()
CMessageDequeue(size_t MaxSize) : m_MaxSize( MaxSize )
![]()
![]()
...{
![]()
sem_init( &m_enques,0, m_MaxSize ); //入队信号量初始化为MaxSize,最多可容纳MaxSize各元素
![]()
sem_init( &m_deques,0,0 ); //队列刚开始为空,出队信号量初始为0
![]()
}
![]()
![]()
~CMessageDequeue()
![]()
![]()
...{
![]()
sem_destroy(&m_enques);
![]()
sem_destroy(&m_deques);
![]()
}
![]()
![]()
int sem_wait_i( sem_t *psem, int mswait )
![]()
![]()
...{//等待信号量变成>0,mswait为等待时间,若mswait
![]()
if( mswait 0 )
![]()
![]()
...{
![]()
int rv = 0;
![]()
while( ((rv = sem_wait(psem) ) != 0 ) && (errno == EINTR
![]()
) ); //等待信号量,errno==EINTR屏蔽其他信号事件引起的等待中断
![]()
return rv;
![]()
}
![]()
else
![]()
![]()
...{
![]()
timespec ts;
![]()
clock_gettime(CLOCK_REALTIME, &ts ); //获取当前时间
![]()
ts.tv_sec += (mswait / 1000 ); //加上等待时间的秒数
![]()
ts.tv_nsec += ( mswait % 1000 ) * 1000; //加上等待时间纳秒数
![]()
int rv = 0;
![]()
while( ((rv=sem_timedwait( psem, &ts ))!=0) && (errno ==
![]()
EINTR) ); //等待信号量,errno==EINTR屏蔽其他信号事件引起的等待中断
![]()
return rv;
![]()
}
![]()
![]()
}
![]()
bool push_back( const T &item, int mswait = -1 )
![]()
![]()
...{ //等待mswait毫秒直到将item插入队列,mswait为-1则一直等待
![]()
if( -1 == sem_wait_i( &m_enques, mswait ))
![]()
![]()
...{
![]()
return false;
![]()
}
![]()
![]()
//AUTO_GUARD:定界加锁,见Linux多线程及临界区编程例解的tmutex.h文件定义。
![]()
AUTO_GUARD( g, MUTEX_TYPE, m_lock );
![]()
try
![]()
![]()
...{
![]()
m_data.push_back( item );
![]()
cout "push " item endl;
![]()
sem_post( &m_deques );
![]()
return true;
![]()
}
![]()
catch(...)
![]()
![]()
...{
![]()
return false;
![]()
}
![]()
}
![]()
![]()
bool pop_front( T &item, bool bpop = true, int mswait = -1 )
![]()
![]()
...{ //等待mswait毫秒直到从队列取出元素,mswait为-1则一直等待
![]()
if( -1 == sem_wait_i( &m_deques, mswait ) )
![]()
![]()
...{
![]()
return false;
![]()
}
![]()
//AUTO_GUARD:定界加锁,见Linux多线程及临界区编程例解的tmutex.h文件定义。
![]()
AUTO_GUARD( g, MUTEX_TYPE, m_lock );
![]()
try
![]()
![]()
...{
![]()
item = m_data.front();
![]()
if( bpop )
![]()
![]()
...{
![]()
m_data.pop_front();
![]()
cout "pop " item endl;
![]()
}
![]()
![]()
sem_post( &m_enques );
![]()
return true;
![]()
}
![]()
catch(...)
![]()
![]()
...{
![]()
return false;
![]()
}
![]()
}
![]()
inline size_t size()
![]()
![]()
...{
![]()
return m_data.size();
![]()
}
![]()
![]()
private:
![]()
MUTEX_TYPE m_lock;
![]()
dequeT> m_data;
![]()
size_t m_MaxSize;
![]()
sem_t m_enques;
![]()
sem_t m_deques;
![]()
};
![]()
![]()
#endif
![]()
![]()
//--------------------------msgdequeue.h结束-------------------------------------
![]()
![]()
//--------------------------test.cpp开始-------------------------------------
![]()
//主程序文件
![]()
![]()
#include "msgdequeue.h"
![]()
#include pthread.h>
![]()
#include iostream>
![]()
using namespace std;
![]()
![]()
CMessageDequeueint> qq(5);
![]()
![]()
void *get_thread(void *parg);
![]()
void *put_thread(void *parg);
![]()
![]()
void *get_thread(void *parg)
![]()
![]()
{
![]()
while(true)
![]()
![]()
...{
![]()
int a = -1;
![]()
if( !qq.pop_front( a,true, 1000 ) )
![]()
![]()
...{
![]()
cout "pop failed. size=" qq.size() endl;
![]()
}
![]()
}
![]()
return NULL;
![]()
}
![]()
![]()
void *put_thread(void *parg)
![]()
![]()
{
![]()
for(int i=1; i30; i++)
![]()
![]()
...{
![]()
qq.push_back( i, -1 );
![]()
}
![]()
![]()
return NULL;
![]()
}
![]()
![]()
int main()
![]()
![]()
{
![]()
pthread_t pget,pput;
![]()
pthread_create( &pget,NULL,get_thread,NULL);
![]()
pthread_create( &pput, NULL, put_thread,NULL);
![]()
![]()
pthread_join( pget,NULL );
![]()
pthread_join( pput,NULL );
![]()
![]()
return 0;
![]()
}
![]()
![]()
//--------------------------test.cpp结束-------------------------------------............
编译程序:g++ msgdequeue.h test.cpp -lpthread -lrt -o test
-lpthread链接pthread库。-ltr链接clock_gettime函数相关库。
编译后生成可执行文件test。输入./test执行程序。
线程get_thread每隔1000毫秒从队列取元素,线程put_thread将30个元素依次入队。两个线程模拟两条入队和出队的流水线。因我们在
CMessageDequeue
qq(5)处定义了队列最多可容纳5个元素,入队线程每入队到队列元素满5个后需阻塞等待出队线程将队列元素出队才能继续。测试时可调整队列可容纳最大元
素个数来观察运行效果。
本文来自ChinaUnix博客,如果查看原文请点:http://blog.chinaunix.net/u2/67780/showart_2075587.html |
|