墨迹哥 发表于 2014-04-24 10:31

python threading线程控制请教下。附代码!

本帖最后由 墨迹哥 于 2014-04-24 10:32 编辑

千万别骂我造车轮,我是在学习,没啥目的。
运行的时候只能创建10个线程,这有点揪心。感觉不知道是哪错了。。- - #
所以来一次伸手党。。#coding:utf-8
import threading
import Queue
import time

class PoolClass(object):
        '''
                1、建立空的队列
                2、设置可并行的线程数量
        '''
        def __init__(self,thread_sum=10):
                self.queue=Queue.Queue()
                self.thread_sum=thread_sum
                self.thread_list=[]

        '''
                将任务添加到队列中
        '''
        def add_pool(self,task):
                print "把数据放入队列: %d" % task
                self.queue.put(task)

        '''
                取出队列任务并且启动线程
        '''
        def exec_pool(self,func):
                while True:
                        # 判断队列是否为空,如果是空得则结束
                        if self.queue.empty():
                                if len(self.thread_list)==0:
                                print "没有任务了"
                                break

                        # 判断线程是否已满,如果满了,等待结束
                        if len(self.thread_list)>=self.thread_sum:
                                print "判断线程是否满了"
                                for item in self.thread_list:
                                        if item.isAlive():
                                                item.join()
                        # 创建线程使用
                        for i in range(self.thread_sum):
                                print "创建线程"
                                task=self.queue.get()
                                self.thread_list.append(threading.Thread(target=func,args=(task,)))

                        # 启用线程
                        for i in self.thread_list:
                                i.start()

def jobtest(n):
        print n
        print "testjob"
        time.sleep(1)

if __name__=='__main__':
        t=PoolClass()
        for i in range(100):
                t.add_pool(i)
        t.exec_pool(jobtest)

timespace 发表于 2014-04-24 11:32

t=PoolClass()

def __init__(self,thread_sum=10):

这不就是10个吗?说清楚点究竟想要什么结果

墨迹哥 发表于 2014-04-24 12:01

回复 2# timespace


    我想要实现的是最高运行10个线程同时跑,然后没啥。。
    貌似越写越复杂了。。。。#coding:utf-8
import threading
import Queue
import time

class PoolClass(object):
        '''
                1、建立空的队列
                2、设置可并行的线程数量
        '''
        def __init__(self,thread_sum=10):
                self.queue=Queue.Queue()
                self.thread_sum=thread_sum
                self.threads=[]

        '''
                将任务添加到队列中
        '''
        def add_pool(self,task):
                print "把数据放入队列: %d" % task
                self.queue.put(task)

        '''
                取出队列任务并且启动线程
        '''
        def exec_pool(self,func):
                while True:
                        # 判断队列是否为空,如果是空得则结束
                        if self.queue.empty():
                                if len(self.threads)==0:
                                        print "Finish Work!"
                                        break
                                else:
                                        for item in self.threads:
                                                if item.isAlive():
                                                        item.join()
                                                        self.threads.remove(item)
                        else:
                                # 创建线程使用
                                for i in range(self.thread_sum):
                                        print "创建线程"
                                        task=self.queue.get()
                                        self.threads.append(threading.Thread(target=func,args=(task,)))
                               
                                # 启用线程
                                for item in self.threads:
                                        item.start()

                                # join线程
                                for item in self.threads:
                                        if item.isAlive():
                                                item.join()
                                                self.threads.remove(item)

def jobtest(n):
        print n
        print "testjob"
        time.sleep(1)

if __name__=='__main__':
        t=PoolClass()
        for i in range(20):
                t.add_pool(i)
        t.exec_pool(jobtest)

timespace 发表于 2014-04-24 12:06

回复 3# 墨迹哥
了解。不得不说代码太乱了:em16:
处理逻辑/控制逻辑/数据结构都混在一起了,想不出错都不行。。。


   

墨迹哥 发表于 2014-04-24 12:21

回复 4# timespace


    咋修改啊?。。

墨迹哥 发表于 2014-04-24 13:06

执行成功了。。但是JOIN不了。。。。#coding:utf-8
import threading
import Queue
import time

class PoolClass(object):

        '''
                1、建立空的队列
                2、设置可并行的线程数量
        '''
        def __init__(self,thread_sum=2):
                self.queue=Queue.Queue()
                self.thread_sum=thread_sum
                self.threads_list=[]

        '''
                将任务添加到队列中
        '''
        def add_pool(self,task):
                print "把数据放入队列: %d" % task
                self.queue.put(task)

        '''
                取出队列任务并且启动线程
        '''
        def exec_pool(self,func):
                while True:
                        # print "线程列表数量: %d" % len(self.threads)
                        # 判断队列是否为空,如果是空得则结束
                       
                        if self.queue.empty():
                                if len(self.threads_list)==0:
                                        print "Finish Work!"
                                        break
                        else:
                                print "创建线程"
                                # 创建线程使用
                                for i in range(self.thread_sum):
                                        task=self.queue.get()
                                        thr=threading.Thread(target=func,args=(task,))
                                        self.threads_list.append(thr)
                                        thr.start()

                                self.wait_all()

        def wait_all(self):
                print "join线程"
                for item in self.threads_list:
                        if item.isAlive():
                                item.join()
                                self.threads_list.remove(item)


