reyleon 发表于 2014-03-29 00:21

multiprocessing.Process 产生的子进程如何正常退出?


下面是一段示意代码 ttt.py:#!/usr/bin/python
# -*- coding: utf-8 -*-

import multiprocessing
import time
import os

class test(multiprocessing.Process):
      def run(self):
                print 'sub process : %d,: %d -- sleep 5 --' %(os.getppid(),os.getpid())
                time.sleep(5)
                print 'sub process : %d,: %d -- done --' % (os.getppid(),os.getpid())

b = 0
while True:
      print '-- : %d main process starting --' %os.getpid()
      while b < 10 :
                p = test()
                p.start()
                b +=1
      print '-- : %d main process sleep 10 --' %os.getpid()
      time.sleep(100)运行后:# python ttt.py
-- : 23695 main process starting --
sub process : 23695,: 23696 -- sleep 5 --
sub process : 23695,: 23697 -- sleep 5 --
sub process : 23695,: 23698 -- sleep 5 --
sub process : 23695,: 23699 -- sleep 5 --
sub process : 23695,: 23700 -- sleep 5 --
sub process : 23695,: 23702 -- sleep 5 --
sub process : 23695,: 23701 -- sleep 5 --
sub process : 23695,: 23703 -- sleep 5 --
sub process : 23695,: 23704 -- sleep 5 --
-- : 23695 main process sleep 10 --
sub process : 23695,: 23705 -- sleep 5 --
sub process : 23695,: 23697 -- done --
sub process : 23695,: 23696 -- done --
sub process : 23695,: 23698 -- done --
sub process : 23695,: 23699 -- done --
sub process : 23695,: 23700 -- done --
sub process : 23695,: 23701 -- done --
sub process : 23695,: 23704 -- done --
sub process : 23695,: 23703 -- done --
sub process : 23695,: 23702 -- done --
sub process : 23695,: 23705 -- done --产生了10个子进程,完成之后 ,我 ps -ef | grep python, 得到:# ps -ef | grep ython
root   2369587910 00:19 pts/4    00:00:00 python ttt.py
root   23696 236950 00:19 pts/4    00:00:00 <defunct>
root   23697 236950 00:19 pts/4    00:00:00 <defunct>
root   23698 236950 00:19 pts/4    00:00:00 <defunct>
root   23699 236950 00:19 pts/4    00:00:00 <defunct>
root   23700 236950 00:19 pts/4    00:00:00 <defunct>
root   23701 236950 00:19 pts/4    00:00:00 <defunct>
root   23702 236950 00:19 pts/4    00:00:00 <defunct>
root   23703 236950 00:19 pts/4    00:00:00 <defunct>
root   23704 236950 00:19 pts/4    00:00:00 <defunct>
root   23705 236950 00:19 pts/4    00:00:00 <defunct>
#
子进程都是僵尸进程,怎么会这样呢? 如何正确退出子进程? python 新手,求解答! :oops:








bikong0411 发表于 2014-03-29 10:23

#!/usr/bin/python
# -*- coding: utf-8 -*-

import multiprocessing
import time
import os

class test(multiprocessing.Process):
      def run(self):
                print 'sub process : %d,: %d -- sleep 5 --' %(os.getppid(),os.getpid())
                time.sleep(5)
                print 'sub process : %d,: %d -- done --' % (os.getppid(),os.getpid())

b = 0
while True:
      print '-- : %d main process starting --' %os.getpid()
      p={}
      for i in range(10) :
            p = test()
            p.start()
            p.join()
      print '-- : %d main process sleep 10 --' %os.getpid()
      time.sleep(100)

timespace 发表于 2014-03-29 11:08

start()
Start the process’s activity.

This must be called at most once per process object. It arranges for the object’s run() method to be invoked in a separate process.

join()
If the optional argument timeout is None (the default), the method blocks until the process whose join() method is called terminates. If timeout is a positive number, it blocks at most timeout seconds.

A process can be joined many times.

A process cannot join itself because this would cause a deadlock. It is an error to attempt to join a process before it has been started.

看文档,start下面就是join,回收子进程,和C系统调用wait/waitpid功能类似。

reyleon 发表于 2014-03-29 19:13

回复 2# bikong0411


    这样的话是执行完一个,退出一个。 有没有办法,同时运行 10 个子进程呢 ?

reyleon 发表于 2014-03-29 19:36

回复 3# timespace


    我的意图是:

while Ture:
   data = "从数据库里获取数据"
   for i in data:
      #然后
      p = class(i)
      p.start()
      p.join() 如果这里使用了这个,就停在这里了,得for循环里的子进程全部完成后while循环才能继续走。 我是希望产生子进程后,子进程在运行,但又不影响while循环继续。

但是这里不使用join。 子进程结束后就变成僵尸进程了,如我主题所述。

不知道咋搞了,能指点一二么?


reyleon 发表于 2014-03-29 20:26

其实,我的需求是:

1.不断从数据库获取数据,一次性可能是获取5条数据
2. 每条数据相当于一个任务,一个任务产生一个子进程去处理它,获取的任务立即执行,如立即执行5个任务
3. 主程序不等待任务(子进程)执行完,继续从数据库中获取数据(任务),如果获取到了,又立刻执行,如此反复

PS: 无需担心有多少条数据(任务)在运行!

---------------

但是,之前主程序衍生出的子进程,我现在不知道如何正确退出,不正确退出,就会变成僵尸进程了。

