- 论坛徽章:
- 0
|
本帖最后由 wangyang917 于 2010-10-18 17:34 编辑
大家好,
我新写了一个epoll LT模式的服务例子 已经经过测试了 服务端平均一秒可以处理1666条数据 我测试的很简单 6个测试独立进程每个进程for循环10万次 一共60万数据。
请有诚意的同学全看一遍 对于有问题的地方请回帖告诉我 对于ET模式说实话我用的不多而且处理细节也太多了 不怎么用。而且我用的机器都编译过内核把句柄1024都改大了。
服务端LT模式代码如下:
/*
如果你发现有connect(): Cannot assign requested address 这样的错误是因为有TIME_WAIT造成句柄再分配错误,你可以这样
#vi /etc/sysctl.conf
输入
net.ipv4.tcp_tw_reuse = 1
net.ipv4.tcp_tw_recycle = 1
#sysctl -p
其中:
net.ipv4.tcp_tw_reuse = 1 表示开启重用。允许将TIME-WAIT sockets重新用于新的TCP连接,默认为0,表示关闭;
net.ipv4.tcp_tw_recycle = 1 表示开启TCP连接中TIME-WAIT sockets的快速回收,默认为0,表示关闭。
把那两行加上去 运行sysctl -p 生效 之后就好多了
对于EPOLLRDHUP编译问题请看http://www.linuxdiyf.com/viewarticle.php?id=97280里面的说明 不需要编译内核只要编译自己程序就行
强调一下 LT模式下用了EPOLLONESHOT 对于短连接来说无所谓 长连接的时候需要对句柄再次EPOLLONESHOT 我代码里有写 长短连接都测试了。
*/
#include <sys/io.h>
#include <stdio.h>
#include <pthread.h>
#include <stdarg.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/file.h>
#include <netinet/in.h>
#include <syslog.h>
#include <dirent.h>
#include <errno.h>
#include <string.h>
#include <netdb.h>
#include <signal.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <stdarg.h>
#include <sys/timeb.h>
#include <sys/types.h>
#include <time.h>
#include <stdlib.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <ctype.h>
#include <unistd.h>
#include <signal.h>
#include <sys/timeb.h>
#include <errno.h>
#include <sys/file.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <queue>
#include <vector>
#include <map>
#include <string>
#include <set>
#include <list>
#include <iostream>
#include <netinet/tcp.h>
using namespace std;
#define MAXLINE 10
#define OPEN_MAX 100
#define LISTENQ 20
#define SERV_PORT 11111
#define INFTIM 1000
pthread_mutex_t m_ClientAcceptMutex = PTHREAD_MUTEX_INITIALIZER;
list<int> m_ClientList;
#define MAX_P_NUM 10
typedef struct pl {
pthread_t pid;
} pl;
pl pool[MAX_P_NUM];
int epfd = 0;
ssize_t socket_send(int sockfd, const char* buffer, size_t buflen)
{
ssize_t tmp;
size_t total = buflen;
const char *p = buffer;
int retry=2;
while(retry--)
{
tmp = send(sockfd, p, total, 0);
if(tmp < 0)
{
if(errno == EINTR) return -1;
if(errno == EAGAIN)
{
usleep(1000);
continue;
}
return -1;
}
if((size_t)tmp == total) return buflen;
total -= tmp;
p += tmp;
}
return tmp;
}
int setnonblocking(int sock)
{
long arg;
if (fcntl (sock, F_GETFL, &arg) == -1)
{
return -1;
}
arg = arg | O_NONBLOCK;
if (fcntl (sock, F_SETFL, arg) == -1)
{
return -1;
}
return 0;
}
void *SendMsg(void* arg)
{
char line[MAXLINE];
for(;
{
int sockfd = 0;
pthread_mutex_lock(&m_ClientAcceptMutex);
if(m_ClientList.empty())
{
pthread_mutex_unlock(&m_ClientAcceptMutex);
usleep(1000);
continue;
}
sockfd = m_ClientList.front();
m_ClientList.pop_front();
pthread_mutex_unlock(&m_ClientAcceptMutex);
int rs = 1;
int sign = 1;
memset(line,0,MAXLINE);
if(sockfd < 0)
{
printf("sockfd continue\n" ;
continue;
}
int length = 0;
int n = 0;
while(rs)
{
n = read(sockfd,&line[length],10);
printf("%d read buff %d\n",n,pthread_self());
if(n > 0) length = length + n;
if(n < 0)
{
if(errno == EAGAIN)
{
sign = 1;
printf("recv over%d %d\n",errno,pthread_self());
break;
}else if(errno == ECONNRESET)
{
sign = 0;
printf("connect close%d %d\n",errno,pthread_self());
}
} else if(n == 0)
{
sign = 0;
printf("nomal close %d %d\n",errno,pthread_self());
}
if(n == 10)
{
rs = 1;
} else {
rs = 0;
}
}
if(sign == 1 && line[0] != '\0')
{
line[length] = '\0';
printf("read:%s %d\n",line,pthread_self());
printf("length:%d %d\n",length,pthread_self());
char sendbuff[10] = {0};
if(strcmp(line,"12345abcde" == 0)
{
strcpy(sendbuff,"wangyang" ;
} else if(strcmp(line,"12345QWERT" == 0)
{
strcpy(sendbuff,"mayanaaa" ;
}else if(strcmp(line,"98765abcde" == 0)
{
strcpy(sendbuff,"woaimayan" ;
}else if(strcmp(line,"ASDFGabcde" == 0)
{
strcpy(sendbuff,"lkjhgf" ;
}
if(socket_send(sockfd,sendbuff,sizeof(sendbuff)) < 0)
{
printf("发送失败 %d\n",pthread_self());
} else {
printf("send ok!! %d\n",pthread_self());
}
//被注释的while打开则server为短连接,如果注释掉while打开struct epoll_event五行则为长连接
//while (close(sockfd) < 0 && errno == EINTR) {};
struct epoll_event ev;
ev.data.u64 = 0ULL;
ev.data.fd = sockfd;
ev.events = EPOLLIN | EPOLLONESHOT | EPOLLHUP | EPOLLERR | EPOLLRDHUP;
epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);
}
else {
while (close(sockfd) < 0 && errno == EINTR) {};
}
}
}
int main(int argc,char **argv)
{
int i,listenfd, connfd,nfds;
ssize_t n;
int optval=1;
struct epoll_event ev,events[10000];
epfd = epoll_create(10000);
struct sockaddr_in serveraddr;
listenfd = socket(AF_INET,SOCK_STREAM,0);
if(setsockopt(listenfd,SOL_SOCKET,SO_REUSEADDR,&optval, sizeof(optval)) == -1)
{
printf("setsockopt error\n");
exit(1);
}
struct linger ln;
ln.l_onoff = 1;
ln.l_linger = 3;
if(setsockopt(listenfd, SOL_SOCKET, SO_LINGER, (void *) &ln,sizeof (struct linger)) < 0)
{
printf("setlinger error\n");
exit(1);
}
if(setnonblocking(listenfd))
{
printf("setnonblocking listenfd\n");
exit(1);
}
ev.data.fd = listenfd;
ev.events = EPOLLIN | EPOLLHUP | EPOLLERR | EPOLLRDHUP;
epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,&ev);
memset(&serveraddr,0,sizeof(serveraddr));
serveraddr.sin_family = AF_INET;
char *local_addr="192.168。3.21";
inet_aton(local_addr,&(serveraddr.sin_addr));
serveraddr.sin_port = htons(SERV_PORT);
bind(listenfd,(struct sockaddr *)&serveraddr,sizeof(serveraddr));
listen(listenfd, LISTENQ);
int on = 1;
setsockopt(listenfd, IPPROTO_TCP, TCP_NODELAY, &on, sizeof(on));
for (int loops = 0; loops < MAX_P_NUM; loops++)
{
if (pthread_create(&pool[loops].pid,NULL, SendMsg,NULL) == 0)
{
printf(" thread %d create ok\n", loops);
}
}
for(;;)
{
nfds = epoll_wait(epfd,events,10000,-1);
for(i = 0; i < nfds ; ++i)
{
if(events.data.fd == listenfd)
{
printf("LISTENFD\n");
socklen_t clilen= sizeof(struct sockaddr);
struct sockaddr_in clientaddr;
int sign = 1;
while((connfd = accept(listenfd,(struct sockaddr *)&clientaddr,&clilen)) < 0)
{
if(errno == EINTR)
continue;
if(errno != EAGAIN && errno != EWOULDBLOCK)
{
printf("errno != EAGAIN\n");
sign = 0;
break;
}
}
printf("%d END LISTENFD\n",connfd);
if(sign == 1)
{
if(setnonblocking(connfd))
{
printf("setnonblocking connfd\n");
}
struct epoll_event ev;
char *str = inet_ntoa(clientaddr.sin_addr);
printf("accapt a connection from:%s\n",str);
ev.data.u64 = 0UL;
ev.data.fd = connfd;
ev.events = EPOLLIN | EPOLLONESHOT | EPOLLHUP | EPOLLERR | EPOLLRDHUP;
epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev);
}
} else if(events.events & EPOLLRDHUP)
{
printf("sockt closed by peer EPOLLRDHUP\n");
while (close(events.data.fd) < 0 && errno == EINTR) {};
} else if(events.events & EPOLLHUP)
{
printf("sockt closed by peer EPOLLHUP\n");
while (close(events.data.fd) < 0 && errno == EINTR) {};
} else if(events.events & EPOLLERR)
{
printf("sockt closed by peer EPOLLERR\n");
epoll_ctl(epfd,EPOLL_CTL_DEL,events.data.fd,&ev);
} else if(events.events & EPOLLIN)
{
printf("EPOLLIN\n");
pthread_mutex_lock(&m_ClientAcceptMutex);
m_ClientList.push_back(events.data.fd);
pthread_mutex_unlock(&m_ClientAcceptMutex);
} else {
while (close(events.data.fd) < 0 && errno == EINTR) {};
}
}
}
close(epfd);
for (int loops = 0; loops < MAX_P_NUM; loops++)
{
pthread_join(pool[loops].pid, NULL);
}
return 0;
}
客户端6个 代码一样 只是发送的10个字节不一样而已(强调一下客户端的发送和接收用到的safe_recv和safe_send函数是我自己编写的,有想测试的同学要把这两处改成你们自己的,那两个函数源码我就不写了,就是发送和接收,相信你们都会,由于比较懒为了省力气我就复制了6个相同进程,没有用多线程大量并发测。个人估算了一下 服务端一秒处理1666个链接请求,有感兴趣的同学可以写个多线程客户端并发测试一下。)
#include <signal.h>
#include <pthread.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/timeb.h>
#include <sys/param.h>
#include <sys/ioctl.h>
#include <sys/file.h>
#include <netinet/in.h>
#include <net/if.h>
#include <net/if_arp.h>
#include <arpa/inet.h>
#include <errno.h>
#include <dirent.h>
#include <ctype.h>
#include <time.h>
using namespace wbl;
unsigned long GetTickCount()
{
unsigned long dCount = 0;
struct timeval tp;
gettimeofday(&tp, NULL);
struct tm *ptm = localtime(&tp.tv_sec);
dCount =ptm->tm_hour*60*60*1000 + ptm->tm_min*60*1000 + ptm->tm_sec*1000 + tp.tv_usec/1000;
return dCount;
}
int main()
{
int sd = 0;
const char AGENTADDR[20] = "10.65.5.77";
const char AGENTPORT[20] = "11111";
char str[10] = {0};
char qtr[10] = {0};
strcpy(str,"12345abcde");
long i = 0;
for(int s = 0; s < 100000 ; s++)
{
if((i % 1000) == 0)
{
sleep(1);
}
while((sd = connect_to(AGENTADDR, atoi(AGENTPORT))) == -1)
{
printf("Can't connect to DXBY[%s][%s]\n", AGENTADDR,AGENTPORT);
sleep(1);
}
int nlodtick = GetTickCount();
if(safe_send(sd, str, sizeof(str), 2) == sizeof(str))
{
printf("send is ok %d\n",s);
int p = 0;
if((p = safe_recv(sd, (char*)qtr, sizeof(qtr), 2)) == sizeof(qtr))
{
printf("recvmsg[%s]\n",qtr);
} else {
printf("recv fail %d\n",p);
}
}
shutdown(sd, 2);
close(sd);
int nnewtick = GetTickCount();
printf("%d tick:%d\n",s,nnewtick-nlodtick);
i++;
}
return 0;
}
Makefile文件如下
CC=g++
CFLAGS=-Wall -g -O2
INCLUDE=-I ./
SOURCE=$(wildcard *.c)
all:
$(CC) -g -o epollserver $(INCLUDE) $(SOURCE) -lpthread
clean:
rm epollserver |
|