ysitbook 发表于 2011-04-11 13:35

和大家分享一下tornado 如何实现异步处理

下面的程序只能支持多线程的异步处理方式,扩展性一般,如果,可以改进为进程的方式扩展性会好很多 :)
之所以使用tornado 是因为,tornado 性能比较乐观

Client 端代码>>> import socket
>>> d = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
>>> d.connect((HOST,PORT))Server 端代码,需要Linux 服务器#encoding = utf-8
import errno
import functools
import socket
import time
import os
import traceback
import memcache
from tornado import ioloop, iostream
from threading import Thread
from Queue import Queue
from config import *

streamDict = {}
streamRequests = {}

################################# Utils ###############################
def stream_add(stream,count):
    """
    添加 Stream
    """
    streamkey = str(stream)
    streamDict = (stream,0)
    streamRequests = Queue()

def stream_remove(streamkey,stream):
    """
    删除 Stream
    """
    try:
      stream.close()
    except:
      pass
    del streamDict
    del streamRequests

################################# Response ###############################
class ClockResponse(Thread):
    """
    定时处理请求接口
    """

    def __init__ (self):
      """
      初始化
      """
      Thread.__init__(self)
      self.flag = True
      self.count = 0

    def run(self):
      """
      运行线程
      """
      while self.flag:
            #打印 LOG,N次/打印
            ct = self.count % 10
            if ct == 0:
                print 'now connections:%s'%(len(streamDict))
            self.count = ct + 1

            #循环处理请求
            for streamkey,(stream,num) in streamDict.items():
                try:
                  #这里返回Response
                  queue = streamRequests
                  print "queue.get()",queue.get()
                  print "q.qsize()",queue.qsize()
                  stream.write('haha')
                except:
                  streamDict = (stream,num+1)
                  #去死吧.......
                  if num>DEAD_VALUE:
                        stream_remove(streamkey,stream)
            time.sleep(SLEEP_TIME)

    def stop(self):
      self.flag = False

################################# Request ###############################
class SocketRequest(object):
    """
    Socket 请求
    """

    def __init__(self, stream, address):
      self.stream = stream
      self.streamkey = str(stream)
      self.address = address
      #开始读取数据
      self.stream.read_until("\r\n", self.on_request)

    def on_request(self,data):
      """
      接收处理请求
      """
      #将数据加载到 Queue
      print "on_request" , data
      queue = streamRequests
      queue.put(data)
      #继续读取请求
      self.stream.read_until("\r\n", self.on_request)


def connection_ready(sock, fd, events):
    """
    启动 Connection 监听,处理程序
    """
    while True:
      try:
            connection, address = sock.accept()
      except socket.error, e:
            if e not in (errno.EWOULDBLOCK, errno.EAGAIN):
                raise
            return
      #non-block
      connection.setblocking(0)
      stream = iostream.IOStream(connection)
      #保存stream
      stream_add(stream,0)
      #接收Socket 请求
      SocketRequest(stream,address)


if __name__ == '__main__':
   
    #pidfile = open(os.path.join(settings.PROJECT_PATH,'tornadoServer.pid'),'w')
    #pidfile.write(str(os.getpid()))
    #pidfile.close()
   
    #接收请求
    clock = ClockResponse()
    clock.start()
   
    #定义Socket
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) #这里的0 dummy protocol for TCP
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.setblocking(0)
    sock.bind(("", SERVER_PORT))
    sock.listen(9999)
   
    #获得ioloop
    io_loop = ioloop.IOLoop.instance()
    callback = functools.partial(connection_ready, sock)
    io_loop.add_handler(sock.fileno(), callback, io_loop.READ)

    #启动服务
    try:
      io_loop.start()
    except KeyboardInterrupt:
      io_loop.stop()
      clock.stop()
      print "exited cleanly"TCP/IP module for linux 2.6 dummy版

1、概述
    英文单词dummy是虚假的意思,dummy版的TCP/IP module就是实现一个虚假的传输层协议,该协议不能完成任何实际的网络通讯。它通过module自身提供的一个经过改造的环回网络设备驱动程序mylo,展示网络数据在TCP/IP协议栈中发送和接收的整个流程。其意义在于为后续的实际的STREAM, DGRAM, RAW的开发提供一个基本的程序框架,以及初步验证这个框架的可行性,同时也帮助更好的理解2.6内核中整个TCP/IP协议栈的实现原理。
                                          

2、编译与安装
    dummy版的module与第一版相比较,在模块划分和Makefile上作了很多改进。当前初步按TCP/IP协议栈的层次结构分为五个模块,下面是整个源代码目录树的基本情况:
    ipv4                      顶层目录。
   |
   |--inet                  inet域,主要完成inet域最顶层的一些基本操作。
   |
   |--application         应用层,现在主要为一个dummy协议的测试应用程序。
   |
   |--transport         传输层,目前只有一个dummy协议。
   |
   |--network             网络层。
   |
   |--datalink            数据链路层,目前只实现了环回网络设备驱动程序mylo。
   |
   |--core                  核心模块,连接数据链路层和网络层,传递它们之间的数据。
   |
   |--include            头文件
   |
   |--load.sh unload.sh   装载和卸载脚本
   |
   |--Makfile env.mk          makefile文件。

olivetree123 发表于 2014-12-16 16:07

这么好的东西为什么没人顶呢,虽然我看不懂,但是我还是要顶一下!
页: [1]
查看完整版本: 和大家分享一下tornado 如何实现异步处理