免费注册 查看新帖 |

Chinaunix

  平台 论坛 博客 文库
最近访问板块 发新帖
查看: 2664 | 回复: 0
打印 上一主题 下一主题

用Python实现一个多线程网页下载器 [复制链接]

论坛徽章:
4
CU大牛徽章
日期:2013-03-13 15:29:07CU大牛徽章
日期:2013-03-13 15:29:49CU大牛徽章
日期:2013-03-13 15:30:192015年迎新春徽章
日期:2015-03-04 09:57:09
跳转到指定楼层
1 [收藏(0)] [报告]
发表于 2009-04-29 11:17 |只看该作者 |倒序浏览

                这是一个有着真实需求的实现,我的用途是拿它来通过 HTTP 方式向服务器提交游戏数据。把它放上来也是想大家帮忙挑刺,找找 bug,让它工作得更好。
# -*- coding:utf-8 -*-
import urllib, httplib
import thread
import time
from Queue import Queue, Empty, Full
HEADERS = {"Content-type": "application/x-www-form-urlencoded",
                        'Accept-Language':'zh-cn',
                        'User-Agent': 'Mozilla/4.0 (compatible; MSIE 6.0;Windows NT 5.0)',
                        "Accept": "text/plain"}
UNEXPECTED_ERROR = -1
POST = 'POST'
GET = 'GET'
def base_log(msg):
    print msg
def base_fail_op(task, status, log):
    log('fail op. task = %s, status = %d'%(str(task), status))
def get_remote_data(tasks, results, fail_op = base_fail_op, log = base_log):
    while True:
        task = tasks.get()
        try:
            tid = task['id']
            hpt = task['conn_args'] # hpt = host:port, timeout
        except KeyError, e:
            log(str(e))
            continue
        log('thread_%s doing task %d'%(thread.get_ident(), tid))
        #log('hpt = ' + str(hpt))
        conn = httplib.HTTPConnection(**hpt)
            
        try:
            params = task['params']
        except KeyError, e:
            params = {}
        params = urllib.urlencode(params)
        #log('params = ' + params)
        
        try:
            method = task['method']
        except KeyError:
            method = 'GET'
        #log('method = ' + method)
        
        try:
            url = task['url']
        except KeyError:
            url = '/'
        #log('url = ' + url)
        
        headers = HEADERS
        try:
            tmp = task['headers']
        except KeyError, e:
            tmp = {}
        headers.update(tmp)
        #log('headers = ' + str(headers))
        headers['Content-Length'] = len(params)
        
        try:
            if method == POST:
                conn.request(method, url, params, headers)
            else:
                conn.request(method, url + params)
            response = conn.getresponse()
        except Exception, e:
            log('request failed. method = %s, url = %s, params = %s headers = %s'%(
                        method, url, params, headers))
            log(str(e))
            fail_op(task, UNEXPECTED_ERROR, log)
            continue
            
        if response.status != httplib.OK:
            fail_op(task, response.status, log)
            continue
            
        data = response.read()
        results.put((tid, data), True)
        
class HttpPool(object):
    def __init__(self, threads_count, fail_op, log):
        self._tasks = Queue()
        self._results = Queue()
        
        for i in xrange(threads_count):
            thread.start_new_thread(get_remote_data,
                                                            (self._tasks, self._results, fail_op, log))
            
    def add_task(self, tid, host, url, params, headers = {}, method = 'GET', timeout = None):
        task = {
            'id' : tid,
            'conn_args' : {'host' : host} if timeout is None else {'host' : host, 'timeout' : timeout},
            'headers' : headers,
            'url' : url,
            'params' : params,
            'method' : method,
            }
        try:
            self._tasks.put_nowait(task)
        except Full:
            return False
        return True
        
    def get_results(self):
        results = []
        while True:
            try:
                res = self._results.get_nowait()
            except Empty:
                break
            results.append(res)
        return results
        
def test_google(task_count, threads_count):
    hp = HttpPool(threads_count, base_fail_op, base_log)
    for i in xrange(task_count):
        if hp.add_task(i,
                'www.google.cn',
                '/search?',
                {'q' : 'lai'},
#                method = 'POST'
                ):
            print 'add task successed.'
            
    while True:
        results = hp.get_results()
        if not results:
            time.sleep(1.0 * random.random())
        for i in results:
            print i[0], len(i[1])
#            print unicode(i[1], 'gb18030')
            
if __name__ == '__main__':
    import sys, random
    task_count, threads_count = int(sys.argv[1]), int(sys.argv[2])
    test_google(task_count, threads_count)
有兴趣想尝试运行的朋友,可以把它保存为 xxxx.py,然后执行 python xxxx.py 10 4,其中 10 表示向 google.cn 请求 10 次查询,4 表示由 4 条线程来执行这些任务。
               
               
               

本文来自ChinaUnix博客,如果查看原文请点:http://blog.chinaunix.net/u2/74256/showart_1913955.html
您需要登录后才可以回帖 登录 | 注册

本版积分规则 发表回复

  

北京盛拓优讯信息技术有限公司. 版权所有 京ICP备16024965号-6 北京市公安局海淀分局网监中心备案编号:11010802020122 niuxiaotong@pcpop.com 17352615567
未成年举报专区
中国互联网协会会员  联系我们:huangweiwei@itpub.net
感谢所有关心和支持过ChinaUnix的朋友们 转载本站内容请注明原作者名及出处

清除 Cookies - ChinaUnix - Archiver - WAP - TOP