免费注册 查看新帖 |

Chinaunix

  平台 论坛 博客 文库
12345下一页
最近访问板块 发新帖
查看: 30797 | 回复: 41
打印 上一主题 下一主题

EPOLL LT模式的服务模型已经经过测试了 大家可以学习一下 欢迎指导 [复制链接]

论坛徽章:
0
跳转到指定楼层
1 [收藏(0)] [报告]
发表于 2010-10-05 17:43 |只看该作者 |倒序浏览
本帖最后由 wangyang917 于 2010-10-18 17:34 编辑

大家好,
我新写了一个epoll LT模式的服务例子 已经经过测试了 服务端平均一秒可以处理1666条数据  我测试的很简单  6个测试独立进程每个进程for循环10万次 一共60万数据。

请有诚意的同学全看一遍  对于有问题的地方请回帖告诉我  对于ET模式说实话我用的不多而且处理细节也太多了 不怎么用。而且我用的机器都编译过内核把句柄1024都改大了。

服务端LT模式代码如下:

/*
如果你发现有connect(): Cannot assign requested address 这样的错误是因为有TIME_WAIT造成句柄再分配错误,你可以这样
#vi /etc/sysctl.conf
输入
net.ipv4.tcp_tw_reuse = 1
net.ipv4.tcp_tw_recycle = 1

#sysctl -p
其中:
net.ipv4.tcp_tw_reuse = 1 表示开启重用。允许将TIME-WAIT sockets重新用于新的TCP连接,默认为0,表示关闭;
net.ipv4.tcp_tw_recycle = 1 表示开启TCP连接中TIME-WAIT sockets的快速回收,默认为0,表示关闭。
把那两行加上去 运行sysctl -p 生效 之后就好多了


对于EPOLLRDHUP编译问题请看http://www.linuxdiyf.com/viewarticle.php?id=97280里面的说明 不需要编译内核只要编译自己程序就行

强调一下 LT模式下用了EPOLLONESHOT  对于短连接来说无所谓 长连接的时候需要对句柄再次EPOLLONESHOT   我代码里有写  长短连接都测试了。
*/
#include <sys/io.h>
#include <stdio.h>
#include <pthread.h>
#include <stdarg.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/file.h>
#include <netinet/in.h>
#include <syslog.h>
#include <dirent.h>
#include <errno.h>
#include <string.h>
#include <netdb.h>
#include <signal.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <stdarg.h>
#include <sys/timeb.h>
#include <sys/types.h>
#include <time.h>
#include <stdlib.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <ctype.h>
#include <unistd.h>
#include <signal.h>
#include <sys/timeb.h>
#include <errno.h>
#include <sys/file.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <queue>
#include <vector>
#include <map>
#include <string>
#include <set>
#include <list>
#include <iostream>
#include <netinet/tcp.h>

using namespace std;
#define MAXLINE 10
#define OPEN_MAX 100
#define LISTENQ 20
#define SERV_PORT 11111
#define INFTIM 1000
pthread_mutex_t m_ClientAcceptMutex = PTHREAD_MUTEX_INITIALIZER;
list<int> m_ClientList;
#define MAX_P_NUM 10
typedef struct pl {
        pthread_t pid;
} pl;
pl pool[MAX_P_NUM];
int epfd = 0;
ssize_t socket_send(int sockfd, const char* buffer, size_t buflen)
{
        ssize_t tmp;
        size_t total = buflen;
        const char *p = buffer;
        int retry=2;
        while(retry--)
        {
                tmp = send(sockfd, p, total, 0);
                if(tmp < 0)
                {
                        if(errno == EINTR) return -1;
                        if(errno == EAGAIN)
                        {
                                usleep(1000);
                                continue;
                        }
                        return -1;
                }
                if((size_t)tmp == total) return buflen;
                total -= tmp;
                p += tmp;
        }
        return tmp;
}

int setnonblocking(int sock)
{
        long arg;

        if (fcntl (sock, F_GETFL, &arg) == -1)
        {
                return -1;
        }
        arg = arg | O_NONBLOCK;
        if (fcntl (sock, F_SETFL, arg) == -1)
        {
                return -1;
        }
        return 0;
}

