multiprocessing.Process 产生的子进程如何正常退出?
下面是一段示意代码 ttt.py:#!/usr/bin/python
# -*- coding: utf-8 -*-
import multiprocessing
import time
import os
class test(multiprocessing.Process):
def run(self):
print 'sub process : %d,: %d -- sleep 5 --' %(os.getppid(),os.getpid())
time.sleep(5)
print 'sub process : %d,: %d -- done --' % (os.getppid(),os.getpid())
b = 0
while True:
print '-- : %d main process starting --' %os.getpid()
while b < 10 :
p = test()
p.start()
b +=1
print '-- : %d main process sleep 10 --' %os.getpid()
time.sleep(100)运行后:# python ttt.py
-- : 23695 main process starting --
sub process : 23695,: 23696 -- sleep 5 --
sub process : 23695,: 23697 -- sleep 5 --
sub process : 23695,: 23698 -- sleep 5 --
sub process : 23695,: 23699 -- sleep 5 --
sub process : 23695,: 23700 -- sleep 5 --
sub process : 23695,: 23702 -- sleep 5 --
sub process : 23695,: 23701 -- sleep 5 --
sub process : 23695,: 23703 -- sleep 5 --
sub process : 23695,: 23704 -- sleep 5 --
-- : 23695 main process sleep 10 --
sub process : 23695,: 23705 -- sleep 5 --
sub process : 23695,: 23697 -- done --
sub process : 23695,: 23696 -- done --
sub process : 23695,: 23698 -- done --
sub process : 23695,: 23699 -- done --
sub process : 23695,: 23700 -- done --
sub process : 23695,: 23701 -- done --
sub process : 23695,: 23704 -- done --
sub process : 23695,: 23703 -- done --
sub process : 23695,: 23702 -- done --
sub process : 23695,: 23705 -- done --产生了10个子进程,完成之后 ,我 ps -ef | grep python, 得到:# ps -ef | grep ython
root 2369587910 00:19 pts/4 00:00:00 python ttt.py
root 23696 236950 00:19 pts/4 00:00:00 <defunct>
root 23697 236950 00:19 pts/4 00:00:00 <defunct>
root 23698 236950 00:19 pts/4 00:00:00 <defunct>
root 23699 236950 00:19 pts/4 00:00:00 <defunct>
root 23700 236950 00:19 pts/4 00:00:00 <defunct>
root 23701 236950 00:19 pts/4 00:00:00 <defunct>
root 23702 236950 00:19 pts/4 00:00:00 <defunct>
root 23703 236950 00:19 pts/4 00:00:00 <defunct>
root 23704 236950 00:19 pts/4 00:00:00 <defunct>
root 23705 236950 00:19 pts/4 00:00:00 <defunct>
#
子进程都是僵尸进程,怎么会这样呢? 如何正确退出子进程? python 新手,求解答! :oops:
#!/usr/bin/python
# -*- coding: utf-8 -*-
import multiprocessing
import time
import os
class test(multiprocessing.Process):
def run(self):
print 'sub process : %d,: %d -- sleep 5 --' %(os.getppid(),os.getpid())
time.sleep(5)
print 'sub process : %d,: %d -- done --' % (os.getppid(),os.getpid())
b = 0
while True:
print '-- : %d main process starting --' %os.getpid()
p={}
for i in range(10) :
p = test()
p.start()
p.join()
print '-- : %d main process sleep 10 --' %os.getpid()
time.sleep(100) start()
Start the process’s activity.
This must be called at most once per process object. It arranges for the object’s run() method to be invoked in a separate process.
join()
If the optional argument timeout is None (the default), the method blocks until the process whose join() method is called terminates. If timeout is a positive number, it blocks at most timeout seconds.
A process can be joined many times.
A process cannot join itself because this would cause a deadlock. It is an error to attempt to join a process before it has been started.
看文档,start下面就是join,回收子进程,和C系统调用wait/waitpid功能类似。 回复 2# bikong0411
这样的话是执行完一个,退出一个。 有没有办法,同时运行 10 个子进程呢 ? 回复 3# timespace
我的意图是:
while Ture:
data = "从数据库里获取数据"
for i in data:
#然后
p = class(i)
p.start()
p.join() 如果这里使用了这个,就停在这里了,得for循环里的子进程全部完成后while循环才能继续走。 我是希望产生子进程后,子进程在运行,但又不影响while循环继续。
但是这里不使用join。 子进程结束后就变成僵尸进程了,如我主题所述。
不知道咋搞了,能指点一二么?
其实,我的需求是:
1.不断从数据库获取数据,一次性可能是获取5条数据
2. 每条数据相当于一个任务,一个任务产生一个子进程去处理它,获取的任务立即执行,如立即执行5个任务
3. 主程序不等待任务(子进程)执行完,继续从数据库中获取数据(任务),如果获取到了,又立刻执行,如此反复
PS: 无需担心有多少条数据(任务)在运行!
---------------
但是,之前主程序衍生出的子进程,我现在不知道如何正确退出,不正确退出,就会变成僵尸进程了。
比如,以下的一个例子:#!/usr/bin/python
# -*- coding: utf-8 -*-
import multiprocessing
import time
import os
class test(multiprocessing.Process):
def run(self):
print 'sub process : %d,: %d -- sleep 5 --' %(os.getppid(),os.getpid())
time.sleep(5)
print 'sub process : %d,: %d -- done --' % (os.getppid(),os.getpid())
while True:
i = 0
p = {}
print '-- : %d main process starting --' %os.getpid()
while i < 10 :
p = test()
p.start()
i+=1
print '-- : %d main process sleep 100 --' %os.getpid()
#sleep 100
time.sleep(100)我执行后,是可以立即执行10个子进程的,而主进程会等待 100 秒 ,5秒钟后子进程都执行完了 ,主进程还在 sleep .
此时,我去 ps -ef 查询的时候,这些子进程就变成了僵尸进程拉。因为我没有正确退出子进程。# ps -ef | grep ython
root 1406 149910 20:24 pts/1 00:00:00 python ttt.py
root 140714060 20:24 pts/1 00:00:00 <defunct>
root 140814060 20:24 pts/1 00:00:00 <defunct>
root 140914060 20:24 pts/1 00:00:00 <defunct>
root 141014060 20:24 pts/1 00:00:00 <defunct>
root 141114060 20:24 pts/1 00:00:00 <defunct>
root 141214060 20:24 pts/1 00:00:00 <defunct>
root 141314060 20:24 pts/1 00:00:00 <defunct>
root 141414060 20:24 pts/1 00:00:00 <defunct>
root 141514060 20:24 pts/1 00:00:00 <defunct>
root 141614060 20:24 pts/1 00:00:00 <defunct>
# 两位大神, 我这个需求如何去做呢? 求指点! @bikong0411 @timespace 回复 6# reyleon
这种需求用进程池或线程池更简单http://docs.python.org/2.7/library/multiprocessing.html#module-multiprocessing.pool,无需关心进程管理的问题。
回复 7# timespace
主程序不等待任务(子进程)执行完,继续从数据库中获取数据(任务)! 能否帮我写个简单的示例? 回复 8# reyleon
如果你很确定,每条数据新建一个进程没有问题,比如进程数限制或服务器负载,那么也有一种办法。
基本思路:
1. 主进程管理子进程的创建和销毁
2. proxy进程负责数据IO操作
3. worker进程处理具体数据
下面是一个可正常运行的框架:# -*- coding: utf-8 -*-
import multiprocessing as mp
import os
import random
from signal import signal, SIGINT, SIG_IGN, siginterrupt
import time
def data_source():
"""数据源。
随机选择一个浮点数,作为worker进程的sleep时间,
具体实践时可以将这部分实现改为读取数据库。
"""
dataset =
while True:
time.sleep(0.2)
yield random.choice(dataset)
def proc_proxy(cntl_q, data_q, exit_flag):
"""从数据源读取数据。
先通过cntl_q通知主进程,
再将数据通过data_q发给worker。
"""
for item in data_source():
cntl_q.put({'event': 'data'})
data_q.put(item)
if exit_flag.is_set():
cntl_q.put({'event': 'exit', 'pid': os.getpid()})
break
def proc_worker(cntl_q, data_q):
"""处理数据。
从data_q获取数据,处理完毕后通过cntl_q通知主进程,
然后退出。
"""
item = data_q.get()
time.sleep(item)
cntl_q.put({'event': 'exit', 'pid': os.getpid()})
def main():
proc_pool = {} # 记录创建的所有子进程
cntl_q = mp.Queue() # 控制信息传递队列
data_q = mp.Queue() # 具体数据传递队列
exit_flag = mp.Event() # 退出标记,初始值为False
# 收到SIGINT,通知proxy停止读取数据
signal(SIGINT, lambda x, y: exit_flag.set())
siginterrupt(SIGINT, False)
# 启动proxy进程,后续按需启动woker进程
print 'main {} started'.format(os.getpid())
proc = mp.Process(target=proc_proxy, args=(cntl_q, data_q, exit_flag))
proc.start()
proc_pool = proc
print 'proxy {} started'.format(proc.pid)
while True:
item = cntl_q.get()
if item['event'] == 'data':
proc = mp.Process(target=proc_worker, args=(cntl_q, data_q))
proc.start()
proc_pool = proc
print 'worker {} started'.format(proc.pid)
elif item['event'] == 'exit':
proc = proc_pool.pop(item['pid'])
proc.join()
print 'child {} stopped'.format(item['pid'])
else:
print 'It\'s impossible !'
if not proc_pool: # 所有子进程均已退出
break
print 'main {} stopped'.format(os.getpid())
if __name__ == '__main__':
main() 回复 9# timespace
太感谢啦,经过实际测试,完美符合我的需求,而且加入了信号控制,很完善!! :handshake灰常感谢!
关于你说的进程控制,我是在数据库层面,查询任务的时候做好了控制的,最多不会超过查询的任务数,所以不用担心。
页:
[1]
2