- 论坛徽章:
- 0
|
下面的程序只能支持多线程的异步处理方式,扩展性一般,如果,可以改进为进程的方式扩展性会好很多 :)
之所以使用tornado 是因为,tornado 性能比较乐观
Client 端代码- >>> import socket
- >>> d = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- >>> d.connect((HOST,PORT))
复制代码 Server 端代码,需要Linux 服务器- #encoding = utf-8
- import errno
- import functools
- import socket
- import time
- import os
- import traceback
- import memcache
- from tornado import ioloop, iostream
- from threading import Thread
- from Queue import Queue
- from config import *
- streamDict = {}
- streamRequests = {}
- ################################# Utils ###############################
- def stream_add(stream,count):
- """
- 添加 Stream
- """
- streamkey = str(stream)
- streamDict[streamkey] = (stream,0)
- streamRequests[streamkey] = Queue()
- def stream_remove(streamkey,stream):
- """
- 删除 Stream
- """
- try:
- stream.close()
- except:
- pass
- del streamDict[streamkey]
- del streamRequests[streamkey]
- ################################# Response ###############################
- class ClockResponse(Thread):
- """
- 定时处理请求接口
- """
- def __init__ (self):
- """
- 初始化
- """
- Thread.__init__(self)
- self.flag = True
- self.count = 0
- def run(self):
- """
- 运行线程
- """
- while self.flag:
- #打印 LOG,N次/打印
- ct = self.count % 10
- if ct == 0:
- print 'now connections:%s'%(len(streamDict))
- self.count = ct + 1
- #循环处理请求
- for streamkey,(stream,num) in streamDict.items():
- try:
- #这里返回Response
- queue = streamRequests[streamkey]
- print "queue.get()",queue.get()
- print "q.qsize()",queue.qsize()
- stream.write('haha')
- except:
- streamDict[streamkey] = (stream,num+1)
- #去死吧.......
- if num>DEAD_VALUE:
- stream_remove(streamkey,stream)
- time.sleep(SLEEP_TIME)
- def stop(self):
- self.flag = False
- ################################# Request ###############################
- class SocketRequest(object):
- """
- Socket 请求
- """
- def __init__(self, stream, address):
- self.stream = stream
- self.streamkey = str(stream)
- self.address = address
- #开始读取数据
- self.stream.read_until("\r\n", self.on_request)
- def on_request(self,data):
- """
- 接收处理请求
- """
- #将数据加载到 Queue
- print "on_request" , data
- queue = streamRequests[self.streamkey]
- queue.put(data)
- #继续读取请求
- self.stream.read_until("\r\n", self.on_request)
- def connection_ready(sock, fd, events):
- """
- 启动 Connection 监听,处理程序
- """
- while True:
- try:
- connection, address = sock.accept()
- except socket.error, e:
- if e[0] not in (errno.EWOULDBLOCK, errno.EAGAIN):
- raise
- return
- #non-block
- connection.setblocking(0)
- stream = iostream.IOStream(connection)
- #保存stream
- stream_add(stream,0)
- #接收Socket 请求
- SocketRequest(stream,address)
- if __name__ == '__main__':
-
- #pidfile = open(os.path.join(settings.PROJECT_PATH,'tornadoServer.pid'),'w')
- #pidfile.write(str(os.getpid()))
- #pidfile.close()
-
- #接收请求
- clock = ClockResponse()
- clock.start()
-
- #定义Socket
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) #这里的0 dummy protocol for TCP
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- sock.setblocking(0)
- sock.bind(("", SERVER_PORT))
- sock.listen(9999)
-
- #获得ioloop
- io_loop = ioloop.IOLoop.instance()
- callback = functools.partial(connection_ready, sock)
- io_loop.add_handler(sock.fileno(), callback, io_loop.READ)
- #启动服务
- try:
- io_loop.start()
- except KeyboardInterrupt:
- io_loop.stop()
- clock.stop()
- print "exited cleanly"
复制代码 TCP/IP module for linux 2.6 dummy版
1、概述
英文单词dummy是虚假的意思,dummy版的TCP/IP module就是实现一个虚假的传输层协议,该协议不能完成任何实际的网络通讯。它通过module自身提供的一个经过改造的环回网络设备驱动程序mylo,展示网络数据在TCP/IP协议栈中发送和接收的整个流程。其意义在于为后续的实际的STREAM, DGRAM, RAW的开发提供一个基本的程序框架,以及初步验证这个框架的可行性,同时也帮助更好的理解2.6内核中整个TCP/IP协议栈的实现原理。
2、编译与安装
dummy版的module与第一版相比较,在模块划分和Makefile上作了很多改进。当前初步按TCP/IP协议栈的层次结构分为五个模块,下面是整个源代码目录树的基本情况:
ipv4 顶层目录。
|
|--inet inet域,主要完成inet域最顶层的一些基本操作。
|
|--application 应用层,现在主要为一个dummy协议的测试应用程序。
|
|--transport 传输层,目前只有一个dummy协议。
|
|--network 网络层。
|
|--datalink 数据链路层,目前只实现了环回网络设备驱动程序mylo。
|
|--core 核心模块,连接数据链路层和网络层,传递它们之间的数据。
|
|--include 头文件
|
|--load.sh unload.sh 装载和卸载脚本
|
|--Makfile env.mk makefile文件。 |
|