python threading线程控制请教下。附代码!
本帖最后由 墨迹哥 于 2014-04-24 10:32 编辑千万别骂我造车轮,我是在学习,没啥目的。
运行的时候只能创建10个线程,这有点揪心。感觉不知道是哪错了。。- - #
所以来一次伸手党。。#coding:utf-8
import threading
import Queue
import time
class PoolClass(object):
'''
1、建立空的队列
2、设置可并行的线程数量
'''
def __init__(self,thread_sum=10):
self.queue=Queue.Queue()
self.thread_sum=thread_sum
self.thread_list=[]
'''
将任务添加到队列中
'''
def add_pool(self,task):
print "把数据放入队列: %d" % task
self.queue.put(task)
'''
取出队列任务并且启动线程
'''
def exec_pool(self,func):
while True:
# 判断队列是否为空,如果是空得则结束
if self.queue.empty():
if len(self.thread_list)==0:
print "没有任务了"
break
# 判断线程是否已满,如果满了,等待结束
if len(self.thread_list)>=self.thread_sum:
print "判断线程是否满了"
for item in self.thread_list:
if item.isAlive():
item.join()
# 创建线程使用
for i in range(self.thread_sum):
print "创建线程"
task=self.queue.get()
self.thread_list.append(threading.Thread(target=func,args=(task,)))
# 启用线程
for i in self.thread_list:
i.start()
def jobtest(n):
print n
print "testjob"
time.sleep(1)
if __name__=='__main__':
t=PoolClass()
for i in range(100):
t.add_pool(i)
t.exec_pool(jobtest)
t=PoolClass()
def __init__(self,thread_sum=10):
这不就是10个吗?说清楚点究竟想要什么结果
回复 2# timespace
我想要实现的是最高运行10个线程同时跑,然后没啥。。
貌似越写越复杂了。。。。#coding:utf-8
import threading
import Queue
import time
class PoolClass(object):
'''
1、建立空的队列
2、设置可并行的线程数量
'''
def __init__(self,thread_sum=10):
self.queue=Queue.Queue()
self.thread_sum=thread_sum
self.threads=[]
'''
将任务添加到队列中
'''
def add_pool(self,task):
print "把数据放入队列: %d" % task
self.queue.put(task)
'''
取出队列任务并且启动线程
'''
def exec_pool(self,func):
while True:
# 判断队列是否为空,如果是空得则结束
if self.queue.empty():
if len(self.threads)==0:
print "Finish Work!"
break
else:
for item in self.threads:
if item.isAlive():
item.join()
self.threads.remove(item)
else:
# 创建线程使用
for i in range(self.thread_sum):
print "创建线程"
task=self.queue.get()
self.threads.append(threading.Thread(target=func,args=(task,)))
# 启用线程
for item in self.threads:
item.start()
# join线程
for item in self.threads:
if item.isAlive():
item.join()
self.threads.remove(item)
def jobtest(n):
print n
print "testjob"
time.sleep(1)
if __name__=='__main__':
t=PoolClass()
for i in range(20):
t.add_pool(i)
t.exec_pool(jobtest)
回复 3# 墨迹哥
了解。不得不说代码太乱了:em16:
处理逻辑/控制逻辑/数据结构都混在一起了,想不出错都不行。。。
回复 4# timespace
咋修改啊?。。 执行成功了。。但是JOIN不了。。。。#coding:utf-8
import threading
import Queue
import time
class PoolClass(object):
'''
1、建立空的队列
2、设置可并行的线程数量
'''
def __init__(self,thread_sum=2):
self.queue=Queue.Queue()
self.thread_sum=thread_sum
self.threads_list=[]
'''
将任务添加到队列中
'''
def add_pool(self,task):
print "把数据放入队列: %d" % task
self.queue.put(task)
'''
取出队列任务并且启动线程
'''
def exec_pool(self,func):
while True:
# print "线程列表数量: %d" % len(self.threads)
# 判断队列是否为空,如果是空得则结束
if self.queue.empty():
if len(self.threads_list)==0:
print "Finish Work!"
break
else:
print "创建线程"
# 创建线程使用
for i in range(self.thread_sum):
task=self.queue.get()
thr=threading.Thread(target=func,args=(task,))
self.threads_list.append(thr)
thr.start()
self.wait_all()
def wait_all(self):
print "join线程"
for item in self.threads_list:
if item.isAlive():
item.join()
self.threads_list.remove(item)
def jobtest(n):
print "正在执行任务中:%d" % n
if __name__=='__main__':
t=PoolClass()
for i in range(20):
t.add_pool(i)
t.exec_pool(jobtest)
t.wait_all()
回复 6# 墨迹哥
这是一种写法:# -*- coding: utf-8 -*-
import argparse
import logging
import threading
import time
import Queue
def run_job(no, wait_time):
name = threading.current_thread().name
logging.info("thread #{}, job #{}".format(name, no))
time.sleep(wait_time)
class ThreadPool(object):
"""固定尺寸的线程池"""
def __init__(self, size):
self._queue = Queue.Queue()
self._data_ready = threading.Condition()
self._exit_flag = threading.Event()
self._threads = []
for i in range(size):
t = threading.Thread(target=self._run, name=str(i))
t.start()
self._threads.append(t)
def add_task(self, callback, *args):
"""添加待调度任务
callback - 待执行的回调函数
args - callback的参数列表
"""
self._queue.put((callback, args))
with self._data_ready:
self._data_ready.notify()
def join(self):
"""等待所有任务执行完成"""
self._queue.join()
self._exit_flag.set()
with self._data_ready:
self._data_ready.notify_all()
for t in self._threads:
t.join()
def _run(self):
while True:
with self._data_ready:
while self._queue.empty() and not self._exit_flag.is_set():
self._data_ready.wait()
if self._exit_flag.is_set():
break
cb, args = self._queue.get_nowait()
cb(*args)
self._queue.task_done()
def main():
parser = argparse.ArgumentParser(description='Thread Pool')
parser.add_argument('size', type=int,
help='fixed size of thread pool')
parser.add_argument('loops', type=int,
help='count of tasks')
parser.add_argument('-w', '--wait-time', type=float, default=1.0,
help='wait time each task')
args = parser.parse_args()
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')
tp = ThreadPool(args.size)
for i in range(args.loops):
tp.add_task(run_job, i, args.wait_time)
tp.join()
if __name__ == '__main__':
main()运行:$ python2.7 thread.py 20 200 -w 0.1
2014-04-24 14:19:52,556 thread #0, job #0
2014-04-24 14:19:52,556 thread #2, job #1
2014-04-24 14:19:52,556 thread #1, job #2
2014-04-24 14:19:52,556 thread #3, job #3
2014-04-24 14:19:52,556 thread #4, job #4
2014-04-24 14:19:52,556 thread #5, job #5
2014-04-24 14:19:52,556 thread #6, job #6
2014-04-24 14:19:52,557 thread #7, job #7
2014-04-24 14:19:52,557 thread #8, job #8
2014-04-24 14:19:52,557 thread #9, job #9
.......
2014-04-24 14:19:53,477 thread #15, job #192
2014-04-24 14:19:53,477 thread #14, job #194
2014-04-24 14:19:53,477 thread #13, job #195
2014-04-24 14:19:53,477 thread #19, job #196
2014-04-24 14:19:53,477 thread #16, job #197
2014-04-24 14:19:53,477 thread #18, job #198
2014-04-24 14:19:53,477 thread #0, job #199 回复 7# timespace
我也琢磨了一种写法,目前执行是正常的,但是在收割上有问题。
在threads_list上有很多线程都没有成功Join掉。#coding:utf-8
import threading
import Queue
import time
class PoolClass(object):
'''
1、建立空的队列
2、设置可并行的线程数量
'''
def __init__(self,thread_sum=5):
self.queue=Queue.Queue()
self.thread_sum=thread_sum
self.threads_list=[]
'''
将任务添加到队列中
'''
def add_pool(self,task):
# 把任务放入队列
self.queue.put(task)
'''
取出队列任务并且启动线程
'''
def exec_pool(self,func):
while True:
print len(self.threads_list)
if self.queue.empty():
print "Finish Work!"
break
else:
# 创建线程使用
for i in range(self.thread_sum):
task=self.queue.get()
thr=threading.Thread(target=func,args=(task,))
self.threads_list.append(thr)
thr.start()
for item in self.threads_list:
if item.isAlive():
item.join()
self.threads_list.remove(item)
def jobtest(n):
print "正在执行任务中:%d" % n
time.sleep(1)
print "程序执行完了,完全结束!"
if __name__=='__main__':
t=PoolClass(10)
for i in range(100):
t.add_pool(i)
t.exec_pool(jobtest)
页:
[1]