免费注册 查看新帖 |

Chinaunix

  平台 论坛 博客 文库
最近访问板块 发新帖
查看: 4094 | 回复: 0
打印 上一主题 下一主题

python多线程读取MySQL执行效率优化 [复制链接]

论坛徽章:
0
跳转到指定楼层
1 [收藏(0)] [报告]
发表于 2013-03-20 09:03 |只看该作者 |倒序浏览
本帖最后由 leoxqing 于 2013-03-20 09:43 编辑

[root@yetest01 app]# vi db.py
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import threading
import MySQLdb as mdb
import os,sys,math,time
import Queue
reload(sys)   
sys.setdefaultencoding('utf-8')
appdir=os.getcwd()
logdir=appdir+'/log'
if not os.path.exists(logdir):
    os.mkdir(logdir)


def writelog(filename,message):
    file=logdir+'/'+filename
    f=open(file,"a")
    f.write(message)
    f.close

def maxdata():
    try:
        conn=mdb.connect(host='localhost',user='root',passwd='',db='roomfamily',charset='utf8',port=3306)
        cur=conn.cursor()
        cur.execute("select max(userid) as maxid from roomfamily")
        data=cur.fetchone()
    finally:
        conn.close()
        return data[0]

maxiddata=maxdata()
minid=0
maxid=0
que=Queue.Queue()


def serial(id):
    global minid,maxid
    if minid < id:
       maxid=minid+9999
    if maxid > id:
       maxid=id
    return minid,maxid

class runsql(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
    def run(self):
            global minid,maxid,mutex
            threadname = threading.currentThread().getName()
            mutex.acquire()
            #对数据加锁
            if maxid >= maxiddata:
               time.sleep(0.1)
            (minid,maxid)=serial(maxiddata)
            que.put((minid,maxid))
            if maxid < maxiddata :
                minid=maxid+1
            mutex.release()
            conn=mdb.connect(host='localhost',user='root',passwd='',db='roomfamily',charset='utf8',port=3306)
            if not que.empty():
                (mid,aid)=que.get()
                #conn=mdb.connect(host='localhost',user='root',passwd='',db='roomfamily',charset='utf8',port=3306)
                cur=conn.cursor()
                cur.execute("select UserID,concat_ws('|',UserID,FamilyUserID,UserName,FamilyUserName,RelationTypeID,SecrecyType,CreateDate,UserType,UserCode,BatchNum,ClassID) as UserID2 from roomfamily where UserID between %d and %d"%(mid,aid))
                print threadname+" affect rows:%d:%s~%s"%(cur.rowcount,mid,aid)
                data=cur.fetchall();
                for num in data:
                   tablename="t_roomfamily_"+str(num[0]%200)
                   prefixname=str(num[0]%200)
                   writelog(tablename,num[1]+"\n")
                conn.close()
def main():
    global minid,maxid,mutex
    #### 定义循环序列,就是一个线程池     
    threads = []
    #### 定义运行所使用的线程数
    thread_lines =10
    start_line=0
    mutex = threading.Lock()
    for t in range(0,thread_lines):
        t=runsql()
        threads.append(t)
        start_line+=1
    for t in threads:
        t.start()
    while True:
      for num_line in xrange(0,thread_lines):
    #### 初始化当前线程的状态
         thread_status = False
    #### 初始化检查循环线程的开始值
         loop_line = 0
    #### 开始循环检查线程池中的线程状态
         while thread_status == False :
    #### 如果检查当前线程,如果线程停止,代表任务完成,则分配给此线程新任务,
    #### 如果检查当先线程正在运行,则开始检查下一个线程,直到分配完新任务。
    #### 如果线程池中线程全部在运行,则开始从头检查               
            if threads[loop_line].isAlive() == False:
                   threads[loop_line] = runsql()
                   threads[loop_line].start()
                   thread_status = True
            else:
                if loop_line >= thread_lines-1 :
                   loop_line=0
                else:
                   loop_line+=1
      if maxid >= maxiddata:
          break
    for number_line in xrange(start_line,thread_lines):
        thread[number_line].exit()

if __name__ == '__main__':
   main()

执行结果:
.........以上线程省略
Thread-18486 affect rows:12635:184850000~184859999
Thread-18487 affect rows:12630:184860000~184869999
Thread-18488 affect rows:12865:184870000~184879999
Thread-18489 affect rows:782:184880000~184880596
Thread-18490 affect rows:782:184880000~184880596

4个疑问:
1.最后2个线程读取的数据重复了,不知道BUG在哪里?
2.我愿意是想开10个线程,然后复用老线程的,从结果看出是不段的在新增加线程,没有达到我想到的目的,该如何优化?
3.多线程里读取数据库的数据然后写文件,如果多个线程同时刻在写同一个文件,会引起阻塞现象吗?现在发现此脚本执行起来效率不如任意。
4.以上代码还有其他该优化的地方,请大家提出来?

本人初学PYTHON,有很多疑问,望各位大侠指点迷津!小女子跪拜了,谢谢!
您需要登录后才可以回帖 登录 | 注册

本版积分规则 发表回复

  

北京盛拓优讯信息技术有限公司. 版权所有 京ICP备16024965号-6 北京市公安局海淀分局网监中心备案编号:11010802020122 niuxiaotong@pcpop.com 17352615567
未成年举报专区
中国互联网协会会员  联系我们:huangweiwei@itpub.net
感谢所有关心和支持过ChinaUnix的朋友们 转载本站内容请注明原作者名及出处

清除 Cookies - ChinaUnix - Archiver - WAP - TOP