- 论坛徽章:
- 0
|
本帖最后由 yyl910606 于 2016-07-11 13:29 编辑
代码瓶颈在于 字符拼接
测试机器 amd 4核 6G内存
不拼接字符串 24M 的数据 14秒处理完成
拼接字符串 24M数据 1个小时- #coding:utf8
- from time import sleep
- import multiprocessing
- from multiprocessing.process import Process
- import threading
- import os
- def process_func(line):
- result = {}
- bo = line[:-1].split()
- len_str = str(len(bo[3]))
- if len(bo) > 5:
- bo[0] = bo[0] + ":" + bo[5]
- if bo[4] == 'insertion':
- bo[2] = float(bo[2])+ 1
- if bo[1] in result:
- result[bo[1]] += str(bo[2]) + 'ins' + bo[3] + ' ' + bo[0] + ' '
- else:
- result[bo[1]] = str(bo[2]) + 'ins' + bo[3] + ' ' + bo[0] + ' '
- elif bo[4] == 'deletion':
- if bo[1] in result.keys():
- result[bo[1]] += str(bo[2]) + 'del' + len_str + ' ' + bo[0] + ' '
- else:
- result[bo[1]] = str(bo[2]) + 'del' + len_str + ' ' + bo[0] + ' '
- else:
- if bo[1] in result.keys():
- result[bo[1]] += str(bo[2])+'del'+len_str+' '+ bo[0]+' '
- b[2] = float(bo[2]) + 1
- result[bo[1]] += str(bo[2])+'ins'+bo[3]+' '+ bo[0]+' '
- else:
- result[bo[1]] = str(bo[2]) + 'del'+len_str+' '+ bo[0]+' '
- b[2] = float(bo[2]) + 1
- result[bo[1]] += str(bo[2]) + 'ins' + bo[3] + ' '+ bo[0]+' '
- return result
- def worker(rqueue, wqueue):
- while True:
- if not rqueue.empty():
- line = rqueue.get()
- rs = process_func(line)
- wqueue.put(rs)
- else:
- sleep(0.1)
- def dict_process(wqueue):
- while True:
- if not wqueue.empty():
- d = wqueue.get()
- for i in d.keys():
- global result
- result[i] += d[i]
- else:
- sleep(0.1)
- def main():
- global result
- result = {'chr' + str(i): '' for i in xrange(1,23)}
- for item in ['chrY', 'chrX', 'chrM']:
- result[item] = ''
- rqueue = multiprocessing.Queue()
- wqueue = multiprocessing.Queue()
- prs_num = multiprocessing.cpu_count()
- prs = []
- for i in xrange(prs_num):
- prs.append(Process(target=worker, args=(rqueue, wqueue,)))
- map(lambda x: x.start(), prs)
- dict_process_thread = threading.Thread(target=dict_process, args=(wqueue,))
- # 字符瓶颈
- # dict_process_thread.start() 关闭后速度明显快很多
- dbINDE = open('data.txt')
- while True:
- line = dbINDE.readline()
- if not line:
- break
- rqueue.put(line)
- #print wqueue.get()
- while True:
- #if wqueue.qsize() == 0: # 开启字符拼接
- if wqueue.qsize() != 0: #关闭字符拼接
- # kill child process
- #print result
- for p in prs:
- pid = p.pid
- if pid:
- os.kill(pid, 9)
- # kill self
- sleep(1)
- os.kill(os.getpid(), 9)
-
- if __name__ == '__main__':
- main()
复制代码 |
|