免费注册 查看新帖 |

Chinaunix

  平台 论坛 博客 文库
最近访问板块 发新帖
查看: 4816 | 回复: 8

线程池设计的问题 [复制链接]

论坛徽章:
0
发表于 2010-01-27 21:55 |显示全部楼层
5可用积分
想做个基于线程池的服务端,大体上的思路已经有了,在主线程中进行accept,然后子线程干活。
但在子线程中如何得到“任务”?
如果用信号量来做,是不是得每个线程对应一个信号量来进行判断?
类似于如下结构体:
struct myserv_childthr
{
    int sock;
    sem_t sem;
    pthread_t tid;
};

myserv_childthr childs[5];
.....
.....

然后有类似函数:

int myserv_childthr_init(myserv_childthr *child)
{
    sem_init(&child->sem);
    child->sock = 0;
    pthread_init(&cihld->tid,NULL,work,NULL);
}

应该有更好的设计吧

论坛徽章:
0
发表于 2010-01-27 22:26 |显示全部楼层
看看unp2里的例子

论坛徽章:
0
发表于 2010-01-27 22:47 |显示全部楼层
看过了,没找到里面带的源代码。。没弄明白那几个函数到底是干吗的。。所以才来请教的

论坛徽章:
0
发表于 2010-01-27 22:58 |显示全部楼层
原帖由 shiweifu 于 2010-1-27 22:47 发表
看过了,没找到里面带的源代码。。没弄明白那几个函数到底是干吗的。。所以才来请教的


所有进程/线程模型作者都给了源码。
这个是我以前学习整理过的,可以直接编译使用,模型是主线程统一accept。

/*
*tcp并发服务器程序,主线程统一accept
*/
#include <unistd.h>
#include <syslog.h>
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <signal.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/wait.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netinet/tcp.h>
#include <sys/resource.h>
#include <fcntl.h>
#include <pthread.h>
#define PORT 6600
#define LISTENQ 1
#define    MAXLINE        4096    //max text line length
#define    MAXN    1024           //max # bytes to request from server
#define HAVE_MSGHDR_MSG_CONTROL
#define max(a,b) ((a) > (b) ? (a) : (b))
typedef struct
{
  pthread_t  thread_tid;  /* thread ID */
  long   thread_count; /* # connections handled */
} Thread;
Thread *tptr;  /* array of Thread structures; calloc'ed */
#define MAXNCLI 32
int     clifd[MAXNCLI], iget, iput;
pthread_mutex_t  clifd_mutex;
pthread_cond_t  clifd_cond;

