- 论坛徽章:
- 0
|
本帖最后由 pinecrane 于 2011-09-12 17:58 编辑
spserver-0.9.5版本的代码为基准进行分析,发现一处在使用线程池上面的疑似BUG,
现在写出来,大家看看是不是真的是BUG。看红字部分就可以了。
分析的前提是: LFServer或者线程池没有被shutdown, 一直在正常的运行。
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
testunp.cpp
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
根据命令行指定的server类型是HAHS还是LF进行判断,启动指定的server
103: if( 0 == strcasecmp( serverType, "hahs" ) ) {
...
...
111: } else {
112: SP_LFServer server( "", port, new SP_UnpHandlerFactory() );
113:
114: server.setTimeout( 60 );
115: server.setMaxThreads( maxThreads );
116: server.setReqQueueSize( 100, "Sorry, server is busy now!\r\n" );
117:
118: server.runForever();
119: }
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
splfserver.cpp
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
void SP_LFServer :: lfHandler( void * arg )
{
SP_LFServer * server = (SP_LFServer*)arg;
每个创建出来的线程都会永远循环去做 handleOneEvent,不会退出.
(不考虑server被shutdown的情况)
for( ; 0 == server->mIsShutdown; ) {
server->handleOneEvent();
}
}
void SP_LFServer :: handleOneEvent()
{
SP_Task * task = NULL;
SP_Message * msg = NULL;
每个创建出来的线程都去竞争一个锁,得到的成为leader,其余的还是follower。
sp_thread_mutex_lock( &mMutex );
for( ; 0 == mIsShutdown && NULL == task && NULL == msg; ) {
if( mEventArg->getInputResultQueue()->getLength() > 0 ) {
task = (SP_Task*)mEventArg->getInputResultQueue()->pop();
} else if( mEventArg->getOutputResultQueue()->getLength() > 0 ) {
msg = (SP_Message*)mEventArg->getOutputResultQueue()->pop();
}
if( NULL == task && NULL == msg ) {
event_base_loop( mEventArg->getEventBase(), EVLOOP_ONCE );
}
}
sp_thread_mutex_unlock( &mMutex );
释放这个锁,让其他follower去竞争选出一个leader.
这个前任leader去执行job,由leader转变成worker.当这个job执行完后,成为follower,
和其他的follower去竞争成为新leader.
if( NULL != task ) task->run();
if( NULL != msg ) mCompletionHandler->completionMessage( msg );
}
214: void SP_LFServer :: runForever()
215: {
216: run();
217: pause();
218: }
169: int SP_LFServer :: run()
170: {
...
...
205: mThreadPool = new SP_ThreadPool( mMaxThreads );
206: for( int i = 0; i < mMaxThreads; i++ ) {
207: mThreadPool->dispatch( lfHandler, this );
208: }
...
...
}
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
spthreadpool.cpp
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
int SP_ThreadPool :: dispatch( DispatchFunc_t dispatchFunc, void *arg )
{
int ret = 0;
sp_thread_attr_t attr;
SP_Thread_t * thread = NULL;
sp_thread_mutex_lock( &mMainMutex );
如果所有已经创建的线程都是忙的状态并且线程数达到了最大数,那么不再创建新线程,
一直等到某一个线程执行完JOB后被saveThread作为空闲线程保存在空闲线程列表里。
for( ; mIndex <= 0 && mTotal >= mMaxThreads; ) {
sp_thread_cond_wait( &mIdleCond, &mMainMutex );
}
没有空闲线程就去创建一个
if( mIndex <= 0 ) {
SP_Thread_t * thread = ( SP_Thread_t * )malloc( sizeof( SP_Thread_t ) );
memset( &thread->mId, 0, sizeof( thread->mId ) );
sp_thread_mutex_init( &thread->mMutex, NULL );
sp_thread_cond_init( &thread->mCond, NULL );
thread->mFunc = dispatchFunc;
thread->mArg = arg;
thread->mParent = this;
sp_thread_attr_init( &attr );
sp_thread_attr_setdetachstate( &attr, SP_THREAD_CREATE_DETACHED );
if( 0 == sp_thread_create( &( thread->mId ), &attr, wrapperFunc, thread ) ) {
mTotal++;
sp_syslog( LOG_NOTICE, "[tp@%s] create thread#%ld\n", mTag, thread->mId );
} else {
ret = -1;
sp_syslog( LOG_WARNING, "[tp@%s] cannot create thread\n", mTag );
sp_thread_mutex_destroy( &thread->mMutex );
sp_thread_cond_destroy( &thread->mCond );
free( thread );
}
sp_thread_attr_destroy( &attr );
} else {
mIndex--;
thread = mThreadList[ mIndex ];
mThreadList[ mIndex ] = NULL;
thread->mFunc = dispatchFunc;
thread->mArg = arg;
thread->mParent = this;
sp_thread_mutex_lock( &thread->mMutex );
sp_thread_cond_signal( &thread->mCond ) ;
sp_thread_mutex_unlock ( &thread->mMutex );
}
sp_thread_mutex_unlock( &mMainMutex );
return ret;
}
sp_thread_result_t SP_THREAD_CALL SP_ThreadPool :: wrapperFunc( void * arg )
{
SP_Thread_t * thread = ( SP_Thread_t * )arg;
for( ; 0 == thread->mParent->mIsShutdown; ) {
在这里实际上就是每个线程去永远的循环做 handleOneEvent, 因为在handleOneEvent
那里是 for( ; 0 == server->mIsShutdown; ) 永远循环的,那么handleOneEvent尽管执行完
也不会退出来,继续去做下一次 handleOneEvent。这样的话,thread->mFunc( thread->mArg );
也就没有机会执行完毕退出来,下面的 saveThread就没有机会执行得到,就是说没有其中任何
一个线程会被作为空闲的线程保存到空闲线程列表里面. 那么这样的话,LFServer和线程池
有啥关联??还用得到线程池么?没有任何一个线程有机会被saveThread,mIndex一直 < 0 ,
每个线程都会按照pthread_create的方式创建出来,线程池被LFServer正确的合适的使用了么??
thread->mFunc( thread->mArg );
if( 0 != thread->mParent->mIsShutdown ) break;
sp_thread_mutex_lock( &thread->mMutex );
任何一个创建出来的线程都没有机会执行saveThread(不考虑server或者线程池被关闭的情况)
if( 0 == thread->mParent->saveThread( thread ) ) {
sp_thread_cond_wait( &thread->mCond, &thread->mMutex );
sp_thread_mutex_unlock( &thread->mMutex );
} else {
sp_thread_mutex_unlock( &thread->mMutex );
sp_thread_cond_destroy( &thread->mCond );
sp_thread_mutex_destroy( &thread->mMutex );
free( thread );
thread = NULL;
break;
}
}
if( NULL != thread ) {
sp_thread_mutex_lock( &thread->mParent->mMainMutex );
thread->mParent->mTotal--;
if( thread->mParent->mTotal <= 0 ) {
sp_thread_cond_signal( &thread->mParent->mEmptyCond );
}
sp_thread_mutex_unlock( &thread->mParent->mMainMutex );
}
return 0;
}
按照上面的代码分析,命令行启动LFServe 指定10个线程的时候,这10个线程都是
被pthread_create 创建出来后,就一直在下面
for( ; 0 == server->mIsShutdown; ) {
server->handleOneEvent();
}
的永久循环中不停的执行 handleOneEvent, 不停的发生L/F的角色转换。
但是线程池和LFServer有什么关联?
使用线程池的其中一个目的就是重复利用已经存在的并且空闲的线程,避免创建线程开销。
但是在上面代码分析中,看不到重复利用已经存在的空闲线程的情况。 |
|