比如,以下的一个例子:#!/usr/bin/python
# -*- coding: utf-8 -*-

import multiprocessing
import time
import os

class test(multiprocessing.Process):
      def run(self):
                print 'sub process : %d,: %d -- sleep 5 --' %(os.getppid(),os.getpid())
                time.sleep(5)
                print 'sub process : %d,: %d -- done --' % (os.getppid(),os.getpid())

while True:
      i = 0
      p = {}
      print '-- : %d main process starting --' %os.getpid()
      while i < 10 :
                p = test()
                p.start()
                i+=1
      print '-- : %d main process sleep 100 --' %os.getpid()
      #sleep 100
      time.sleep(100)我执行后,是可以立即执行10个子进程的,而主进程会等待 100 秒 ,5秒钟后子进程都执行完了 ,主进程还在 sleep .
此时,我去 ps -ef 查询的时候,这些子进程就变成了僵尸进程拉。因为我没有正确退出子进程。# ps -ef | grep ython
root      1406 149910 20:24 pts/1    00:00:00 python ttt.py
root      140714060 20:24 pts/1    00:00:00 <defunct>
root      140814060 20:24 pts/1    00:00:00 <defunct>
root      140914060 20:24 pts/1    00:00:00 <defunct>
root      141014060 20:24 pts/1    00:00:00 <defunct>
root      141114060 20:24 pts/1    00:00:00 <defunct>
root      141214060 20:24 pts/1    00:00:00 <defunct>
root      141314060 20:24 pts/1    00:00:00 <defunct>
root      141414060 20:24 pts/1    00:00:00 <defunct>
root      141514060 20:24 pts/1    00:00:00 <defunct>
root      141614060 20:24 pts/1    00:00:00 <defunct>
# 两位大神, 我这个需求如何去做呢? 求指点! @bikong0411 @timespace

timespace 发表于 2014-03-29 21:17

回复 6# reyleon
这种需求用进程池或线程池更简单http://docs.python.org/2.7/library/multiprocessing.html#module-multiprocessing.pool,无需关心进程管理的问题。


   

reyleon 发表于 2014-03-29 22:19

回复 7# timespace


主程序不等待任务(子进程)执行完,继续从数据库中获取数据(任务)!    能否帮我写个简单的示例?

timespace 发表于 2014-03-30 13:42

回复 8# reyleon
如果你很确定,每条数据新建一个进程没有问题,比如进程数限制或服务器负载,那么也有一种办法。
基本思路:
1. 主进程管理子进程的创建和销毁
2. proxy进程负责数据IO操作
3. worker进程处理具体数据

下面是一个可正常运行的框架:# -*- coding: utf-8 -*-
import multiprocessing as mp
import os
import random
from signal import signal, SIGINT, SIG_IGN, siginterrupt
import time

def data_source():
    """数据源。

    随机选择一个浮点数,作为worker进程的sleep时间,
    具体实践时可以将这部分实现改为读取数据库。
    """
    dataset =
    while True:
      time.sleep(0.2)
      yield random.choice(dataset)

def proc_proxy(cntl_q, data_q, exit_flag):
    """从数据源读取数据。

    先通过cntl_q通知主进程,
    再将数据通过data_q发给worker。
    """
    for item in data_source():
      cntl_q.put({'event': 'data'})
      data_q.put(item)
      if exit_flag.is_set():
            cntl_q.put({'event': 'exit', 'pid': os.getpid()})
            break


def proc_worker(cntl_q, data_q):
    """处理数据。

    从data_q获取数据,处理完毕后通过cntl_q通知主进程,
    然后退出。
    """
    item = data_q.get()
    time.sleep(item)
    cntl_q.put({'event': 'exit', 'pid': os.getpid()})

def main():
    proc_pool = {} # 记录创建的所有子进程
    cntl_q = mp.Queue() # 控制信息传递队列
    data_q = mp.Queue() # 具体数据传递队列
    exit_flag = mp.Event() # 退出标记,初始值为False

    # 收到SIGINT,通知proxy停止读取数据
    signal(SIGINT, lambda x, y: exit_flag.set())
    siginterrupt(SIGINT, False)

    # 启动proxy进程,后续按需启动woker进程
    print 'main {} started'.format(os.getpid())
    proc = mp.Process(target=proc_proxy, args=(cntl_q, data_q, exit_flag))
    proc.start()
    proc_pool = proc
    print 'proxy {} started'.format(proc.pid)

    while True:
      item = cntl_q.get()
      if item['event'] == 'data':
            proc = mp.Process(target=proc_worker, args=(cntl_q, data_q))
            proc.start()
            proc_pool = proc
            print 'worker {} started'.format(proc.pid)
      elif item['event'] == 'exit':
            proc = proc_pool.pop(item['pid'])
            proc.join()
            print 'child {} stopped'.format(item['pid'])
      else:
            print 'It\'s impossible !'

      if not proc_pool: # 所有子进程均已退出
            break

    print 'main {} stopped'.format(os.getpid())

if __name__ == '__main__':
    main()

reyleon 发表于 2014-03-30 16:50

回复 9# timespace


    太感谢啦,经过实际测试,完美符合我的需求,而且加入了信号控制,很完善!! :handshake灰常感谢!

    关于你说的进程控制,我是在数据库层面,查询任务的时候做好了控制的,最多不会超过查询的任务数,所以不用担心。
页: [1] 2
查看完整版本: multiprocessing.Process 产生的子进程如何正常退出?