int    listenfd, nthreads;
socklen_t  addrlen;
struct sockaddr *cliaddr;
pthread_mutex_t pmlock = PTHREAD_MUTEX_INITIALIZER;
static int nchildren;
static pid_t *pids;
pid_t child_make(int, int, int);
void pr_cpu_time(void);
void child_main(int, int, int);
void child_main(int, int, int);
void web_child(int);
long *meter(int); /* for counting #clients/child */
void sig_int(int);
void my_lock_init(char *pathname);
void my_lock_wait();
void my_lock_release();
static pthread_mutex_t *mptr; /* actual mutex will be in shared memory */
ssize_t read_fd(int, void *, size_t, int *);
ssize_t write_fd(int, void *, size_t, int);
void * doit(void *);
void *thread_main(void *);
void thread_make(int i);
int main(int argc, char **argv)
{
int   i, navail, maxfd, nsel, connfd, rc;
pthread_t tid;
socklen_t clilen;
struct sockaddr_in servaddr;
const int on = 1;
ssize_t  n;
fd_set  rset, masterset;

listenfd = socket(AF_INET, SOCK_STREAM, 0);
   
bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family      = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port        = htons(PORT);

setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
bind(listenfd, (struct sockaddr *) &servaddr, sizeof(servaddr));
listen(listenfd, LISTENQ);

    nthreads = 10;
   
tptr = calloc(nthreads, sizeof(Thread));
for (i = 0; i < nthreads; i++)
  thread_make(i);   /* only main thread returns */
signal(SIGINT, sig_int);
   
    for ( ; ; ) {
  clilen = addrlen;
  connfd = accept(listenfd, cliaddr, &clilen);
  pthread_mutex_lock(&clifd_mutex);
  printf("get lock, main\n");
  clifd[iput] = connfd;
  if (++iput == MAXNCLI)
   iput = 0;
  if (iput == iget)
  {
   printf("iput = iget = %d\n", iput);
   exit(1);
  }
  pthread_cond_signal(&clifd_cond);
  pthread_mutex_unlock(&clifd_mutex);
}
}
/* end serv02 */
void thread_make(int i)
{
pthread_create(&tptr.thread_tid, NULL, &thread_main, (void *) i);
return;  /* main thread returns */
}
void *thread_main(void *arg)
{
int  connfd;
printf("thread %d starting\n", (int) arg);
for ( ; ; )
{
     pthread_mutex_lock(&clifd_mutex);
     printf("get lock, thread = [%d]\n", (int) arg);
  while (iget == iput)
   pthread_cond_wait(&clifd_cond, &clifd_mutex);
  connfd = clifd[iget]; //connected socket to service
  if (++iget == MAXNCLI)
   iget = 0;
   //sleep(2);
  pthread_mutex_unlock(&clifd_mutex);
  tptr[(int) arg].thread_count++;
  web_child(connfd);  /* process request */
  close(connfd);
}
}
/* include sigint */
void * doit(void *arg)
{
pthread_detach(pthread_self());
web_child((int) arg);
close((int) arg);
return(NULL);
}
/* end serv06 */
void sig_int(int signo)
{
int  i;
void pr_cpu_time(void);
pr_cpu_time();
for (i = 0; i < nthreads; i++)
  printf("thread %d, %ld connections\n", i, tptr.thread_count);
exit(0);
}
/* end sigint */
void pr_cpu_time(void)
{
double   user, sys;
struct rusage myusage, childusage;
if (getrusage(RUSAGE_SELF, &myusage) < 0)
{
  printf("getrusage error\n");
  return;
}

if (getrusage(RUSAGE_CHILDREN, &childusage) < 0)
{
  printf("getrusage error\n");
  return;
}
user = (double) myusage.ru_utime.tv_sec + myusage.ru_utime.tv_usec/1000000.0;
user += (double) childusage.ru_utime.tv_sec + childusage.ru_utime.tv_usec/1000000.0;
sys = (double) myusage.ru_stime.tv_sec + myusage.ru_stime.tv_usec/1000000.0;
sys += (double) childusage.ru_stime.tv_sec + childusage.ru_stime.tv_usec/1000000.0;
printf("\nuser time = %g, sys time = %g\n", user, sys);
}
/* end child_make */
/* include child_main */
void child_main(int i, int listenfd, int addrlen)
{
char   c;
int    connfd;
ssize_t   n;
void   web_child(int);
printf("child %ld starting\n", (long) getpid());
for ( ; ; )
{
  if ( (n = read_fd(STDERR_FILENO, &c, 1, &connfd)) == 0)
  {
   printf("read_fd returned 0\n");
   exit(1);
  }
  
  if (connfd < 0)
  {
   printf("no descriptor from read_fd\n");
   exit(1);
  }
  web_child(connfd);    /* process request */
  close(connfd);
  write(STDERR_FILENO, "", 1); /* tell parent we're ready again */
}
}
/* end child_main */
void web_child(int sockfd)
{
int   ntowrite;
ssize_t  nread;
char  line[MAXLINE], result[MAXN];
for ( ; ; )
    {
  if ( (nread = read(sockfd, line, MAXLINE)) == 0)
   return;  /* connection closed by other end */
   /* 4line from client specifies #bytes to write back */
  printf("recieve from client [%s]\n", line);
  //sleep(100);
  //exit(1);
  ntowrite = atol(line);
  
  if ((ntowrite <= 0) || (ntowrite > MAXN))
  {
   printf("client request for %d bytes\n", ntowrite);
   return;
  }
  
  printf("send to client [%s]\n", result);
  writen(sockfd, result, ntowrite);
}
}
ssize_t writen(int fd, const void *vptr, size_t n)
{
size_t  nleft;
ssize_t  nwritten;
const char *ptr;
ptr = vptr;
nleft = n;
while (nleft > 0)
{
  if ( (nwritten = write(fd, ptr, nleft)) <= 0)
  {
   if (nwritten < 0 && errno == EINTR)
    nwritten = 0;  /* and call write() again */
   else
    return(-1);   /* error */
  }
  nleft -= nwritten;
  ptr   += nwritten;
}
return(n);
}
/* end writen */
long * meter(int nchildren)
{
int  fd;
long *ptr;
#ifdef MAP_ANON
    //printf("MAP_ANON defined\n");
ptr = mmap(0, nchildren*sizeof(long), PROT_READ | PROT_WRITE, MAP_ANON | MAP_SHARED, -1, 0);
#else
fd = open("/dev/zero", O_RDWR, 0);
ptr = mmap(0, nchildren*sizeof(long), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
close(fd);
#endif
return(ptr);
}
void my_lock_init(char *pathname)
{
int  fd;
pthread_mutexattr_t mattr;
fd = open("/dev/zero", O_RDWR, 0);
mptr = mmap(0, sizeof(pthread_mutex_t), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
close(fd);
pthread_mutexattr_init(&mattr);
pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
pthread_mutex_init(mptr, &mattr);
}
/* end my_lock_init */
/* include my_lock_wait */
void my_lock_wait()
{
pthread_mutex_lock(mptr);
}
void my_lock_release()
{
pthread_mutex_unlock(mptr);
}
/* end my_lock_wait */
ssize_t read_fd(int fd, void *ptr, size_t nbytes, int *recvfd)
{
struct msghdr msg;
struct iovec iov[1];
ssize_t   n;
#ifdef HAVE_MSGHDR_MSG_CONTROL
union {
   struct cmsghdr cm;
   char    control[CMSG_SPACE(sizeof(int))];
} control_un;
struct cmsghdr *cmptr;
msg.msg_control = control_un.control;
msg.msg_controllen = sizeof(control_un.control);
#else
int    newfd;
msg.msg_accrights = (caddr_t) &newfd;
msg.msg_accrightslen = sizeof(int);
#endif
msg.msg_name = NULL;
msg.msg_namelen = 0;
iov[0].iov_base = ptr;
iov[0].iov_len = nbytes;
msg.msg_iov = iov;
msg.msg_iovlen = 1;
if ( (n = recvmsg(fd, &msg, 0)) <= 0)
  return(n);
#ifdef HAVE_MSGHDR_MSG_CONTROL
if ( (cmptr = CMSG_FIRSTHDR(&msg)) != NULL &&
     cmptr->cmsg_len == CMSG_LEN(sizeof(int))) {
  if (cmptr->cmsg_level != SOL_SOCKET)
  {
   printf("control level != SOL_SOCKET\n");
   exit(1);
  }
  if (cmptr->cmsg_type != SCM_RIGHTS)
  {
   printf("control type != SCM_RIGHTS\n");
   exit(1);
  }
  *recvfd = *((int *) CMSG_DATA(cmptr));
} else
  *recvfd = -1;  /* descriptor was not passed */
#else
/* *INDENT-OFF* */
if (msg.msg_accrightslen == sizeof(int))
  *recvfd = newfd;
else
  *recvfd = -1;  /* descriptor was not passed */
/* *INDENT-ON* */
#endif
return(n);
}
ssize_t write_fd(int fd, void *ptr, size_t nbytes, int sendfd)
{
struct msghdr msg;
struct iovec iov[1];
#ifdef HAVE_MSGHDR_MSG_CONTROL
union {
   struct cmsghdr cm;
   char    control[CMSG_SPACE(sizeof(int))];
} control_un;
struct cmsghdr *cmptr;
msg.msg_control = control_un.control;
msg.msg_controllen = sizeof(control_un.control);
cmptr = CMSG_FIRSTHDR(&msg);
cmptr->cmsg_len = CMSG_LEN(sizeof(int));
cmptr->cmsg_level = SOL_SOCKET;
cmptr->cmsg_type = SCM_RIGHTS;
*((int *) CMSG_DATA(cmptr)) = sendfd;
#else
msg.msg_accrights = (caddr_t) &sendfd;
msg.msg_accrightslen = sizeof(int);
#endif
msg.msg_name = NULL;
msg.msg_namelen = 0;
iov[0].iov_base = ptr;
iov[0].iov_len = nbytes;
msg.msg_iov = iov;
msg.msg_iovlen = 1;
return(sendmsg(fd, &msg, 0));
}

[ 本帖最后由 c/unix 于 2010-1-27 23:18 编辑 ]

论坛徽章:
1
申猴
日期:2014-02-11 14:50:31
发表于 2010-01-27 23:10 |显示全部楼层
pthread_cond_signal
pthread_cond_wait
...

论坛徽章:
0
发表于 2010-01-28 01:49 |显示全部楼层
原帖由 c/unix 于 2010-1-27 22:58 发表


所有进程/线程模型作者都给了源码。
这个是我以前学习整理过的,可以直接编译使用,模型是主线程统一accept。

/*
*tcp并发服务器程序,主线程统一accept
*/
#include
#include
#include
#in ...

大哥,你这个代码都编译不过的。
我帮你改下,顺便调整下格式
长度超过,附件提供吧,把后缀rar改成.c就可以了

[ 本帖最后由 sithui 于 2010-1-28 01:56 编辑 ]

mthread.rar

9.74 KB, 下载次数: 74

论坛徽章:
0
发表于 2010-01-28 09:29 |显示全部楼层
可以这样,主线程与其它线程通过管道通信,accept后把该socket放入某个地方,然后向管道中写入一个字节,其它线程select或则poll该管道和他所监视的其它socket

论坛徽章:
0
发表于 2010-02-01 16:18 |显示全部楼层
两种方法吧,一种是Leader/Follower模型,一种是Half Sync/Half Async模型。

Leader/Follower设置一个Queue,IO Thread向Queue中push,Worker Thread从Queue中pop。
pop中,如果没有item,就pthread_cond_wait,push的时候调用pthread_cond_signal。

Half Sync/Half Async模型,为每一个Worker Thread设置一个Queue,作为一个Queue Layer,worker thread不断的从里面取然后处理。
这里面的Push和Pop有很多种实现,建议使用pipe通知的方式,worker thread中有一个event_loop,相当与一个Sync Layer。push的时候写pipe的写端,pipe读端事件被检测出来,调用push。

论坛徽章:
1
2015年辞旧岁徽章
日期:2015-03-03 16:54:15
发表于 2010-02-01 16:41 |显示全部楼层
把分给 8楼的 !!
您需要登录后才可以回帖 登录 | 注册

本版积分规则 发表回复

SACC2019中国系统架构师大会

【数字转型 架构演进】SACC2019中国系统架构师大会,8.5折限时优惠重磅来袭!
2019年10月31日~11月2日第11届中国系统架构师大会(SACC2019)将在北京隆重召开。四大主线并行的演讲模式,1个主会场、20个技术专场、超千人参与的会议规模,100+来自互联网、金融、制造业、电商等领域的嘉宾阵容,将为广大参会者提供一场最具价值的技术交流盛会。

限时8.5折扣期:2019年9月30日前


----------------------------------------

大会官网>>
  

北京盛拓优讯信息技术有限公司. 版权所有 16024965号-6 北京市公安局海淀分局网监中心备案编号:11010802020122
中国互联网协会会员  联系我们:huangweiwei@it168.com
感谢所有关心和支持过ChinaUnix的朋友们 转载本站内容请注明原作者名及出处

清除 Cookies - ChinaUnix - Archiver - WAP - TOP