- 论坛徽章:
- 0
|
本帖最后由 bio_tt 于 2013-02-20 17:14 编辑
在多处理器或者一些情况下,pthread_cond_signal可以唤醒多个线程。
所以将if(all == NULL)修改为while(all == NULL)
最近打算练习一下epoll,所以写了下面一个程序。
结构就是:
主线程开了2个工作线程当线程池使用,主线程监听7788端口,有新的连接就放人epoll中,当新连接可读时,工作线程就可以读内容。
我写了一个客户端测试,当连接多(20以上)的时候,经常就会报错,好像是内存访问问题。出错的位置,我用红色标出来了
但是如果我只开一个工作线程,毫无问题。求大侠帮我看看。
#include "stdio.h"
#include "string.h"
#include "stdlib.h"
#include "sys/socket.h"
#include "netinet/in.h"
#include "sys/epoll.h"
#include "fcntl.h"
#include "sys/ioctl.h"
#include "errno.h"
#include "pthread.h"
struct data{
int fd;
struct data *next;
};
struct data *all;
pthread_mutex_t lock=PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t flag=PTHREAD_COND_INITIALIZER;
void setnonblocking(int sock)
{
if(fcntl(sock,F_SETFL,O_NONBLOCK)<0)
{
printf("fcntl set error1\n" ;exit(1);
}
}
void do_work()
{
while(1)
{
pthread_mutex_lock(&lock);
if(all == NULL)
{
printf("thread is waiting...\n" ;
pthread_cond_wait(&flag,&lock);
printf("%d thread got data...\n",pthread_self());
}
struct data *tmp;
tmp=all;
all=all->next;
pthread_mutex_unlock(&lock);
int sockfd=tmp->fd;
free(tmp);
//read data......
char last[5000],buf[10];
int len=0;
int n,read_num;
ioctl(sockfd,FIONREAD,&read_num);
printf("we will get %d char\n",read_num);
while(len < read_num)
{
if((n=read(sockfd,buf,10))<0)
{
if(errno == ECONNRESET)
{
close(sockfd);
break;
}
else if(errno == EAGAIN){
break;
}
}
else if(n == 0)
{
break;
}
buf[n]='\0';
len=len+n;
strcat(last,buf);
}
if(write(sockfd,last,len) <0)
{
printf("write error\n" ;close(sockfd);
}
close(sockfd);
strcpy(last,"" ;
}
}
void main(int argc,char *argv[])
{
int len,nfds,opts,epfd,listenfd,client,sockfd;
struct sockaddr_in server_addr,client_addr;
struct epoll_event ev,events[20];
listenfd=socket(AF_INET,SOCK_STREAM,0);
setnonblocking(listenfd);
bzero(&server_addr,sizeof(server_addr));
server_addr.sin_family=AF_INET;
server_addr.sin_port=htons(787 ;
server_addr.sin_addr.s_addr=inet_addr("127.0.0.1" ;
setsockopt(listenfd,SOL_SOCKET,SO_REUSEADDR,1,sizeof(int));
bind(listenfd,(struct sockaddr *)&server_addr,sizeof(server_addr));
listen(listenfd,20);
epfd=epoll_create(256);
ev.data.fd=listenfd;
ev.events=EPOLLIN|EPOLLET;
epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,&ev);
int i,maxi=0;
pthread_t go[2];
pthread_create(&go[0],NULL,do_work,NULL);
pthread_detach(go[0]);
pthread_create(&go[1],NULL,do_work,NULL);
pthread_detach(go[1]);
for(;
{
nfds=epoll_wait(epfd,events,20,500);
for(i=0;i<nfds;i++)
{
if(events.data.fd == listenfd)
{
socklen_t client_size=sizeof(client_addr);
client=accept(listenfd,(struct sockaddr *)&client_addr,&client_size);
if(client <0)
{
printf("client accept error\n" ;exit(1);
}
setnonblocking(client);
ev.data.fd=client;
ev.events=EPOLLIN|EPOLLERR|EPOLLET;
epoll_ctl(epfd,EPOLL_CTL_ADD,client,&ev);
}
else{
if(events.events == EPOLLERR)
{
int readsock=events.data.fd;
ev.data.fd=readsock;
ev.events=EPOLLOUT|EPOLLET;
epoll_ctl(epfd,EPOLL_CTL_DEL,readsock,&ev);
close(readsock);
}
if(events.events == EPOLLIN)
{
int readsock=events.data.fd;
ev.data.fd=readsock;
ev.events=EPOLLOUT|EPOLLET;
epoll_ctl(epfd,EPOLL_CTL_DEL,readsock,&ev); //change mode from read to write
struct data *main1;
main1=(struct data *)malloc(sizeof(struct data));
main1->fd=readsock;
main1->next=NULL;
pthread_mutex_lock(&lock);
if(all == NULL)
all=main1;
else{
struct data *tmp1;
tmp1=all;
while(tmp1->next !=NULL)
tmp1=tmp1->next;
tmp1->next=main1;
}
pthread_cond_signal(&flag);
pthread_mutex_unlock(&lock);
}
}
}
}
}
|
|