- 论坛徽章:
- 0
|
- /*******************************************************
- * HdrFileName: thrdpool.h
- * Description: Implement a thread pool.
- * CreateDate : 2005.7.16
- * Author : peijianzhong
- * Email : pjz0311@emails.bjut.edu.cn
- * Version : 1.0
- * Interface :
- * thrdpool_t *createThrdPool( int maxthrds, int maxclts, char *plogfile );
- * int addClt( thrdpool_t *pthrdpool, int cltid, thrdcall_t thrdcall, thrdcall_t thrdexit );
- * void destroyThrdPool( thrdpool_t *pthrdpool );
- *******************************************************/
- #ifndef _THRDPOOL_H
- #define _THRDPOOL_H
- #include <pthread.h>;
- #include <stdlib.h>;
- #include "runlog.h"
- #define THRDPOOL_OK 0
- #define ETOOMANYCLTS -1
- typedef struct {
- pthread_t *pthrds;
- int count;
- } thrdtab_t;
- typedef enum { IDLE, WAITING, PROCESSING } cltstate_t;
- typedef void *(*thrdfunc_t)(void *);
- /* */
- typedef struct {
- thrdfunc_t thrdfunc;
- void *arg;
- } thrdcall_t;
- typedef struct {
- int cltid;
- cltstate_t state; /* the current state of this client */
- thrdcall_t thrdcall; /* the function excuted by thread defined by user */
- thrdcall_t thrdexit; /* the function excuted by thread when process finishes */
- long pri; /* priority */
- } clt_t;
- typedef struct {
- clt_t *pclts;
- int count; /* the max count of clients permited */
- int waitcount; /* the count of clients in WAITING state */
- int proscount; /* the count of clients in PROCESSING state */
- long prino; /* increasing number, used to set the coming client's priority */
- } clttab_t;
- typedef struct {
- thrdtab_t thrdtab;
- clttab_t clttab; /* */
- pthread_mutex_t thrdmutex;/* */
- pthread_cond_t thrdcond; /* */
- char logfile[256]; /* run log file name */
- } thrdpool_t;
- extern thrdpool_t *createThrdPool( int maxthrds, int maxclts, char *plogfile );
- extern int addClt( thrdpool_t *pthrdpool, int cltid, thrdcall_t thrdcall, thrdcall_t thrdexit );
- extern void destroyThrdPool( thrdpool_t *pthrdpool );
- #endif
- ------------------------------------------------------------------------------
- /*******************************************************
- * SrcFileName: thrdpool.c
- * Description: implement a thread pool.
- * CreateDate : 2005.7.16
- * Author : peijianzhong
- * Email : pjz0311@emails.bjut.edu.cn
- * Version : 1.0
- * Interface :
- * thrdpool_t *createThrdPool( int maxthrds, int maxclts, char *plogfile );
- * int addClt( thrdpool_t *pthrdpool, int cltid, thrdcall_t thrdcall, thrdcall_t thrdexit );
- * void destroyThrdPool( thrdpool_t *pthrdpool );
- *******************************************************/
#include "thrdpool.h"

