- 论坛徽章:
- 0
|
我处理日志用python 多线程,碰到的问题如下
单个文件 2G,单线程,读取后处理,写入另一个文件后 200MB,用时:20s
5个文件 10G,多线程,读取后处理,写入5个文件(同样也是每个文件200MB左右),用时大概2分钟
给我的感觉像是串行,和我预期(5个文件处理20s)相差太大,请问这是怎么回事?
#!/usr/bin/python
# encoding=gb18030
import sys
import re
import os
reload(sys)
sys.setdefaultencoding('gb18030')
import codecs
import threading
import logging
import time
allFileNum = 0
logger = logging.getLogger("myLogger")
logger.setLevel(logging.DEBUG)
fh = logging.FileHandler('extact.log')
fh.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
formater = logging.Formatter('%(asctime)s - %(message)s')
fh.setFormatter(formater)
ch.setFormatter(formater)
logger.addHandler(fh)
logger.addHandler(ch)
class myThread(threading.Thread):
def __init__(self, threading_sum, file_path, target_city, target_dir):
threading.Thread.__init__(self)
self.threading_sum = threading_sum
self.target_city = target_city
self.file_path = file_path
self.target_dir = target_dir
def run(self):
with self.threading_sum:
logger.info("%s Starting!" % self.file_path)
time.sleep(1)
target_data(self.target_dir, self.file_path, self.target_city)
logger.info("%s Exiting!" % self.file_path)
def get_files(path):
global allFileNum
'''
打印一个目录下的所有文件夹和文件
'''
# 所有文件夹,第一个字段是次目录的级别
dirList = []
# 所有文件
fileList = []
# 返回一个列表,其中包含在目录条目的名称(google翻译)
files = os.listdir(path)
for f in files:
if (os.path.isdir(path + '/' + f)):
# 排除隐藏文件夹。因为隐藏文件夹过多
if (f[0] == '.'):
pass
else:
# 添加非隐藏文件夹
dirList.append(f)
if (os.path.isfile(path + '/' + f)):
# 添加文件
fileList.append(f)
# 当一个标志使用,文件夹列表第一个级别不打印
i_dl = 0
for dl in dirList:
if (i_dl == 0):
i_dl = i_dl + 1
return fileList
for fl in fileList:
# 打印文件
print '-' * (int(dirList[0])), fl
# 随便计算一下有多少个文件
allFileNum = allFileNum + 1
def extract(path, target_city):
target_dir = path + target_city
if os.path.isdir(target_dir):
pass
else:
os.makedirs(target_dir)
fileList = get_files(path)
# 设置线程数
threading_sum = threading.Semaphore(5)
for file in fileList:
filePath = path + file
if not os.path.isfile(filePath):
logger.error("the path is not file")
continue
thread = myThread(threading_sum, filePath, target_city, target_dir)
thread.start()
logger.info("All thread has create,Wait for all thread exit.")
# 等待所有线程结束
for t in threading.enumerate():
if t is threading.currentThread():
continue
t.join()
logger.info("All thread exit")
def target_data(target_dir, file_path, target_city):
logger.info(target_dir)
target_file_path = target_dir + "//" + os.path.basename(file_path) + "_" + target_city
regex = re.compile(r'service.*what=.*city=(.*)')
file = codecs.open(file_path, 'r', 'gb18030')
searchWordfile = codecs.open(target_file_path, 'w', 'gb18030')
while 1:
lines = file.readlines(100000)
if len(lines) < 1:
break
for line in lines:
searchObj = regex.search(line)
if searchObj:
city = searchObj.group(1)
if target_city in city:
searchWordfile.write(line)
def main():
# if len(sys.argv) < 2:
# print 'No action specified.'
# sys.exit()
# extract(path=sys.argv[1], target_city=sys.argv[2])
extract(path="D://log1//", target_city="北京")
logger.info("extract finished")
sys.exit()
if __name__ == '__main__':
main()
|
|