免费注册 查看新帖 |

Chinaunix

  平台 论坛 博客 文库
最近访问板块 发新帖
查看: 6487 | 回复: 1

和大家分享一下tornado 如何实现异步处理 [复制链接]

论坛徽章:
0
发表于 2011-04-11 13:35 |显示全部楼层
下面的程序只能支持多线程的异步处理方式,扩展性一般,如果,可以改进为进程的方式扩展性会好很多 :)
之所以使用tornado 是因为,tornado 性能比较乐观

Client 端代码
  1. >>> import socket
  2. >>> d = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  3. >>> d.connect((HOST,PORT))
复制代码
Server 端代码,需要Linux 服务器
  1. #encoding = utf-8
  2. import errno
  3. import functools
  4. import socket
  5. import time
  6. import os
  7. import traceback
  8. import memcache
  9. from tornado import ioloop, iostream
  10. from threading import Thread
  11. from Queue import Queue
  12. from config import *

  13. streamDict = {}
  14. streamRequests = {}

  15. ################################# Utils ###############################
  16. def stream_add(stream,count):
  17.     """
  18.     添加 Stream
  19.     """
  20.     streamkey = str(stream)
  21.     streamDict[streamkey] = (stream,0)
  22.     streamRequests[streamkey] = Queue()

  23. def stream_remove(streamkey,stream):
  24.     """
  25.     删除 Stream
  26.     """
  27.     try:
  28.         stream.close()
  29.     except:
  30.         pass
  31.     del streamDict[streamkey]
  32.     del streamRequests[streamkey]

  33. ################################# Response ###############################
  34. class ClockResponse(Thread):
  35.     """
  36.     定时处理请求接口
  37.     """

  38.     def __init__ (self):
  39.         """
  40.         初始化
  41.         """
  42.         Thread.__init__(self)
  43.         self.flag = True
  44.         self.count = 0

  45.     def run(self):
  46.         """
  47.         运行线程
  48.         """
  49.         while self.flag:
  50.             #打印 LOG,N次/打印
  51.             ct = self.count % 10
  52.             if ct == 0:
  53.                 print 'now connections:%s'%(len(streamDict))
  54.             self.count = ct + 1

  55.             #循环处理请求
  56.             for streamkey,(stream,num) in streamDict.items():
  57.                 try:
  58.                     #这里返回Response
  59.                     queue = streamRequests[streamkey]
  60.                     print "queue.get()",queue.get()
  61.                     print "q.qsize()",queue.qsize()
  62.                     stream.write('haha')
  63.                 except:
  64.                     streamDict[streamkey] = (stream,num+1)
  65.                     #去死吧.......
  66.                     if num>DEAD_VALUE:
  67.                         stream_remove(streamkey,stream)
  68.             time.sleep(SLEEP_TIME)

  69.     def stop(self):
  70.         self.flag = False

  71. ################################# Request ###############################
  72. class SocketRequest(object):
  73.     """
  74.     Socket 请求
  75.     """

  76.     def __init__(self, stream, address):
  77.         self.stream = stream
  78.         self.streamkey = str(stream)
  79.         self.address = address
  80.         #开始读取数据
  81.         self.stream.read_until("\r\n", self.on_request)

  82.     def on_request(self,data):
  83.         """
  84.         接收处理请求
  85.         """
  86.         #将数据加载到 Queue
  87.         print "on_request" , data
  88.         queue = streamRequests[self.streamkey]
  89.         queue.put(data)
  90.         #继续读取请求
  91.         self.stream.read_until("\r\n", self.on_request)


  92. def connection_ready(sock, fd, events):
  93.     """
  94.     启动 Connection 监听,处理程序
  95.     """
  96.     while True:
  97.         try:
  98.             connection, address = sock.accept()
  99.         except socket.error, e:
  100.             if e[0] not in (errno.EWOULDBLOCK, errno.EAGAIN):
  101.                 raise
  102.             return
  103.         #non-block
  104.         connection.setblocking(0)
  105.         stream = iostream.IOStream(connection)
  106.         #保存stream
  107.         stream_add(stream,0)
  108.         #接收Socket 请求
  109.         SocketRequest(stream,address)


  110. if __name__ == '__main__':
  111.    
  112.     #pidfile = open(os.path.join(settings.PROJECT_PATH,'tornadoServer.pid'),'w')
  113.     #pidfile.write(str(os.getpid()))
  114.     #pidfile.close()
  115.    
  116.     #接收请求
  117.     clock = ClockResponse()
  118.     clock.start()
  119.    
  120.     #定义Socket
  121.     sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) #这里的0 dummy protocol for TCP
  122.     sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  123.     sock.setblocking(0)
  124.     sock.bind(("", SERVER_PORT))
  125.     sock.listen(9999)
  126.    
  127.     #获得ioloop
  128.     io_loop = ioloop.IOLoop.instance()
  129.     callback = functools.partial(connection_ready, sock)
  130.     io_loop.add_handler(sock.fileno(), callback, io_loop.READ)

  131.     #启动服务
  132.     try:
  133.         io_loop.start()
  134.     except KeyboardInterrupt:
  135.         io_loop.stop()
  136.         clock.stop()
  137.         print "exited cleanly"
复制代码
TCP/IP module for linux 2.6 dummy版

1、概述
    英文单词dummy是虚假的意思,dummy版的TCP/IP module就是实现一个虚假的传输层协议,该协议不能完成任何实际的网络通讯。它通过module自身提供的一个经过改造的环回网络设备驱动程序mylo,展示网络数据在TCP/IP协议栈中发送和接收的整个流程。其意义在于为后续的实际的STREAM, DGRAM, RAW的开发提供一个基本的程序框架,以及初步验证这个框架的可行性,同时也帮助更好的理解2.6内核中整个TCP/IP协议栈的实现原理。
                                          

2、编译与安装
    dummy版的module与第一版相比较,在模块划分和Makefile上作了很多改进。当前初步按TCP/IP协议栈的层次结构分为五个模块,下面是整个源代码目录树的基本情况:
    ipv4                      顶层目录。
     |
     |--inet                    inet域,主要完成inet域最顶层的一些基本操作。
     |
     |--application         应用层,现在主要为一个dummy协议的测试应用程序。
     |
     |--transport           传输层,目前只有一个dummy协议。
     |
     |--network             网络层。
     |
     |--datalink              数据链路层,目前只实现了环回网络设备驱动程序mylo。
     |
     |--core                  核心模块,连接数据链路层和网络层,传递它们之间的数据。
     |
     |--include              头文件
     |
     |--load.sh unload.sh     装载和卸载脚本
     |
     |--Makfile env.mk          makefile文件。

论坛徽章:
0
发表于 2014-12-16 16:07 |显示全部楼层
这么好的东西为什么没人顶呢,虽然我看不懂,但是我还是要顶一下!
您需要登录后才可以回帖 登录 | 注册

本版积分规则 发表回复

  

北京盛拓优讯信息技术有限公司. 版权所有 京ICP备16024965号-6 北京市公安局海淀分局网监中心备案编号:11010802020122 niuxiaotong@pcpop.com 17352615567
未成年举报专区
中国互联网协会会员  联系我们:huangweiwei@itpub.net
感谢所有关心和支持过ChinaUnix的朋友们 转载本站内容请注明原作者名及出处

清除 Cookies - ChinaUnix - Archiver - WAP - TOP