void  *SendMsg(void* arg)
{
        char line[MAXLINE];
        for(;
        {
                int sockfd = 0;
                pthread_mutex_lock(&m_ClientAcceptMutex);
                if(m_ClientList.empty())
                {
                        pthread_mutex_unlock(&m_ClientAcceptMutex);
                        usleep(1000);
                        continue;
                }
                sockfd = m_ClientList.front();
                m_ClientList.pop_front();
                pthread_mutex_unlock(&m_ClientAcceptMutex);
                int rs = 1;
                int sign = 1;
                memset(line,0,MAXLINE);
                if(sockfd < 0)
                {
                        printf("sockfd continue\n";
                        continue;
                }
                int length = 0;
                int n = 0;
                while(rs)
                {
                        n = read(sockfd,&line[length],10);
                        printf("%d read buff %d\n",n,pthread_self());
                        if(n > 0) length = length + n;

                        if(n < 0)
                        {
                                if(errno == EAGAIN)
                                {
                                        sign = 1;
                                        printf("recv over%d %d\n",errno,pthread_self());
                                        break;
                                }else if(errno == ECONNRESET)
                                {
                                        sign = 0;
                                        printf("connect close%d %d\n",errno,pthread_self());
                                }
                        } else if(n == 0)
                        {
                                sign = 0;
                                printf("nomal close %d %d\n",errno,pthread_self());
                        }
                        if(n == 10)
                        {
                                rs = 1;
                        } else {
                                rs = 0;
                        }
                }
                if(sign == 1 && line[0] != '\0')
                {
                        line[length] = '\0';
                        printf("read:%s %d\n",line,pthread_self());
                        printf("length:%d %d\n",length,pthread_self());
                        char sendbuff[10] = {0};
                        if(strcmp(line,"12345abcde" == 0)
                        {
                                strcpy(sendbuff,"wangyang";
                        } else if(strcmp(line,"12345QWERT" == 0)
                        {
                                strcpy(sendbuff,"mayanaaa";
                        }else if(strcmp(line,"98765abcde" == 0)
                        {
                                strcpy(sendbuff,"woaimayan";
                        }else if(strcmp(line,"ASDFGabcde" == 0)
                        {
                                strcpy(sendbuff,"lkjhgf";
                        }
                        if(socket_send(sockfd,sendbuff,sizeof(sendbuff)) < 0)
                        {
                                printf("发送失败 %d\n",pthread_self());
                        } else {
                                printf("send ok!! %d\n",pthread_self());
                        }
                        //被注释的while打开则server为短连接,如果注释掉while打开struct epoll_event五行则为长连接
                        //while (close(sockfd) < 0 && errno == EINTR) {};
                               
                        struct epoll_event ev;
                        ev.data.u64 = 0ULL;
                        ev.data.fd = sockfd;
                        ev.events = EPOLLIN | EPOLLONESHOT | EPOLLHUP | EPOLLERR | EPOLLRDHUP;
                        epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);
                       
                }
                else {
                        while (close(sockfd) < 0 && errno == EINTR) {};
                }
        }
}

int main(int argc,char **argv)
{
        int i,listenfd, connfd,nfds;
        ssize_t n;
        int         optval=1;
        struct epoll_event ev,events[10000];
        epfd = epoll_create(10000);
        struct sockaddr_in serveraddr;
        listenfd = socket(AF_INET,SOCK_STREAM,0);
        if(setsockopt(listenfd,SOL_SOCKET,SO_REUSEADDR,&optval, sizeof(optval)) == -1)
        {
                printf("setsockopt error\n");
                exit(1);
        }
        struct linger ln;
        ln.l_onoff = 1;
        ln.l_linger = 3;
        if(setsockopt(listenfd, SOL_SOCKET, SO_LINGER, (void *) &ln,sizeof (struct linger)) < 0)
        {
                printf("setlinger error\n");
                exit(1);
        }
        if(setnonblocking(listenfd))
        {
                printf("setnonblocking listenfd\n");
                exit(1);
        }
        ev.data.fd = listenfd;
        ev.events =  EPOLLIN | EPOLLHUP | EPOLLERR | EPOLLRDHUP;
        epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,&ev);
        memset(&serveraddr,0,sizeof(serveraddr));
        serveraddr.sin_family = AF_INET;
        char *local_addr="192.168。3.21";
        inet_aton(local_addr,&(serveraddr.sin_addr));
        serveraddr.sin_port = htons(SERV_PORT);
        bind(listenfd,(struct sockaddr *)&serveraddr,sizeof(serveraddr));
        listen(listenfd, LISTENQ);
                int on = 1;
                setsockopt(listenfd, IPPROTO_TCP, TCP_NODELAY, &on, sizeof(on));
        for (int loops = 0; loops < MAX_P_NUM; loops++)
        {
                if (pthread_create(&pool[loops].pid,NULL, SendMsg,NULL) == 0)
                {
                        printf("thread %d create ok\n", loops);
                }
        }
        for(;;)
        {
                nfds = epoll_wait(epfd,events,10000,-1);
                for(i = 0; i < nfds ; ++i)
                {
                        if(events.data.fd == listenfd)
                        {
                                printf("LISTENFD\n");
                                socklen_t clilen= sizeof(struct sockaddr);
                                struct sockaddr_in clientaddr;
                                int sign = 1;
                                while((connfd = accept(listenfd,(struct sockaddr *)&clientaddr,&clilen)) < 0)
                                {
                                        if(errno == EINTR)
                                                continue;
                                        if(errno != EAGAIN && errno != EWOULDBLOCK)
                                        {
                                                printf("errno != EAGAIN\n");
                                                sign = 0;
                                                break;
                                        }
                                }
                                printf("%d END LISTENFD\n",connfd);
                                if(sign == 1)
                                {
                                        if(setnonblocking(connfd))
                                        {
                                                printf("setnonblocking connfd\n");
                                        }
                                        struct epoll_event ev;
                                        char *str = inet_ntoa(clientaddr.sin_addr);
                                        printf("accapt a connection from:%s\n",str);
                                        ev.data.u64 = 0UL;
                                        ev.data.fd = connfd;
                                        ev.events = EPOLLIN | EPOLLONESHOT | EPOLLHUP | EPOLLERR | EPOLLRDHUP;
                                        epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev);
                                }
                        } else if(events.events & EPOLLRDHUP)
                        {
                                printf("sockt closed by peer EPOLLRDHUP\n");
                                while (close(events.data.fd) < 0 && errno == EINTR) {};
                        } else if(events.events & EPOLLHUP)
                        {
                                printf("sockt closed by peer EPOLLHUP\n");
                                while (close(events.data.fd) < 0 && errno == EINTR) {};
                        } else if(events.events & EPOLLERR)
                        {
                                printf("sockt closed by peer EPOLLERR\n");
                                epoll_ctl(epfd,EPOLL_CTL_DEL,events.data.fd,&ev);
                        } else if(events.events & EPOLLIN)
                        {
                                printf("EPOLLIN\n");
                                pthread_mutex_lock(&m_ClientAcceptMutex);
                                m_ClientList.push_back(events.data.fd);
                                pthread_mutex_unlock(&m_ClientAcceptMutex);
                        } else {
                                while (close(events.data.fd) < 0 && errno == EINTR) {};
                        }

                }
        }
        close(epfd);
        for (int loops = 0; loops < MAX_P_NUM; loops++)
        {
                pthread_join(pool[loops].pid, NULL);
        }
        return 0;
}


客户端6个 代码一样 只是发送的10个字节不一样而已(强调一下客户端的发送和接收用到的safe_recv和safe_send函数是我自己编写的,有想测试的同学要把这两处改成你们自己的,那两个函数源码我就不写了,就是发送和接收,相信你们都会,由于比较懒为了省力气我就复制了6个相同进程,没有用多线程大量并发测。个人估算了一下 服务端一秒处理1666个链接请求,有感兴趣的同学可以写个多线程客户端并发测试一下。)


#include <signal.h>
#include <pthread.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/timeb.h>
#include <sys/param.h>
#include <sys/ioctl.h>
#include <sys/file.h>
#include <netinet/in.h>
#include <net/if.h>
#include <net/if_arp.h>
#include <arpa/inet.h>
#include <errno.h>
#include <dirent.h>
#include <ctype.h>
#include <time.h>
using namespace wbl;

unsigned long GetTickCount()
{      
        unsigned long dCount = 0;
        struct timeval tp;
        gettimeofday(&tp, NULL);
        struct tm *ptm = localtime(&tp.tv_sec);
               
        dCount =ptm->tm_hour*60*60*1000 + ptm->tm_min*60*1000 + ptm->tm_sec*1000 + tp.tv_usec/1000;
               
        return dCount;
}
int main()
{
        int sd = 0;
        const char AGENTADDR[20] = "10.65.5.77";
        const char AGENTPORT[20] = "11111";
        char str[10] = {0};
        char qtr[10] = {0};
        strcpy(str,"12345abcde");
        long i = 0;
        for(int s = 0; s < 100000 ; s++)
        {
                if((i % 1000) == 0)
                {
                        sleep(1);
                }
                while((sd = connect_to(AGENTADDR, atoi(AGENTPORT))) == -1)
                {
                        printf("Can't connect to DXBY[%s][%s]\n", AGENTADDR,AGENTPORT);
                        sleep(1);
                }
                int nlodtick = GetTickCount();       
                if(safe_send(sd, str, sizeof(str), 2) ==  sizeof(str))
                {
                        printf("send is ok %d\n",s);
                        int p = 0;
                        if((p = safe_recv(sd, (char*)qtr, sizeof(qtr), 2)) == sizeof(qtr))
                        {
                                printf("recvmsg[%s]\n",qtr);
                        } else {
                                printf("recv fail %d\n",p);
                        }

                }
                shutdown(sd, 2);               
                close(sd);
                int nnewtick = GetTickCount();
                printf("%d tick:%d\n",s,nnewtick-nlodtick);
                i++;
        }
        return 0;
}

Makefile文件如下

CC=g++
CFLAGS=-Wall -g -O2

INCLUDE=-I ./

SOURCE=$(wildcard *.c)
all:
                $(CC) -g -o epollserver $(INCLUDE) $(SOURCE) -lpthread
clean:
        rm epollserver

论坛徽章:
0
2 [报告]
发表于 2010-10-06 17:23 |只看该作者
自己顶自己一下 里面很多东西也是网上的思想 欢迎大家狠狠拍砖

论坛徽章:
0
3 [报告]
发表于 2010-10-06 22:24 |只看该作者
no timeout mechanism?

论坛徽章:
2
2015年辞旧岁徽章
日期:2015-03-03 16:54:152015年迎新春徽章
日期:2015-03-04 09:56:11
4 [报告]
发表于 2010-10-06 22:54 |只看该作者
timeout是很重要的,否则被DOS攻击就麻烦了

论坛徽章:
0
5 [报告]
发表于 2010-10-06 23:48 |只看该作者
最简单的方式是开个线程轮询检测超时。

论坛徽章:
0
6 [报告]
发表于 2010-10-07 11:09 |只看该作者
我这个只是一个例子,说明一下主体架构,timeout的问题我这边有两种方法  一种就是对socket有一个数据结构 里面记录每一次时间 在epollwait 之前先轮询一遍检查超时  或者单起线程也行。
另一种就是send和recv用select做超时判断   因为我机器是被重新改过句柄数的 所以接收是epoll  检测超时还是用select

论坛徽章:
1
巨蟹座
日期:2013-12-30 17:06:34
7 [报告]
发表于 2010-10-07 22:07 |只看该作者
mark.

论坛徽章:
0
8 [报告]
发表于 2010-10-11 15:54 |只看该作者
顶一下

论坛徽章:
0
9 [报告]
发表于 2010-10-17 17:09 |只看该作者
mark

论坛徽章:
26
处女座
日期:2016-04-18 14:00:4515-16赛季CBA联赛之深圳
日期:2020-06-02 10:10:5015-16赛季CBA联赛之广夏
日期:2019-07-23 16:59:452016科比退役纪念章
日期:2019-06-26 16:59:1315-16赛季CBA联赛之天津
日期:2019-05-28 14:25:1915-16赛季CBA联赛之青岛
日期:2019-05-16 10:14:082016科比退役纪念章
日期:2019-01-11 14:44:062016科比退役纪念章
日期:2018-07-18 16:17:4015-16赛季CBA联赛之上海
日期:2017-08-22 18:18:5515-16赛季CBA联赛之江苏
日期:2017-08-04 17:00:4715-16赛季CBA联赛之佛山
日期:2017-02-20 18:21:1315-16赛季CBA联赛之天津
日期:2016-12-12 10:44:23
10 [报告]
发表于 2010-10-17 21:42 |只看该作者
  楼主 希望你能用CODE 标签标注一下 另外提供一下打包文件 ,谢谢 ~
您需要登录后才可以回帖 登录 | 注册

本版积分规则 发表回复

  

北京盛拓优讯信息技术有限公司. 版权所有 京ICP备16024965号-6 北京市公安局海淀分局网监中心备案编号:11010802020122 niuxiaotong@pcpop.com 17352615567
未成年举报专区
中国互联网协会会员  联系我们:huangweiwei@itpub.net
感谢所有关心和支持过ChinaUnix的朋友们 转载本站内容请注明原作者名及出处

清除 Cookies - ChinaUnix - Archiver - WAP - TOP