Using multiple threads to process 1000 URLs keeps crashing on Linux

#1 | Posted 2009-10-22 15:47
As the title says. Could anyone help me figure out where the problem is? Thanks.
The test data is attached.

#!/usr/bin/env python
#coding=utf-8
import Queue
import threading
import urllib2
import time
import os
import thread

hosts = ["http://yahoo.com", "http://google.com"]

class rf(object):  
    def read10000(self):
        file_path = os.path.join(os.path.dirname(__file__), 't1000.txt')
        # open the file for reading
        f = file(file_path, 'r')
        t10000 = []
        for eachLine in f:
            # skip comment lines starting with '#'
            if eachLine[:1] != '#':
                t10000.append('http://' + eachLine.split()[1])
        f.close()
        return t10000
   
p = rf()
p1 = p.read10000()

queue = Queue.Queue()
class ThreadUrl(threading.Thread):
    """Threaded Url Grab"""
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue
        
    def run(self):
        while True:
            # grab a host from the queue
            host = self.queue.get()
            try:
                # open the URL; we only care whether the request succeeds
                urllib2.urlopen(host, timeout=1)
                print "ok %s," % host
            except (urllib2.URLError, urllib2.socket.timeout), e:
                print "error %s," % host
            # signal to the queue that the job is done, even on error
            self.queue.task_done()
               
start = time.time()

def main():
    # spawn a pool of threads and pass them the queue instance
    for i in range(len(p1)):
        try:
            t = ThreadUrl(queue)
            t.setDaemon(False)
            t.start()
        except thread.error, e:
            time.sleep(3)
    # populate the queue with data
    for host in p1:
        queue.put(host)
    # wait on the queue until everything has been processed
    #queue.join()
main()
print "Elapsed Time: %s" % (time.time() - start)

Attachment: t1000.rar (8.51 KB, downloaded 46 times)

#2 | Posted 2009-10-22 17:24
1000 threads? By default each thread gets an 8 MB stack. Do you have enough memory for that?
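If the thread count really has to stay that high, one mitigation is to shrink the per-thread stack before any threads are started. A minimal sketch, assuming Python 2.5+ where threading.stack_size() exists; the 256 KB figure is an illustrative guess, not a tuned value:

import threading

# Must be called before the worker threads are created.
# 256 KB is an assumed value; the platform default on many
# Linux systems is the 8 MB mentioned above.
threading.stack_size(256 * 1024)

Even so, a fixed-size pool of workers is usually a saner fix than 1000 threads.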

#3 | Posted 2009-10-22 19:15
In practice you can only run about 200~300 threads at most. Better to use a thread pool.

#4 | Posted 2009-10-22 20:11
Learned something new...

#5 | Posted 2009-10-23 07:55
Memory really was the problem: it eventually raised a MemoryError. I then ran it with just 2 threads, but that is very slow. I don't know how to write a thread pool, so I'm hoping an expert can post a modified version of the code.
I've only just started learning Python and don't understand this area very well yet.
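A minimal sketch of the pattern being suggested here: keep the Queue, but spawn a small, fixed number of worker threads and let them drain all 1000 URLs. The fetch_urls wrapper and the worker count of 20 are illustrative assumptions, not tested values:

import Queue
import threading
import urllib2

def worker(q):
    # each worker loops forever, pulling hosts off the shared queue
    while True:
        host = q.get()
        try:
            urllib2.urlopen(host, timeout=1)
            print "ok %s," % host
        except Exception, e:
            print "error %s," % host
        q.task_done()  # mark the item done even when it failed

def fetch_urls(urls, num_workers=20):  # 20 workers is a guess; tune it
    q = Queue.Queue()
    for _ in range(num_workers):  # a fixed pool, not one thread per URL
        t = threading.Thread(target=worker, args=(q,))
        t.setDaemon(True)  # daemon workers die with the main thread
        t.start()
    for url in urls:
        q.put(url)
    q.join()  # block until every URL has been processed

With a fixed pool, memory stays bounded no matter how many URLs are queued, and q.join() replaces the per-thread bookkeeping.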

#6 | Posted 2009-10-23 09:36
Bump.

#7 | Posted 2009-10-23 10:45
Here is a thread-pool implementation. I forget which site I downloaded it from, but it may help the OP!
--------------------------------------------------------------------------------------------------------------
#coding=utf-8
import threading
from time import sleep
# compatibility shim for very old Pythons that lack the bool constants
try:
    True
except NameError:
    False = 0
    True = not False

class ThreadPool:
    """Flexible thread pool class.  Creates a pool of threads, then
    accepts tasks that will be dispatched to the next available
    thread."""
   
    def __init__(self, numThreads):

        """Initialize the thread pool with numThreads workers."""
        
        self.__threads = []
        self.__resizeLock = threading.Condition(threading.Lock())
        self.__taskLock = threading.Condition(threading.Lock())
        self.__tasks = []
        self.__isJoining = False
        self.setThreadCount(numThreads)

    def setThreadCount(self, newNumThreads):
        """ External method to set the current pool size.  Acquires
        the resizing lock, then calls the internal version to do real
        work."""
        # Can't change the thread count if we're shutting down the pool!
        if self.__isJoining:
            return False
        
        self.__resizeLock.acquire()
        try:
            self.__setThreadCountNolock(newNumThreads)
        finally:
            self.__resizeLock.release()
        return True

    def __setThreadCountNolock(self, newNumThreads):
        """Set the current pool size, spawning or terminating threads
        if necessary.  Internal use only; assumes the resizing lock is
        held."""
        # If we need to grow the pool, do so
        while newNumThreads > len(self.__threads):
            newThread = ThreadPoolThread(self)
            self.__threads.append(newThread)
            newThread.start()
        # If we need to shrink the pool, do so
        while newNumThreads < len(self.__threads):
            self.__threads[0].goAway()
            del self.__threads[0]

    def getThreadCount(self):
        """Return the number of threads in the pool."""
        self.__resizeLock.acquire()
        try:
            return len(self.__threads)
        finally:
            self.__resizeLock.release()

    def queueTask(self, task, args=None, taskCallback=None):

        """Insert a task into the queue.  task must be callable;
        args and taskCallback can be None."""
        
        if self.__isJoining == True:
            return False
        if not callable(task):
            return False
        
        self.__taskLock.acquire()
        try:
            self.__tasks.append((task, args, taskCallback))
            return True
        finally:
            self.__taskLock.release()

    def getNextTask(self):

        """ Retrieve the next task from the task queue.  For use
        only by ThreadPoolThread objects contained in the pool."""
        
        self.__taskLock.acquire()
        try:
            if self.__tasks == []:
                return (None, None, None)
            else:
                return self.__tasks.pop(0)
        finally:
            self.__taskLock.release()
   
    def joinAll(self, waitForTasks = True, waitForThreads = True):

        """ Clear the task queue and terminate all pooled threads,
        optionally allowing the tasks and threads to finish."""
        
        # Mark the pool as joining to prevent any more task queueing
        self.__isJoining = True

        # Wait for tasks to finish
        if waitForTasks:
            while self.__tasks != []:
                sleep(.1)

        # Tell all the threads to quit
        self.__resizeLock.acquire()
        try:
            self.__setThreadCountNolock(0)
            self.__isJoining = True

            # Wait until all threads have exited
            if waitForThreads:
                for t in self.__threads:
                    t.join()
                    del t

            # Reset the pool for potential reuse
            self.__isJoining = False
        finally:
            self.__resizeLock.release()

class ThreadPoolThread(threading.Thread):

    """ Pooled thread class. """
   
    threadSleepTime = 0.1

    def __init__(self, pool):

        """ Initialize the thread and remember the pool. """
        
        threading.Thread.__init__(self)
        self.__pool = pool
        self.__isDying = False
        
    def run(self):

        """ Until told to quit, retrieve the next task and execute
        it, calling the callback if any.  """
        
        while self.__isDying == False:
            cmd, args, callback = self.__pool.getNextTask()
            # If there's nothing to do, just sleep a bit
            if cmd is None:
                sleep(ThreadPoolThread.threadSleepTime)
            elif callback is None:
                cmd(args)
            else:
                callback(cmd(args))
   
    def goAway(self):

        """ Exit the run loop next time through."""
        
        self.__isDying = True

# Usage example
if __name__ == "__main__":

    from random import randrange

    # Sample task 1: given a start and end value, shuffle integers,
    # then sort them
   
    def sortTask(data):
        print "SortTask starting for ", data
        numbers = range(data[0], data[1])
        for a in numbers:
            rnd = randrange(0, len(numbers) - 1)
            a, numbers[rnd] = numbers[rnd], a
        print "SortTask sorting for ", data
        numbers.sort()
        print "SortTask done for ", data
        return "Sorter ", data

    # Sample task 2: just sleep for a number of seconds.

    def waitTask(data):
        print "WaitTask starting for ", data
        print "WaitTask sleeping for %d seconds" % data
        sleep(data)
        return "Waiter", data

    # Both tasks use the same callback

    def taskCallback(data):
        print "Callback called for", data

    # Create a pool with three worker threads
    print "begin..."
    pool = ThreadPool(3)
    # Insert tasks into the queue and let them run
    pool.queueTask(sortTask, (1000, 100000), taskCallback)
    pool.queueTask(waitTask, 5, taskCallback)
    pool.queueTask(sortTask, (200, 200000), taskCallback)
    pool.queueTask(waitTask, 2, taskCallback)
    pool.queueTask(sortTask, (3, 30000), taskCallback)
    pool.queueTask(waitTask, 7, taskCallback)

    # When all tasks are finished, allow the threads to terminate
    pool.joinAll()
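To tie the recipe back to the original problem, here is a hedged sketch of how the URL checks could be queued onto this ThreadPool. The fetchUrl helper, the printResult callback, the pool size of 10, and the hosts_list variable (the URLs read from t1000.txt) are all illustrative assumptions, not part of the recipe:

import urllib2

def fetchUrl(host):
    # task function: return a status string for the callback to print
    try:
        urllib2.urlopen(host, timeout=1)
        return "ok %s" % host
    except Exception, e:
        return "error %s (%s)" % (host, e)

def printResult(result):
    print result

pool = ThreadPool(10)    # 10 workers is a guess; tune as needed
for host in hosts_list:  # hosts_list: the 1000 URLs from t1000.txt
    pool.queueTask(fetchUrl, host, printResult)
pool.joinAll()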

#8 | Posted 2009-10-23 14:15
Thanks for the help above.

The final test does work, but it is still slow.
After a lot of searching I found an approach that speeds things up, though unfortunately the input file has to be split up:
the 1000 URLs have to be split into 100-line files (100-line chunks averaged fastest in my tests; 5 files finished in only 41 s).

Here is the code, which also uses a thread pool. The duplication is heavy; experts, please don't flame me.

#!/usr/bin/env python
#coding=utf-8

import Queue, threading, sys   
from threading import Thread   
import time,urllib2   
import os

# working thread   
class Worker(Thread):   
    worker_count = 0   
    def __init__( self, workQueue, resultQueue, timeout = 0, **kwds):   
        Thread.__init__( self, **kwds )   
        self.id = Worker.worker_count   
        Worker.worker_count += 1   
        self.setDaemon( True )   
        self.workQueue = workQueue   
        self.resultQueue = resultQueue   
        self.timeout = timeout   

    def run( self ):   
        ''' the get-some-work, do-some-work main loop of worker threads '''   
        while True:   
            try:   
                callable, args, kwds = self.workQueue.get(timeout=self.timeout)   
                res = callable(*args, **kwds)   
                print "worker[%2d]: %s" % (self.id, str(res) )   
                self.resultQueue.put( res )   
            except Queue.Empty:   
                break   
            except :   
                print 'worker[%2d]' % self.id, sys.exc_info()[:2]   
                  
class WorkerManager:   
    def __init__( self, num_of_workers=10, timeout = 1):   
        self.workQueue = Queue.Queue()   
        self.resultQueue = Queue.Queue()   
        self.workers = []   
        self.timeout = timeout   
        self._recruitThreads( num_of_workers )   

    def _recruitThreads( self, num_of_workers ):   
        for i in range( num_of_workers ):   
            worker = Worker( self.workQueue, self.resultQueue, self.timeout )   
            self.workers.append(worker)   

    def start(self):   
        for w in self.workers:   
            w.start()   

    def wait_for_complete( self):   
        # ...then, wait for each of them to terminate:   
        while len(self.workers):   
            worker = self.workers.pop()   
            worker.join( )   
            if worker.isAlive() and not self.workQueue.empty():   
                self.workers.append( worker )   
        print "All jobs are are completed."   

    def add_job( self, callable, *args, **kwds ):   
        self.workQueue.put( (callable, args, kwds) )   

    def get_result( self, *args, **kwds ):   
        return self.resultQueue.get( *args, **kwds )

start = time.time()

def test_job1(id, sleep = 0.001 ):
    file_path = os.path.join(os.path.dirname(__file__), 't100.txt')
    # open the file for reading
    f = file(file_path, 'r')
    for eachLine in f:
        # skip comment lines starting with '#'
        if eachLine[:1] != '#':
            try:   
                #urllib.urlopen('https://www.gmail.com/').read()
                host = 'http://' + eachLine.split()[1]
                response = urllib2.urlopen(host, timeout = 1)
                print "ok %s," % host
            except:   
                #print '[%4d]' % id, sys.exc_info()[:2]   
                print "error %s," % host
    f.close()
    return id   

def test_job2(id, sleep = 0.001 ):   
    file_path = os.path.join(os.path.dirname(__file__), 't200.txt')
    # open the file for reading
    f = file(file_path, 'r')
    for eachLine in f:
        # skip comment lines starting with '#'
        if eachLine[:1] != '#':
            try:   
                #urllib.urlopen('https://www.gmail.com/').read()
                host = 'http://' + eachLine.split()[1]
                response = urllib2.urlopen(host, timeout = 1)
                print "ok %s," % host
            except:   
                #print '[%4d]' % id, sys.exc_info()[:2]   
                print "error %s," % host
    f.close()
    return id  

def test_job3(id, sleep = 0.001 ):   
    file_path = os.path.join(os.path.dirname(__file__), 't300.txt')
    # open the file for reading
    f = file(file_path, 'r')
    for eachLine in f:
        # skip comment lines starting with '#'
        if eachLine[:1] != '#':
            try:   
                #urllib.urlopen('https://www.gmail.com/').read()
                host = 'http://' + eachLine.split()[1]
                response = urllib2.urlopen(host, timeout = 1)
                print "ok %s," % host
            except:   
                #print '[%4d]' % id, sys.exc_info()[:2]   
                print "error %s," % host
    f.close()
    return id  

def test_job4(id, sleep = 0.001 ):   
    file_path = os.path.join(os.path.dirname(__file__), 't400.txt')
    # open the file for reading
    f = file(file_path, 'r')
    for eachLine in f:
        # skip comment lines starting with '#'
        if eachLine[:1] != '#':
            try:   
                #urllib.urlopen('https://www.gmail.com/').read()
                host = 'http://' + eachLine.split()[1]
                response = urllib2.urlopen(host, timeout = 1)
                print "ok %s," % host
            except:   
                #print '[%4d]' % id, sys.exc_info()[:2]   
                print "error %s," % host
    f.close()
    return id  

def test_job5(id, sleep = 0.001 ):   
    file_path = os.path.join(os.path.dirname(__file__), 't500.txt')
    # open the file for reading
    f = file(file_path, 'r')
    for eachLine in f:
        # skip comment lines starting with '#'
        if eachLine[:1] != '#':
            try:   
                #urllib.urlopen('https://www.gmail.com/').read()
                host = 'http://' + eachLine.split()[1]
                response = urllib2.urlopen(host, timeout = 1)
                print "ok %s," % host
            except:   
                #print '[%4d]' % id, sys.exc_info()[:2]   
                print "error %s," % host
    f.close()
    return id

def test():   
    import socket   
    socket.setdefaulttimeout(10)   
    print 'start testing'  
    # number of worker threads
    wm = WorkerManager(10)
    # i is the number of passes through the job list
    for i in [1]:
        wm.add_job( test_job1, i, i*0.001 )
        wm.add_job( test_job2, i, i*0.001 )
        wm.add_job( test_job3, i, i*0.001 )
        wm.add_job( test_job4, i, i*0.001 )
        wm.add_job( test_job5, i, i*0.001 )
    wm.start()   
    wm.wait_for_complete()   
    print 'end testing'  

test()
print "Elapsed Time: %s" % (time.time() - start)

============================
I've seen many claims online that Stackless is faster than Python's own threads, but the version I wrote is still slow. I guess I haven't learned it well enough. Could anyone give me an example of this?
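For what it's worth, a plausible reason a straight Stackless port stays slow: tasklets are cooperative microthreads inside a single OS thread, so a blocking urllib2.urlopen call stalls every tasklet. A minimal sketch of the tasklet API (illustrative only; it will not fetch any faster than a plain loop):

import stackless
import urllib2

def fetch(host):
    # urlopen blocks the whole interpreter, so these tasklets
    # still end up fetching one URL after another
    try:
        urllib2.urlopen(host, timeout=1)
        print "ok %s," % host
    except Exception, e:
        print "error %s," % host

for host in ["http://yahoo.com", "http://google.com"]:
    stackless.tasklet(fetch)(host)
stackless.run()

To get real concurrency out of tasklets, the I/O itself has to be made non-blocking, which is what the asynchronous approach in the next reply is about.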

#9 | Posted 2009-10-25 16:49
How are you actually processing the URLs? Just opening a connection? Then it's basically a port scanner. I wrote an asynchronous framework that includes a port-scanning example; 1000 ports take only two or three seconds:
http://code.google.com/p/netdkit/source/browse/#svn/trunk/pyndk
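I haven't read pyndk itself, so this is only a sketch of the general technique such frameworks use: non-blocking connect_ex plus select, where a socket becomes writable once its connect attempt has finished. The scan_ports name and the 3-second timeout are assumptions:

import socket
import select
import time

def scan_ports(host, ports, timeout=3.0):
    # fire off every connect without blocking, then collect results
    pending = {}
    for port in ports:
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.setblocking(0)
        s.connect_ex((host, port))  # returns immediately (EINPROGRESS)
        pending[s] = port
    open_ports = []
    deadline = time.time() + timeout
    while pending and time.time() < deadline:
        _, writable, _ = select.select([], list(pending), [], 0.1)
        for s in writable:
            err = s.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
            if err == 0:  # connect succeeded: port is open
                open_ports.append(pending[s])
            s.close()
            del pending[s]
    for s in pending:  # anything left after the deadline timed out
        s.close()
    return open_ports

print scan_ports('www.163.com', range(75, 86))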

#10 | Posted 2009-10-26 08:48
Mine uses Python's urllib2 to catch URL errors, and each check involves actually opening a page.

I tried your framework's example, and it returned non-200 errors almost every time.

If the server runs squid, your program's scan of port 80 returns an error status 100% of the time.
If the server hosts a Java applet application, your program also runs into trouble.

Also, the port scanner has to be invoked as
python port_scanner.py www.163.com 80-80
even a single port has to be written that way. Just a suggestion; I hope you'll change it.

It would also be better if your scripts started with
#!/usr/bin/env python
#coding=utf-8
which would make them much more portable.

One more thing: my Windows box runs Python 2.6, and your port_scanner.py fails there:
from os import O_NONBLOCK errors out because the attribute does not exist.

Finally, I can't tell from the program's final report what counts as a correct result. Am I supposed to go check some status myself? That part is vague.

I'm a beginner, so please correct me if anything above is wrong.
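On that import error: os.O_NONBLOCK genuinely does not exist on Windows. A common portable pattern, sketched here, is to guard the import and rely on setblocking(0) for sockets, which works on every platform:

import socket

try:
    from os import O_NONBLOCK  # POSIX only; missing on Windows
except ImportError:
    O_NONBLOCK = None  # sockets do not need it, see below

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setblocking(0)  # portable non-blocking mode for sockets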