免费注册 查看新帖 |

Chinaunix

  平台 论坛 博客 文库
12下一页
最近访问板块 发新帖
查看: 4637 | 回复: 10
打印 上一主题 下一主题

multiprocessing hang问题 [复制链接]

论坛徽章:
0
跳转到指定楼层
1 [收藏(0)] [报告]
发表于 2015-09-17 14:54 |只看该作者 |倒序浏览
本帖最后由 linewer 于 2015-09-17 14:57 编辑
  1. #!/usr/bin/python

  2. import os, sys, time, atexit, errno
  3. import multiprocessing as mp
  4. from signal import signal
  5. from signal import SIGTERM
  6. from signal import SIGINT



  7. exit_flag = mp.Event()
  8. stdin =  "/dev/null"
  9. stdout = "/dev/null"
  10. #stderr = "/dev/null"
  11. stderr = "/tmp/test.log"
  12. pidfile ="/tmp/test.pid"

  13. def daemonize():
  14.     global stdin, stdout, stderr, pidfile

  15.     try:
  16.         pid = os.fork()
  17.         if pid > 0:
  18.             sys.exit(0)
  19.     except OSError, e:
  20.         sys.stderr.write("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror))
  21.         sys.exit(1)

  22.     os.chdir("/")
  23.     os.setsid()
  24.     os.umask(0)

  25.     try:
  26.         pid = os.fork()
  27.         if pid > 0:
  28.             sys.exit(0)
  29.     except OSError, e:
  30.         sys.stderr.write("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror))
  31.         sys.exit(1)

  32.     sys.stdout.flush()
  33.     sys.stderr.flush()
  34.     si = file(stdin, 'r')
  35.     so = file(stdout, 'a+')
  36.     se = file(stderr, 'a+', 0)
  37.     os.dup2(si.fileno(), sys.stdin.fileno())
  38.     os.dup2(so.fileno(), sys.stdout.fileno())
  39.     os.dup2(se.fileno(), sys.stderr.fileno())

  40.     atexit.register(delpid)
  41.     pid = str(os.getpid())
  42.     file(pidfile, 'w+').write("%s\n" % pid)

  43. def delpid():
  44.     os.remove(pidfile)

  45. def handle_signal():
  46.     signal(SIGINT, handler)
  47.     signal(SIGTERM, handler)

  48. def handler(signum, frame):
  49.     global exit_flag
  50.     sys.stderr.write("get a signal: %s" % str(signum))
  51.     exit_flag.set()

  52. def start(run):
  53.     global pidfile

  54.     try:
  55.         pf = file(pidfile, 'r')
  56.         pid = int(pf.read().strip())
  57.         pf.close()
  58.     except IOError:
  59.         pid = None

  60.     if pid:
  61.         message = "pidfile %s already exists. Daemon already running!\n"
  62.         sys.stderr.write(message % pidfile)
  63.         sys.exit(1)

  64.     daemonize()
  65.     handle_signal()
  66.     run()

  67. def stop():
  68.     global pidfile
  69.     try:
  70.         pf = file(pidfile, 'r')
  71.         pid = int(pf.read().strip())
  72.         pf.close()
  73.     except IOError:
  74.         pid = None

  75.     if not pid:
  76.         message = "pidfile %s does not exist. Daemon not running!\n"
  77.         sys.stderr.write(message % pidfile)
  78.         return
  79.     try:
  80.         while True:
  81.             os.kill(pid, SIGTERM)
  82.             time.sleep(1)
  83.     except OSError, err:
  84.         if err.errno == errno.ESRCH:
  85.             if os.path.exists(pidfile):
  86.                 os.remove(pidfile)
  87.         else:
  88.             sys.exit(1)

  89. def restart(run):
  90.     stop()
  91.     start(run)


  92. def worker():
  93.     name = mp.current_process().name
  94.     message = "my name:%s,mypid is:%s\n"
  95.     sys.stderr.write(message % (name,os.getpid()))
  96.     while True:
  97.        if exit_flag.is_set():
  98.           break

  99. def test_start():
  100.     proc_pool = {}
  101.     proc_info = {}

  102.     for task in range(3):
  103.         print task
  104.         proc = mp.Process(target=worker)
  105.         proc.start()
  106.         proc_pool[proc.pid] = proc
  107.         ##debug info
  108.         sys.stderr.write('===pid %s: task %s===\n' % (proc.pid, proc.name))
  109.         proc_info[proc.pid] = task
  110.         #proc.join(timeout=1)


  111.     while True:
  112.         is_alive = False
  113.         for pid, proc in proc_pool.items():
  114.             #???
  115.             proc.join(timeout=1)
  116.             if proc.is_alive():
  117.                 is_alive = True
  118.             else:
  119.                 proc_pool.pop(pid)
  120.                 sys.stderr.write('===pid %s exit===\n' % (pid))
  121.                 if not exit_flag.is_set():
  122.                    #start another process for this task!(1)abort (2)???normal quit???
  123.                    proc1 = mp.Process(target=worker)
  124.                    proc1.start()
  125.                    proc_pool[proc1.pid]=proc1
  126.                    sys.stderr.write('===-pid %s: task %s-===\n' % (proc1.pid, proc_info[pid]))
  127.                    proc_info[proc1.pid]=proc_info[pid]
  128.                    proc_info.pop(pid)
  129.                    #proc1.join(timeout=1)

  130.                 #for k,v in proc_info.items():
  131.                 #      sys.stderr.write('new===pid %s: task %s===\n' % (k, v))
  132.                 #sys.stderr.write('~~~~~~~~~%s\n' % exit_flag.is_set())
  133.                
  134.         if not is_alive:
  135.             break

  136. def main():
  137.     if len(sys.argv) == 2:
  138.         if "start" == sys.argv[1]:
  139.                 start(test_start)
  140.         elif "stop" == sys.argv[1]:
  141.             stop()
  142.         elif "restart" == sys.argv[1]:
  143.             restart(run)            
  144.         else:
  145.             print "Unknown Command"
  146.             sys.exit(2)
  147.         sys.exit(0)
  148.     else:
  149.         print "usage: %s start|stop|restart" % sys.argv[0]
  150.         sys.exit(2)



  151. if __name__ == "__main__":
  152.     main()
复制代码
如上简化代码示例要实现的功能为: 某子进程异常退出时能自动重新生成一个,正常运行没有问题,  但是在模拟测试时发现,  在kill -9 某个子进程或者某两个时, 偶尔会出现父进程和其余子进程挂起的问题,不能新生成一个新进程(gdb看到是在sem_wait),MS exit_flag死锁了,这个应该怎么修正,哪位指导下,多谢!
#0  0x00000038e280d720 in sem_wait () from /lib64/libpthread.so.0
#1  0x00007f24dc80aef1 in ?? () from /usr/lib64/python2.6/lib-dynload/_multiprocessing.so
#2  0x000000357dad59e4 in PyEval_EvalFrameEx () from /usr/lib64/libpython2.6.so.1.0
#3  0x000000357dad6b8f in PyEval_EvalFrameEx () from /usr/lib64/libpython2.6.so.1.0
#4  0x000000357dad7657 in PyEval_EvalCodeEx () from /usr/lib64/libpython2.6.so.1.0

论坛徽章:
16
CU十二周年纪念徽章
日期:2013-10-24 15:41:3415-16赛季CBA联赛之广东
日期:2015-12-23 21:21:55青铜圣斗士
日期:2015-12-05 10:35:30黄金圣斗士
日期:2015-11-26 20:42:16神斗士
日期:2015-11-19 12:47:50每日论坛发贴之星
日期:2015-11-18 06:20:00程序设计版块每日发帖之星
日期:2015-11-18 06:20:002015亚冠之城南
日期:2015-11-10 19:10:492015亚冠之萨济拖拉机
日期:2015-10-28 18:47:282015亚冠之柏太阳神
日期:2015-08-30 17:21:492015亚冠之山东鲁能
日期:2015-07-07 18:48:39摩羯座
日期:2014-08-29 23:01:42
2 [报告]
发表于 2015-09-18 14:11 |只看该作者
  1. class Event(object):

  2.     def __init__(self):
  3.         self._cond = Condition(Lock())
  4.         self._flag = Semaphore(0)

  5.     def is_set(self):
  6.         self._cond.acquire()
  7.         try:
  8.             if self._flag.acquire(False):
  9.                 self._flag.release()
  10.                 return True
  11.             return False
  12.         finally:
  13.             self._cond.release()

  14.     def set(self):
  15.         self._cond.acquire()
  16.         try:
  17.             self._flag.acquire(False)
  18.             self._flag.release()
  19.             self._cond.notify_all()
  20.         finally:
  21.             self._cond.release()

  22.     def clear(self):
  23.         self._cond.acquire()
  24.         try:
  25.             self._flag.acquire(False)
  26.         finally:
  27.             self._cond.release()

  28.     def wait(self, timeout=None):
  29.         self._cond.acquire()
  30.         try:
  31.             if self._flag.acquire(False):
  32.                 self._flag.release()
  33.             else:
  34.                 self._cond.wait(timeout)

  35.             if self._flag.acquire(False):
  36.                 self._flag.release()
  37.                 return True
  38.             return False
  39.         finally:
  40.             self._cond.release()
复制代码
简单说说,以上是mp.Event代码的set 和is_set方法,可以看出是都存在信号量的获取的
所以你在中断和非中断都在调用Event的set和is_set  肯定存在死锁的可能啊
尽量不要在中断处理中调用信号量等处理

论坛徽章:
0
3 [报告]
发表于 2015-09-18 15:41 |只看该作者
多谢LS,我后来自己看了下,在父子进程中,有些东西是继承的(复制的),比如信号及信号处理函数,这样才导致死锁现象. 程序的本意就是父进程收到终止信号后,设置Event变量,子进程判断后退出, 因为处理不好(其实是在修別人的bug),导致子进程同样处理此信号, 改成启动子进程之前忽略信号,之后恢复,代码如下,如果还有问题,欢迎斧正.
  1. #!/usr/bin/python

  2. import os, sys, time, atexit, errno
  3. import multiprocessing as mp
  4. from signal import signal
  5. from signal import getsignal
  6. from signal import SIGTERM
  7. from signal import SIGINT
  8. from signal import SIG_IGN



  9. exit_flag = mp.Event()
  10. stdin =  "/dev/null"
  11. stdout = "/dev/null"
  12. #stderr = "/dev/null"
  13. stderr = "/tmp/test.log"
  14. pidfile ="/tmp/test.pid"

  15. def daemonize():
  16.     global stdin, stdout, stderr, pidfile

  17.     try:
  18.         pid = os.fork()
  19.         if pid > 0:
  20.             sys.exit(0)
  21.     except OSError, e:
  22.         sys.stderr.write("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror))
  23.         sys.exit(1)

  24.     os.chdir("/")
  25.     os.setsid()
  26.     os.umask(0)

  27.     try:
  28.         pid = os.fork()
  29.         if pid > 0:
  30.             sys.exit(0)
  31.     except OSError, e:
  32.         sys.stderr.write("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror))
  33.         sys.exit(1)

  34.     sys.stdout.flush()
  35.     sys.stderr.flush()
  36.     si = file(stdin, 'r')
  37.     so = file(stdout, 'a+')
  38.     se = file(stderr, 'a+', 0)
  39.     os.dup2(si.fileno(), sys.stdin.fileno())
  40.     os.dup2(so.fileno(), sys.stdout.fileno())
  41.     os.dup2(se.fileno(), sys.stderr.fileno())

  42.     atexit.register(delpid)
  43.     pid = str(os.getpid())
  44.     file(pidfile, 'w+').write("%s\n" % pid)

  45. def delpid():
  46.     os.remove(pidfile)

  47. def handle_signal():
  48.     signal(SIGINT, handler)
  49.     signal(SIGTERM, handler)

  50. def handler(signum, frame):
  51.     global exit_flag
  52.     sys.stderr.write("get a signal: %s" % str(signum))
  53.     exit_flag.set()

  54. def start(run):
  55.     global pidfile

  56.     try:
  57.         pf = file(pidfile, 'r')
  58.         pid = int(pf.read().strip())
  59.         pf.close()
  60.     except IOError:
  61.         pid = None

  62.     if pid:
  63.         message = "pidfile %s already exists. Daemon already running!\n"
  64.         sys.stderr.write(message % pidfile)
  65.         sys.exit(1)

  66.     daemonize()
  67.     handle_signal()
  68.     run()

  69. def stop():
  70.     global pidfile
  71.     try:
  72.         pf = file(pidfile, 'r')
  73.         pid = int(pf.read().strip())
  74.         pf.close()
  75.     except IOError:
  76.         pid = None

  77.     if not pid:
  78.         message = "pidfile %s does not exist. Daemon not running!\n"
  79.         sys.stderr.write(message % pidfile)
  80.         return
  81.     try:
  82.         while True:
  83.             os.kill(pid, SIGTERM)
  84.             time.sleep(1)
  85.     except OSError, err:
  86.         if err.errno == errno.ESRCH:
  87.             if os.path.exists(pidfile):
  88.                 os.remove(pidfile)
  89.         else:
  90.             sys.exit(1)

  91. def restart(run):
  92.     stop()
  93.     start(run)


  94. def worker():
  95.     name = mp.current_process().name
  96.     message = "my name:%s,mypid is:%s\n"
  97.     sys.stderr.write(message % (name,os.getpid()))
  98.     while True:
  99.        if exit_flag.is_set():
  100.           break

  101. def test_start():
  102.     proc_pool = {}
  103.     proc_info = {}
  104.     default_handler = getsignal(SIGTERM)
  105.     #Set signal handling of SIGTERM to ignore mode.
  106.     signal(SIGTERM, SIG_IGN)

  107.     for task in range(3):
  108.         print task
  109.         proc = mp.Process(target=worker)
  110.         proc.start()
  111.         proc_pool[proc.pid] = proc
  112.         ##debug info
  113.         sys.stderr.write('===pid %s: task %s===\n' % (proc.pid, proc.name))
  114.         proc_info[proc.pid] = task
  115.         #proc.join(timeout=1)

  116.     signal(SIGTERM, default_handler)

  117.     while True:
  118.         is_alive = False
  119.         for pid, proc in proc_pool.items():
  120.             #???
  121.             proc.join(timeout=1)
  122.             if proc.is_alive():
  123.                 is_alive = True
  124.             else:
  125.                 proc_pool.pop(pid)
  126.                 sys.stderr.write('===pid %s exit===\n' % (pid))
  127.                 if not exit_flag.is_set():
  128.                    #start another process for this task!(1)abort (2)???normal quit???
  129.                    signal(SIGTERM, SIG_IGN)
  130.                    proc1 = mp.Process(target=worker)
  131.                    proc1.start()
  132.                    signal(SIGTERM, default_handler)
  133.                    proc_pool[proc1.pid]=proc1
  134.                    sys.stderr.write('===-pid %s: task %s-===\n' % (proc1.pid, proc_info[pid]))
  135.                    proc_info[proc1.pid]=proc_info[pid]
  136.                    proc_info.pop(pid)
  137.                    #proc1.join(timeout=1)
  138.                    is_alive = True

  139.                 #for k,v in proc_info.items():
  140.                 #      sys.stderr.write('new===pid %s: task %s===\n' % (k, v))
  141.                 #sys.stderr.write('~~~~~~~~~%s\n' % exit_flag.is_set())
  142.                
  143.         if not is_alive:
  144.             break

  145. def main():
  146.     if len(sys.argv) == 2:
  147.         if "start" == sys.argv[1]:
  148.                 start(test_start)
  149.         elif "stop" == sys.argv[1]:
  150.             stop()
  151.         elif "restart" == sys.argv[1]:
  152.             restart(run)            
  153.         else:
  154.             print "Unknown Command"
  155.             sys.exit(2)
  156.         sys.exit(0)
  157.     else:
  158.         print "usage: %s start|stop|restart" % sys.argv[0]
  159.         sys.exit(2)



  160. if __name__ == "__main__":
  161.     main()
复制代码

论坛徽章:
16
CU十二周年纪念徽章
日期:2013-10-24 15:41:3415-16赛季CBA联赛之广东
日期:2015-12-23 21:21:55青铜圣斗士
日期:2015-12-05 10:35:30黄金圣斗士
日期:2015-11-26 20:42:16神斗士
日期:2015-11-19 12:47:50每日论坛发贴之星
日期:2015-11-18 06:20:00程序设计版块每日发帖之星
日期:2015-11-18 06:20:002015亚冠之城南
日期:2015-11-10 19:10:492015亚冠之萨济拖拉机
日期:2015-10-28 18:47:282015亚冠之柏太阳神
日期:2015-08-30 17:21:492015亚冠之山东鲁能
日期:2015-07-07 18:48:39摩羯座
日期:2014-08-29 23:01:42
4 [报告]
发表于 2015-09-18 17:44 |只看该作者
依然有问题啊  建议不要在中断去exit_flags.set

论坛徽章:
0
5 [报告]
发表于 2015-09-18 18:02 |只看该作者
tc1989tc 发表于 2015-09-18 17:44
依然有问题啊  建议不要在中断去exit_flags.set


如果要在收到信号后,去设置exit_flags.set呢?设置个变量,放在父进程主循环里做个判断,然后设置exit_flags.set么? 或者还有别的好的方法么?

论坛徽章:
16
CU十二周年纪念徽章
日期:2013-10-24 15:41:3415-16赛季CBA联赛之广东
日期:2015-12-23 21:21:55青铜圣斗士
日期:2015-12-05 10:35:30黄金圣斗士
日期:2015-11-26 20:42:16神斗士
日期:2015-11-19 12:47:50每日论坛发贴之星
日期:2015-11-18 06:20:00程序设计版块每日发帖之星
日期:2015-11-18 06:20:002015亚冠之城南
日期:2015-11-10 19:10:492015亚冠之萨济拖拉机
日期:2015-10-28 18:47:282015亚冠之柏太阳神
日期:2015-08-30 17:21:492015亚冠之山东鲁能
日期:2015-07-07 18:48:39摩羯座
日期:2014-08-29 23:01:42
6 [报告]
发表于 2015-09-18 19:31 |只看该作者
回复 5# linewer


    把你的需求说出来吧  然后在看怎么解决

论坛徽章:
0
7 [报告]
发表于 2015-09-18 20:11 |只看该作者
tc1989tc 发表于 2015-09-18 19:31
回复 5# linewer

就是父进程通过multiprocess启动几个进程(这些进程做一些独立的工作,比如请求web抓取页面),在父进程收到终止信号后,设置exit_flag.set, 子进程内的工作函数用到这个exit_flag做一些判断(平滑关闭链接),然后退出, 父进程会监控子进程的运行情况,退出就新启一个.   整个程序放到cron中,定期执行/终止

论坛徽章:
16
CU十二周年纪念徽章
日期:2013-10-24 15:41:3415-16赛季CBA联赛之广东
日期:2015-12-23 21:21:55青铜圣斗士
日期:2015-12-05 10:35:30黄金圣斗士
日期:2015-11-26 20:42:16神斗士
日期:2015-11-19 12:47:50每日论坛发贴之星
日期:2015-11-18 06:20:00程序设计版块每日发帖之星
日期:2015-11-18 06:20:002015亚冠之城南
日期:2015-11-10 19:10:492015亚冠之萨济拖拉机
日期:2015-10-28 18:47:282015亚冠之柏太阳神
日期:2015-08-30 17:21:492015亚冠之山东鲁能
日期:2015-07-07 18:48:39摩羯座
日期:2014-08-29 23:01:42
8 [报告]
发表于 2015-09-18 21:05 |只看该作者
sorry  现在你没在子进程中断中调用exit_flag.set啦   
你后面贴的代码 可以解决你的问题啦

论坛徽章:
0
9 [报告]
发表于 2015-09-18 21:35 |只看该作者
本帖最后由 linewer 于 2015-09-18 21:36 编辑
tc1989tc 发表于 2015-09-18 21:05
sorry  现在你没在子进程中断中调用exit_flag.set啦   
你后面贴的代码 可以解决你的问题啦


嗯, 生成子进程之前我先忽略此信号,之后恢复.  感觉是修复了.   

晚上测试,发现一种情况,
  1.     while True:
  2.         is_alive = False
  3.         for pid, proc in proc_pool.items():
  4.             #???
  5.             proc.join(timeout=1)
  6.             if proc.is_alive():
  7.                 is_alive = True
  8.             else:
  9.                 proc_pool.pop(pid)
  10.                 sys.stderr.write('===pid %s exit===\n' % (pid))
  11.                 if not exit_flag.is_set():
  12.                    #start another process for this task!(1)abort (2)???normal quit???
  13.                    signal(SIGTERM, SIG_IGN)
  14.                    proc1 = mp.Process(target=worker)
  15.                    proc1.start()
  16.                    signal(SIGTERM, default_handler)
  17.                    proc_pool[proc1.pid]=proc1
  18.                    sys.stderr.write('===-pid %s: task %s-===\n' % (proc1.pid, proc_info[pid]))
  19.                    proc_info[proc1.pid]=proc_info[pid]
  20.                    proc_info.pop(pid)
  21.                    #proc1.join(timeout=1)
  22.                    is_alive = True    #这个如果注释掉,在子进程全异常退出时,会生成新子进程,但是父进程按逻辑应该退出,gdb看到是waitpid, 这个估计是等新子进程,这应该是逻辑错误导致的???!!!

  23.                 #for k,v in proc_info.items():
  24.                 #      sys.stderr.write('new===pid %s: task %s===\n' % (k, v))
  25.                 #sys.stderr.write('~~~~~~~~~%s\n' % exit_flag.is_set())
  26.                
  27.         if not is_alive:
  28.             break
复制代码
感觉同步碰上中断处理确实是个麻烦事,而且是在修别人的写的代码

论坛徽章:
16
CU十二周年纪念徽章
日期:2013-10-24 15:41:3415-16赛季CBA联赛之广东
日期:2015-12-23 21:21:55青铜圣斗士
日期:2015-12-05 10:35:30黄金圣斗士
日期:2015-11-26 20:42:16神斗士
日期:2015-11-19 12:47:50每日论坛发贴之星
日期:2015-11-18 06:20:00程序设计版块每日发帖之星
日期:2015-11-18 06:20:002015亚冠之城南
日期:2015-11-10 19:10:492015亚冠之萨济拖拉机
日期:2015-10-28 18:47:282015亚冠之柏太阳神
日期:2015-08-30 17:21:492015亚冠之山东鲁能
日期:2015-07-07 18:48:39摩羯座
日期:2014-08-29 23:01:42
10 [报告]
发表于 2015-09-19 09:41 |只看该作者
看文档如下描述:
When a process exits, it attempts to terminate all of its daemonic child processes.

大概原因如下:
你的子程序忽略的term信号处理,所有导致父进程退出时,去终止子进程时候
会存在阻塞等到子进程退出。
解决方法:
1.父进程在创建完子进程后在重新注册你的信号处理
2.子进程恢复系统默认的信号处理 SIG_TERM
您需要登录后才可以回帖 登录 | 注册

本版积分规则 发表回复

  

北京盛拓优讯信息技术有限公司. 版权所有 京ICP备16024965号-6 北京市公安局海淀分局网监中心备案编号:11010802020122 niuxiaotong@pcpop.com 17352615567
未成年举报专区
中国互联网协会会员  联系我们:huangweiwei@itpub.net
感谢所有关心和支持过ChinaUnix的朋友们 转载本站内容请注明原作者名及出处

清除 Cookies - ChinaUnix - Archiver - WAP - TOP