#include <string.h>
- /******************************************************
- * FuncName : thrdproc
- * Description :
- * provide a framework to do the client side real actions
- * it's blocked when no client arive.
- *
- * Input : void *arg, actually it's a pointer to struct thrdpool_t
- * Output : no
- * Return : viod
- *****************************************************/
- static void *thrdproc( void *arg ) {
- thrdpool_t *pthrdpool = ( thrdpool_t *)arg;
- clt_t *clts = pthrdpool->;clttab.pclts;
- pthread_detach( pthread_self() );
- pthread_setcanceltype( PTHREAD_CANCEL_ASYNCHRONOUS, NULL );
- for(;;) {
- int retlock;
- retlock = pthread_mutex_lock( &pthrdpool->;thrdmutex );
- while( pthrdpool->;clttab.waitcount == 0 )
- pthread_cond_wait( &pthrdpool->;thrdcond, &pthrdpool->;thrdmutex );
- /* find one client which has the highest priority */
- int idx, cltidx = -1;
- long hpri = 99999;
- for( idx = 0; idx < pthrdpool->;clttab.count; idx++ ) {
- if( clts[idx].state == WAITING ) {
- if( hpri >; clts[idx].pri ) {
- hpri = clts[idx].pri;
- cltidx = idx;
- }
- }
- }
- runLog(pthrdpool->;logfile,"found wait client, clientidx=%d, hpri=%ld, cltid=%d, state=%d, pri=%ld, current thread is %ld \n", cltidx,hpri, clts[cltidx].cltid, clts[cltidx].state, clts[cltidx].pri, pthread_self());
- clts[cltidx].state = PROCESSING;
- pthrdpool->;clttab.waitcount--;
- pthrdpool->;clttab.proscount++;
- //runLog(pthrdpool->;logfile,"clientidx=%d, cltid=%d, state=%d, pri=%ld, waitcount=%d \n", cltidx, clts[cltidx].cltid, clts[cltidx].state, clts[cltidx].pri, pthrdpool->;clttab.waitcount);
- pthread_mutex_unlock( &pthrdpool->;thrdmutex );
-
- runLog( pthrdpool->;logfile, "start processing client [%d], current thread is %ld.\n", clts[cltidx].cltid, pthread_self() );
- /* processing area */
- ( *clts[cltidx].thrdcall.thrdfunc)( clts[cltidx].thrdcall.arg );
- runLog( pthrdpool->;logfile, "processing client [%d] end, current thread is %ld.\n", clts[cltidx].cltid, pthread_self());
- if( clts[cltidx].thrdexit.thrdfunc != NULL )
- ( *clts[cltidx].thrdexit.thrdfunc)( clts[cltidx].thrdexit.arg );
- /* free this client resource */
- pthread_mutex_lock( &pthrdpool->;thrdmutex );
- clts[cltidx].state = IDLE;
- pthrdpool->;clttab.proscount--;
- clts[cltidx].thrdcall.thrdfunc = NULL;
- clts[cltidx].thrdcall.arg = NULL;
- clts[cltidx].thrdexit.thrdfunc = NULL;
- clts[cltidx].thrdcall.arg = NULL;
- runLog( pthrdpool->;logfile, " the resource released,cltidx=%d, cltid=%d , pri=%ld, curthread is %ld \n", cltidx, clts[cltidx].cltid, clts[cltidx].pri, pthread_self() );
- pthread_mutex_unlock( &pthrdpool->;thrdmutex );
- }
-
- return;
- }
- /******************************************************
- * FuncName : createThrdPool
- * Description : To create a thread pool
- * Input :
- * @maxthrds : max thread count permit to create
- * @maxclts : max clients count permit to serve
- * @plogfile : log file name
- * Output : no
- * Return : the pointer of thrdpool_t instance created here.
- *****************************************************/
- thrdpool_t *createThrdPool( int maxthrds, int maxclts, char *plogfile ) {
- int idx;
- thrdpool_t *pthrdpool;
- clt_t *clts;
- /*
- * create a thread pool and initialize it
- * according to the parameters specified by the caller
- */
- if(( maxthrds <= 0 ) || ( maxclts <= 0 )) {
- runLog( plogfile, "smaller than zero is illegal for the paramter maxthrds or maxclts of function createThrdPool.\n" );
- return NULL;
- }
- pthrdpool = calloc( sizeof( thrdpool_t ), 1);
- pthrdpool->;thrdtab.pthrds = calloc( sizeof( pthread_t ), maxthrds );
- pthrdpool->;thrdtab.count = maxthrds;
- pthrdpool->;clttab.pclts = calloc( sizeof( clt_t ), maxclts );
- clts = pthrdpool->;clttab.pclts;
- pthrdpool->;clttab.count = maxclts;
- if( plogfile == NULL )
- pthrdpool->;logfile[0] = '\0';
- else
- strcpy( pthrdpool->;logfile, plogfile );
- pthrdpool->;clttab.waitcount = 0; /* waiting count */
- pthrdpool->;clttab.proscount = 0; /* processing count */
- pthrdpool->;clttab.prino = 0; /* increasing pri number */
- for( idx = 0; idx < maxclts; idx++ ) {
- clts[idx].state = IDLE;
- clts[idx].pri = 0;
- clts[idx].cltid = -1;
- clts[idx].thrdcall.thrdfunc = NULL;
- clts[idx].thrdcall.arg = NULL;
- clts[idx].thrdexit.thrdfunc = NULL;
- clts[idx].thrdcall.arg = NULL;
- }
- pthread_mutex_init( &pthrdpool->;thrdmutex, NULL );
- pthread_cond_init( &pthrdpool->;thrdcond, NULL );
-
- for( idx = 0; idx < maxthrds; idx++ ) {
- pthread_create( &pthrdpool->;thrdtab.pthrds[idx], NULL, thrdproc, (void *)pthrdpool );
- }
- runLog( plogfile, "new threadpool created, max threads count is %d, max clients count is %d.\n", maxthrds, maxclts );
- return pthrdpool;
- }
- /******************************************************
- * FuncName : destroyThrdPool
- * Description : free the resources the thread pool used
- * Input : thrdpool_t *pthrdpool
- * Output : no
- * Return : void
- *****************************************************/
-
- void destroyThrdPool( thrdpool_t *pthrdpool ) {
- int idx;
- for( idx = 0; idx < pthrdpool->;thrdtab.count; idx++) {
- pthread_cancel( pthrdpool->;thrdtab.pthrds[idx] );
- }
- pthread_mutex_unlock( &pthrdpool->;thrdmutex ); /* maybe return error EPERM here, but it doesn't matter. */
- free( pthrdpool->;thrdtab.pthrds );
- free( pthrdpool->;clttab.pclts );
- free( pthrdpool );
- pthrdpool = NULL;
-
- }
- /******************************************************
- * FuncName : addClt
- * Description : add a new client to the pool specified by the param pthrdpool
- * Input :
- * @pthrdpool : the thread pool to add the client to
- * @cltid : the identifier used by user to identify the client,
- * for example,it can be the connection descriptor returned by accept(..) call.
- * @thrdcall : a struct include the real client function and it's arguments to perform by the thread
- * @thrdexit : a struct include the real client function and it's arguments to perform when thrdcall finished.
- * Output : no
- * Return :
- * ETOOMANYCLTS : no place for the new client
- * THRDPOOL_OK : the client is successfully added into the thread pool
- *****************************************************/
-
- int addClt( thrdpool_t *pthrdpool, int cltid, thrdcall_t thrdcall, thrdcall_t thrdexit ) {
- int idx, idleidx = -1;
- clt_t *clts;
- pthread_mutex_lock( &pthrdpool->;thrdmutex );
- /* get first idle client item */
- for( idx = 0; idx < pthrdpool->;clttab.count; idx++ ) {
- if( pthrdpool->;clttab.pclts[idx].state == IDLE ) {
- idleidx = idx;
- break;
- }
- }
- if( idleidx == -1 ) {
- runLog( pthrdpool->;logfile, "too many clients for new client [%d].\n", cltid );
- pthread_mutex_unlock( &pthrdpool->;thrdmutex );
- return ETOOMANYCLTS;
- }
- clts = pthrdpool->;clttab.pclts;
- clts[idleidx].cltid = cltid;
- clts[idleidx].state = WAITING;
- clts[idleidx].thrdcall.thrdfunc = thrdcall.thrdfunc;
- clts[idleidx].thrdcall.arg = thrdcall.arg;
- clts[idleidx].thrdexit.thrdfunc = thrdexit.thrdfunc;
- clts[idleidx].thrdexit.arg = thrdexit.arg;
- pthrdpool->;clttab.waitcount++;
- /* calculate the priority of the coming client */
- pthrdpool->;clttab.prino++;
- clts[idleidx].pri = pthrdpool->;clttab.prino;
- runLog( pthrdpool->;logfile, "new client [%d] added,cltidx=%d, state=%d, pri=%ld now waiting for service.\n", cltid,idleidx,clts[idleidx].state,clts[idleidx].pri );
- if( pthrdpool->;clttab.waitcount == 1 )
- pthread_cond_signal( &pthrdpool->;thrdcond );
- pthread_mutex_unlock( &pthrdpool->;thrdmutex );
- return THRDPOOL_OK;
- }
- ----------------------------------------------------------
- int main() {
- int listenfd, connfd, saidx = 0, *pconnfd;
- rspns_t rspns;
- char msg[256];
- thrdpool_t *pthrdpool;
- thrdcall_t thrdcall, thrdexit = { NULL, NULL };
- initNsTab();
- if(( pthrdpool = createThrdPool( MAXTHREADS, MAXCLIENTS, NS_RUNLOGFILE )) == NULL )
- return -1;
-
- bzero( &rspns, sizeof( rspns_t ) );
- if(( saidx = findSaRcd( NAMESERVID )) == ERR ) {
- sprintf( msg, "Naming Server Conf Info [%s] not found in [%s].\n", NAMESERVID, SA_DOMAINCONF );
- errLog( NS_RUNLOGFILE, msg );
- exit(ERR);
- }
- listenfd = tcpListen( atoi( satab.psa[saidx].sa_port ), LISTENQ );
- runLog( NS_RUNLOGFILE, "Naming Server is listening in port [%s] now. It's listenfd is %d.\n", satab.psa[saidx].sa_port, listenfd );
- for(;;) {
- if(( connfd = tcpAccept( listenfd ) ) < 0 ) {
- errLog( NS_RUNLOGFILE, "accept error.\n" );
- exit(1);
- }
- pconnfd = (int *)malloc( sizeof( int ) );
- *pconnfd = connfd;
- thrdcall.thrdfunc = doResponse;
- thrdcall.arg = pconnfd;
- addClt( pthrdpool, connfd, thrdcall, thrdexit );
- }
-
- return OK;
- }
复制代码 [/code] |
|