免费注册 查看新帖 |

Chinaunix

  平台 论坛 博客 文库
最近访问板块 发新帖
查看: 804 | 回复: 0
打印 上一主题 下一主题

一个高效的服务器 [复制链接]

论坛徽章:
0
跳转到指定楼层
1 [收藏(0)] [报告]
发表于 2008-11-16 23:49 |只看该作者 |倒序浏览
一个高效的服务器
高效服务器的关键技术:
1.需要有固定数量的线程来去处理请求,而不可以每次收到请求都fork或者create_thread;
2.如果socket使用堵塞模式,那么read必须要有超时,如果使用非堵塞模式,那么read就必须要有数据缓冲;
3.使用epoll分别在不同的线程中监视可读和可写;
4.对于tcp长连接使用主线程分配连接,然后由任务线程处理,一个任务线程对应一个任务队列;对于tcp短连接和udp连接,使用竞争线程;
5.时刻检测并且关闭超时连接;
6.真正关闭socket的地方只能由一处,其他地方发现其socket需要关闭只能标识socket应该关闭了,然后由那一处安全的关闭socket;
7.当服务器需要不断的发送硬盘上的数据给客户端(比如说文件传输服务器和流媒体服务器),那么服务器端必须先从硬盘加载数据到数据缓存,再从数据缓冲发送数据给客户端;
8.内存分配必须成功,不然线程睡眠然后重新分配直到分配成功为止;
通用设计方案:
现在我们设计一个通用的服务器,这个服务器接受客户端的连接,然后不断读取客户端的请求,在某一时刻又转为不断的发送数据给客户端;下面我们看看怎么设计这样的一个服务器
注:
1.为了只关注设计方面并且让代码尽量简洁,对于一些错误处理我删除了。
2.must_malloc,must_calloc和其他前缀为must的函数封装了相应的内存分配函数,使得分配必须成功;
3.queue_t结构是一个通用的任务队列结构,queue_new是创建新的队列,queue_push是推数据到队列中,queue_pop是从队列中抛出数据
4.下面的代码只是一个良好的服务器架构,而不是一个真正能使用的服务器
5.欢迎讨论和优化!
代码:typedef struct server_st *server_t;
typedef struct task_queue_st *task_queue_t;
typedef struct conn_st *conn_t;
/** 服务器结构 */
struct server_st {
        /** 监听地址 */
        struct in_addr listen_addr;
        /** 监听端口 */
        int port;
        /** 监听描述符 */
        int listen_fd;
        /**负责读取请求的任务队列,一个线程负责处理一个任务队列,一共五个*/
        queue_t readreq_task_queues[5];
        /**负责写数据的任务队列,一个线程负责处理一个任务队列,一共五个*/
        queue_t writedata_task_queues[5];
};
/** 任务队列结构 */
struct task_queue_st {
        /** 待添加的连接 */
        queue_t pending_conn_queue;
        /** 互斥锁,用于多线程环境下对任务队列的访问控制 */
        pthread_mutex_t task_queue_mutex;
        /** 链表结构,保存任务队列中的所有连接 */
        conn_t conn_head;
        /** 链表的最后一个节点,用于方便在结尾插入新的节点 */
        conn_t conn_tail;
        /** 总连接数*/
        int total_conns;
};
/** 用户的阶段状态 */
typedef enum {
        status_ESTABLISHED,/** 客户端刚建立连接 */
        status_CONNECTED,/** 客户端正在连接 */
        status_START_RECV_DATA,/** 客户端开始接收数据,即表示服务器端开始发送数据 */
        status_CLOSED/** 客户端关闭连接 */
} client_status_t;
/** 服务器的阶段状态 */
typedef enum {
        status_SERVER_CONNECTED,/** 与客户端建立连接 */
        status_SERVER_CLOSING /** 服务器关闭连接 */
} server_status_t;
/** 连接结构 */
struct conn_st {
        /** 该连接的所属服务器 */
        server_t server;
        /** 该连接的引用计数 */
        int ref_count;
        /** 用户最后一次活动的时间*/
        time_t last_activity;
        /** 客户端当前的状态阶段*/
        client_status_t client_status;
        /** 服务器当前的状态阶段*/
        server_status_t server_status;
        /** socket描述符*/
        int fd;
  /** 数据缓冲,当服务器端需要不断的发送数据给客户端时被用到 */
        char *remain_data;
        /** 数据缓冲剩余数据大小 */
        int remain_data_size;
        /** 下一个连接 */
        conn_t readreq_task_queue_next;
        /** 下一个连接 */
        conn_t writedata_task_queue_next;
};
/** 主函数 */
int main(int argc, char **argv) {
        int idx, conn_fd;
        server_t server;
        server = must_calloc(1, sizeof(struct server_st));
        server->port = 端口号;
        server->listen_addrs.s_addr = INADDR_ANY;
        server->listen_fd = 服务器监听(&server->listen_addr,server->port);
        设置成非堵塞(server->listen_fd);
        /* 创建5个负责读取请求的任务线程 */
        for (idx = 0; idx readreq_task_queues[idx];
                bzero(task_queue,sizeof(struct task_queue_st));
                task_queue->pending_conn_queue = queue_new();
                pthread_mutex_init(&task_queue->task_queue_mutex,NULL);
                pthread_create(&task_queue->thread_tid, NULL, readreq_task_thread, task_queue);
        }
        /* 创建5个负责写数据的任务线程 */
        for (idx = 0; idx writedata_task_queues[idx];
                bzero(task_queue,sizeof(struct task_queue_st));
                task_queue->pending_conn_queue = queue_new();
                pthread_mutex_init(&task_queue->task_queue_mutex,NULL);
                pthread_create(&task_queue->thread_tid, NULL, writedata_task_thread, task_queue);
        }
        /* 无限循环的监听,等待客户端的连接请求 */
        while (1) {
                conn_fd = accept(server->listen_fd, 0, 0);
                if (conn_fd readreq_task_queues[idx];
                if (min_task_queue == NULL)
                        min_task_queue = task_queue;
                else if (min_task_queue->total_conns > task_queue->total_conns)
                        min_task_queue = task_queue;
        }
        if (min_task_queue) {
                conn = must_calloc(1, sizeof(struct conn_st));
                conn->server = server;
                conn->client_status = status_ESTABLISHED;
                conn->server_status = status_SERVER_CONNECTED;
                conn->ref_count++;/*累加引用计数*/
                /*添加到该任务队列中的待添加队列并且累加该任务队列里的连接数*/
                pthread_mutex_lock(&min_task_queue->task_queue_mutex);
                queue_push(min_task_queue->pending_conn_queue, conn, 1);
                min_task_queue->total_conns++;
                pthread_mutex_unlock(&min_task_queue->task_queue_mutex);
        }
}
/**
*负责读取请求的任务线程处理主函数,负责管理,处理自己队列中的所有连接
*/
static void *readreq_task_thread(void *arg) {
        int epfd = epoll_create(20480);
        struct epoll_event ev,happened_ev[128];
        task_queue_t task_queue = (task_queue_t) arg;
        int ready,i;
        while (1) {
                time_t curtime = time(NULL);
                /*下面三个变量遍历链表时使用,conn是当前连接,prev_conn是先前的连接,discarded_conn是丢弃的连接*/
                conn_t conn = NULL, prev_conn = NULL, discarded_conn;
                /*把待添加队列里的连接添加到任务队列中*/
                pthread_mutex_lock(&task_queue->task_queue_mutex);
                while ((conn = queue_pop(task_queue->pending_conn_queue))) {
                        if (!task_queue->conn_head) {
                                task_queue->conn_head = conn;
                        } else {
                                task_queue->conn_tail->readreq_task_queue_next = conn;
                        }
                        task_queue->conn_tail = conn;
                }
                pthread_mutex_unlock(&task_queue->task_queue_mutex);
                if (task_queue->total_conns == 0) {
                        thread_sleep(1);
                        continue;
                }
                conn = task_queue->conn_head;
                while (conn) {
                        //清理已经关闭的连接,把已经关闭的连接和超时的连接从链表中删除掉
                        boolean conn_timeout = conn->last_activity > 0 && curtime - conn->last_activity > CLIENT_TIMEOUT;
                        if ((conn->client_status == status_CLOSED || conn->server_status == status_SERVER_CLOSING || conn_timeout) && conn->ref_count == 1) {
                                task_queue->total_conns--;
                                discarded_conn = conn;//先放入临时变量中
                                //从链表中删除
                                if (prev_conn) {
                                        if (!conn->readreq_task_queue_next) {
                                                task_queue->conn_tail = prev_conn;
                                                prev_conn->readreq_task_queue_next = NULL;
                                        } else {
                                                prev_conn->readreq_task_queue_next = conn->readreq_task_queue_next;
                                        }
                                } else {
                                        task_queue->conn_head = conn->readreq_task_queue_next;
                                }
                                conn = conn->readreq_task_queue_next;
                                close(discarded_conn->fd);/*真正关闭连接的地方*/
                                free(discarded_conn);
                        } else {
                                ev.data.fd = conn->fd;
                                ev.data.ptr = conn;
                                /*只负责监视读事件和出错事件*/
                                ev.events = EPOLLIN | EPOLLET;
                                if(conn->client_status == status_ESTABLISHED) {
                                        设置超时时间或者设置非堵塞(conn->fd);/*两者必须选一个*/
                                        epoll_ctl(epfd,EPOLL_CTL_ADD,conn->fd,&ev);
                                        conn->client_status = status_CONNECTED;
                                } else {
                                        epoll_ctl(epfd,EPOLL_CTL_MOD,conn->fd,&ev);
                                }
                                prev_conn = conn;
                                conn = conn->readreq_task_queue_next;
                        }
                }
                /** 堵塞等待,超时时间为POLL_TIMEOUT毫秒*/
                if ((ready = epoll_wait(epfd, happened_ev,sizeof(happened_ev) / sizeof(struct epoll_event), POLL_TIMEOUT)) last_activity = time(NULL);/* 记录客户端的最后活动时间*/
                                proc_protocol(conn);/*处理协议函数*/
                        } else if(happened_ev.events & EPOLLHUP) {
                                conn = (conn_t) happened_ev.data.ptr;
                                conn->client_status = status_CLOSED;
                        }
                }
        }
}
/* 协议处理函数 */
void proc_protocol(conn_t conn) {
        某一时刻客户端要求服务器端不断发送数据给客户端,然后设置了conn->client_status = status_START_RECV_DATA,并且调用了下面的move_to_writedata_task_thread(conn);
}
/*分配任务,原则为哪个线程的任务队列最少就分配给哪一个线程去处理*/
static void move_to_writedata_task_thread(conn_t conn) {
        server_t server = conn->server;
        task_queue_t task_queue, min_task_queue= NULL;
        int idx;
        /* 查找连接数最少的任务队列 */
        for (idx = 0; idx writedata_task_queues[idx];
                if (min_task_queue == NULL)
                        min_task_queue = task_queue;
                else if (min_task_queue->total_conns > task_queue->total_conns)
                        min_task_queue = task_queue;
        }
        if (min_task_queue) {
                conn->ref_count++;/*累加引用计数*/
                /*添加到该任务队列中的待添加队列并且累加该任务队列里的连接数*/
                pthread_mutex_lock(&min_task_queue->task_queue_mutex);
                queue_push(min_task_queue->pending_conn_queue, conn, 1);
                min_task_queue->total_conns++;
                pthread_mutex_unlock(&min_task_queue->task_queue_mutex);
        }
}
/**
*负责写数据的线程处理主函数,负责管理,处理自己队列中的所有连接
*/
static void *writedata_task_thread(void *arg) {
        task_queue_t task_queue = (task_queue_t) arg;
        int epfd = epoll_create(20480);
        struct epoll_event ev,happened_ev[128];
        int ready,i,nwrite,nread,sndbuf_size = 10240;
        char buf[sndbuf_size];
        while (1) {
                time_t curtime = time(NULL);
                /*下面三个变量遍历链表时使用,conn是当前连接,prev_conn是先前的连接,discarded_conn是丢弃的连接*/
                conn_t conn = NULL, prev_conn = NULL, discarded_conn;
                /*把待添加队列里的连接添加到任务队列中*/
                pthread_mutex_lock(&task_queue->task_queue_mutex);
                while ((conn = queue_pop(task_queue->pending_conn_queue))) {
                        if (!task_queue->conn_head) {
                                task_queue->conn_head = conn;
                        } else {
                                task_queue->conn_tail->writedata_task_queue_next = conn;
                        }
                        task_queue->conn_tail = conn;
                }
                pthread_mutex_unlock(&task_queue->task_queue_mutex);
                if (task_queue->total_conns == 0) {
                        thread_sleep(1);
                        continue;
                }
                conn = task_queue->conn_head;
                while (conn) {
                        //清理已经关闭的连接,把已经关闭的连接和超时的连接从链表中删除掉
                        boolean conn_timeout = conn->last_activity > 0 && curtime - conn->last_activity > CLIENT_TIMEOUT;
                        if (conn->client_status == status_CLOSED || conn->server_status == status_SERVER_CLOSING || conn_timeout) {
                                task_queue->total_conns--;
                                discarded_conn = conn;//放入临时变量中
                                //从链表中删除
                                if (prev_conn) {
                                        if (!conn->writedata_task_queue_next) {
                                                task_queue->conn_tail = prev_conn;
                                                prev_conn->writedata_task_queue_next = NULL;
                                        } else {
                                                prev_conn->writedata_task_queue_next = conn->writedata_task_queue_next;
                                        }
                                } else {
                                        task_queue->conn_head = conn->writedata_task_queue_next;
                                }
                                conn = conn->writedata_task_queue_next;
                                discarded_conn->ref_count--;/*引用计数减一,并且该连接已经关闭了,那么真正关闭连接的任务就交给readreq_task_thread函数了*/
                        } else {
                                ev.data.fd = conn->fd;
                                ev.data.ptr = conn;
                                /*只负责监视写事件和出错事件*/
                                ev.events = EPOLLOUT | EPOLLET;
                                if(conn->client_status == status_START_RECV_DATA) {
                                        setsockopt(conn->fd, SOL_SOCKET, SO_SNDBUF, (char *)&sndbuf_size, sizeof(int));
                                        epoll_ctl(epfd,EPOLL_CTL_ADD,conn->fd,&ev);
                                        conn->client_status = status_CONNECTED;
                                } else {
                                        epoll_ctl(epfd,EPOLL_CTL_MOD,conn->fd,&ev);
                                }
                                prev_conn = conn;
                                conn = conn->writedata_task_queue_next;
                        }
                }
                /**堵塞等待,超时时间为POLL_TIMEOUT毫秒*/
                if ((ready = epoll_wait(epfd, happened_ev,sizeof(happened_ev) / sizeof(struct epoll_event), POLL_TIMEOUT)) last_activity = time(NULL);/* 记录客户端的最后活动时间*/
                                if(conn->remain_data_size == 0) {/*如果数据缓存已经没有任何数据了的话*/
                                        if((nread = read(文件描述符,buf,sizeof(buf))) > 0) {/*从硬盘读数据出来到数据缓存*/
                                                if((nwrite = write(conn->fd,buf,nread)) client_status = status_CLOSED;
                                                        continue;
                                                }
                                                if(nwrite remain_data_size = nread - nwrite;
                                                        conn->remain_data = must_malloc(conn->remain_data_size);
                                                        memcpy(conn->remain_data,buf + nwrite, conn->remain_data_size);
                                                }
                                        } else if(nread == 0) {
                                                conn->server_status = status_SERVER_CLOSING;
                                        }
                                } else {
                                        /*写数据缓存里的数据到客户端*/
                                        if((nwrite = write(conn->fd,conn->remain_data,conn->remain_data_size)) client_status = status_CLOSED;
                                                free(conn->remain_data);
                                                continue;
                                        }
                                        if(nwrite remain_data_size) {
                                                conn->remain_data_size = conn->remain_data_size - nwrite;
                                                memcpy(buf, conn->remain_data + nwrite, conn->remain_data_size);
                                                conn->remain_data = must_realloc(conn->remain_data, conn->remain_data_size);
                                                memcpy(conn->remain_data, buf, conn->remain_data_size);
                                        } else {
                                                free(conn->remain_data);
                                                conn->remain_data = NULL;
                                                conn->remain_data_size = 0;
                                        }
                                }
                        } else if(happened_ev.events & EPOLLHUP) {
                                conn = (conn_t) happened_ev.data.ptr;
                                conn->client_status = status_CLOSED;
                        }
                }
        }
}

转自linuxsir.org
作者:lijiuwei
邮箱:lijiuwei0902@gmail.com


本文来自ChinaUnix博客,如果查看原文请点:http://blog.chinaunix.net/u2/68803/showart_1419610.html
您需要登录后才可以回帖 登录 | 注册

本版积分规则 发表回复

  

北京盛拓优讯信息技术有限公司. 版权所有 京ICP备16024965号-6 北京市公安局海淀分局网监中心备案编号:11010802020122 niuxiaotong@pcpop.com 17352615567
未成年举报专区
中国互联网协会会员  联系我们:huangweiwei@itpub.net
感谢所有关心和支持过ChinaUnix的朋友们 转载本站内容请注明原作者名及出处

清除 Cookies - ChinaUnix - Archiver - WAP - TOP