def jobtest(n):
        print "正在执行任务中:%d" % n

if __name__=='__main__':
        t=PoolClass()
        for i in range(20):
                t.add_pool(i)
        t.exec_pool(jobtest)
        t.wait_all()

timespace 发表于 2014-04-24 14:27

回复 6# 墨迹哥
这是一种写法:# -*- coding: utf-8 -*-
import argparse
import logging
import threading
import time
import Queue

def run_job(no, wait_time):
    name = threading.current_thread().name
    logging.info("thread #{}, job #{}".format(name, no))
    time.sleep(wait_time)

class ThreadPool(object):
    """固定尺寸的线程池"""

    def __init__(self, size):
      self._queue = Queue.Queue()
      self._data_ready = threading.Condition()
      self._exit_flag = threading.Event()
      self._threads = []
      for i in range(size):
            t = threading.Thread(target=self._run, name=str(i))
            t.start()
            self._threads.append(t)

    def add_task(self, callback, *args):
      """添加待调度任务

      callback - 待执行的回调函数
      args - callback的参数列表
      """
      self._queue.put((callback, args))
      with self._data_ready:
            self._data_ready.notify()

    def join(self):
      """等待所有任务执行完成"""
      self._queue.join()
      self._exit_flag.set()
      with self._data_ready:
            self._data_ready.notify_all()
      for t in self._threads:
            t.join()

    def _run(self):
      while True:
            with self._data_ready:
                while self._queue.empty() and not self._exit_flag.is_set():
                  self._data_ready.wait()
                if self._exit_flag.is_set():
                  break
                cb, args = self._queue.get_nowait()

            cb(*args)
            self._queue.task_done()


def main():
    parser = argparse.ArgumentParser(description='Thread Pool')
    parser.add_argument('size', type=int,
                        help='fixed size of thread pool')
    parser.add_argument('loops', type=int,
                        help='count of tasks')
    parser.add_argument('-w', '--wait-time', type=float, default=1.0,
                        help='wait time each task')
    args = parser.parse_args()
    logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')

    tp = ThreadPool(args.size)
    for i in range(args.loops):
      tp.add_task(run_job, i, args.wait_time)
    tp.join()

if __name__ == '__main__':
    main()运行:$ python2.7 thread.py 20 200 -w 0.1
2014-04-24 14:19:52,556 thread #0, job #0
2014-04-24 14:19:52,556 thread #2, job #1
2014-04-24 14:19:52,556 thread #1, job #2
2014-04-24 14:19:52,556 thread #3, job #3
2014-04-24 14:19:52,556 thread #4, job #4
2014-04-24 14:19:52,556 thread #5, job #5
2014-04-24 14:19:52,556 thread #6, job #6
2014-04-24 14:19:52,557 thread #7, job #7
2014-04-24 14:19:52,557 thread #8, job #8
2014-04-24 14:19:52,557 thread #9, job #9
.......
2014-04-24 14:19:53,477 thread #15, job #192
2014-04-24 14:19:53,477 thread #14, job #194
2014-04-24 14:19:53,477 thread #13, job #195
2014-04-24 14:19:53,477 thread #19, job #196
2014-04-24 14:19:53,477 thread #16, job #197
2014-04-24 14:19:53,477 thread #18, job #198
2014-04-24 14:19:53,477 thread #0, job #199

墨迹哥 发表于 2014-04-24 14:51

回复 7# timespace


    我也琢磨了一种写法,目前执行是正常的,但是在收割上有问题。
    在threads_list上有很多线程都没有成功Join掉。#coding:utf-8
import threading
import Queue
import time

class PoolClass(object):

        '''
                1、建立空的队列
                2、设置可并行的线程数量
        '''
        def __init__(self,thread_sum=5):
                self.queue=Queue.Queue()
                self.thread_sum=thread_sum
                self.threads_list=[]

        '''
                将任务添加到队列中
        '''
        def add_pool(self,task):
                # 把任务放入队列
                self.queue.put(task)

        '''
                取出队列任务并且启动线程
        '''
        def exec_pool(self,func):
                while True:
                        print len(self.threads_list)
                        if self.queue.empty():
                                print "Finish Work!"
                                break
                        else:
                                # 创建线程使用
                                for i in range(self.thread_sum):
                                        task=self.queue.get()
                                        thr=threading.Thread(target=func,args=(task,))
                                        self.threads_list.append(thr)
                                        thr.start()

                                for item in self.threads_list:
                                        if item.isAlive():
                                                item.join()
                                                self.threads_list.remove(item)

def jobtest(n):
        print "正在执行任务中:%d" % n
        time.sleep(1)
        print "程序执行完了,完全结束!"


if __name__=='__main__':
        t=PoolClass(10)
        for i in range(100):
                t.add_pool(i)
        t.exec_pool(jobtest)
页: [1]
查看完整版本: python threading线程控制请教下。附代码!