- 论坛徽章:
- 11
|
还是从自己代码找问题吧,Queue的效率不至于那么低。
CPU:单核3.4GHz
内存:1GB
OS:CentOS 6.4
Python: 2.6.6
场景:在竞争条件下queue的效率,多个生产者进程(示例中为10个)写queue,每个记录为4KiB,每个生产者写100K个记录,一个消费者进程读queue,尽量消除IO的影响,使消费者写/dev/null
结论:平均数据写入速度高于50MiB/s
代码:- #!/usr/bin/env python
- import io, os, sys
- from multiprocessing import Process, Queue
- from datetime import datetime
- def producer(q, no):
- blk_data = bytearray(4096) # bytes
- cnt = 0
- while cnt < 10 ** 5:
- q.put(blk_data)
- cnt += 1
- def consumer(q):
- begin_time = datetime.now()
- blk_cnt = 0
- with io.open(os.devnull, 'wb') as fp:
- while True:
- blk_data = q.get()
- if not isinstance(blk_data, bytearray):
- break
- fp.write(blk_data)
- blk_cnt += 1
- end_time = datetime.now()
- print 'duration', end_time - begin_time
- print 'data size', blk_cnt / 256 , 'MiB'
- def main():
- if len(sys.argv) != 2:
- sys.exit('{0} producers'.format(sys.argv[0]))
- log_queue = Queue() # the queue of log record
- proc_producer = [
- Process(target=producer, args=(log_queue, i))
- for i in range(int(sys.argv[1], 10))]
- proc_consumer = Process(target=consumer, args=(log_queue,))
- for p in proc_producer:
- p.start()
- proc_consumer.start()
- for p in proc_producer:
- p.join()
- log_queue.put(None) # notify consumer to exit
- proc_consumer.join()
- if __name__ == '__main__':
- main()
复制代码 运行:- $ ./mp_queue.py 10
- duration 0:01:07.505177
- data size 3906 MiB
复制代码 |
|