- 论坛徽章:
- 0
|
本帖最后由 leoxqing 于 2013-03-20 09:43 编辑
[root@yetest01 app]# vi db.py
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import threading
import MySQLdb as mdb
import os,sys,math,time
import Queue
reload(sys)
sys.setdefaultencoding('utf-8')
appdir=os.getcwd()
logdir=appdir+'/log'
if not os.path.exists(logdir):
os.mkdir(logdir)
def writelog(filename,message):
file=logdir+'/'+filename
f=open(file,"a")
f.write(message)
f.close
def maxdata():
try:
conn=mdb.connect(host='localhost',user='root',passwd='',db='roomfamily',charset='utf8',port=3306)
cur=conn.cursor()
cur.execute("select max(userid) as maxid from roomfamily")
data=cur.fetchone()
finally:
conn.close()
return data[0]
maxiddata=maxdata()
minid=0
maxid=0
que=Queue.Queue()
def serial(id):
global minid,maxid
if minid < id:
maxid=minid+9999
if maxid > id:
maxid=id
return minid,maxid
class runsql(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
global minid,maxid,mutex
threadname = threading.currentThread().getName()
mutex.acquire()
#对数据加锁
if maxid >= maxiddata:
time.sleep(0.1)
(minid,maxid)=serial(maxiddata)
que.put((minid,maxid))
if maxid < maxiddata :
minid=maxid+1
mutex.release()
conn=mdb.connect(host='localhost',user='root',passwd='',db='roomfamily',charset='utf8',port=3306)
if not que.empty():
(mid,aid)=que.get()
#conn=mdb.connect(host='localhost',user='root',passwd='',db='roomfamily',charset='utf8',port=3306)
cur=conn.cursor()
cur.execute("select UserID,concat_ws('|',UserID,FamilyUserID,UserName,FamilyUserName,RelationTypeID,SecrecyType,CreateDate,UserType,UserCode,BatchNum,ClassID) as UserID2 from roomfamily where UserID between %d and %d"%(mid,aid))
print threadname+" affect rows:%d:%s~%s"%(cur.rowcount,mid,aid)
data=cur.fetchall();
for num in data:
tablename="t_roomfamily_"+str(num[0]%200)
prefixname=str(num[0]%200)
writelog(tablename,num[1]+"\n")
conn.close()
def main():
global minid,maxid,mutex
#### 定义循环序列,就是一个线程池
threads = []
#### 定义运行所使用的线程数
thread_lines =10
start_line=0
mutex = threading.Lock()
for t in range(0,thread_lines):
t=runsql()
threads.append(t)
start_line+=1
for t in threads:
t.start()
while True:
for num_line in xrange(0,thread_lines):
#### 初始化当前线程的状态
thread_status = False
#### 初始化检查循环线程的开始值
loop_line = 0
#### 开始循环检查线程池中的线程状态
while thread_status == False :
#### 如果检查当前线程,如果线程停止,代表任务完成,则分配给此线程新任务,
#### 如果检查当先线程正在运行,则开始检查下一个线程,直到分配完新任务。
#### 如果线程池中线程全部在运行,则开始从头检查
if threads[loop_line].isAlive() == False:
threads[loop_line] = runsql()
threads[loop_line].start()
thread_status = True
else:
if loop_line >= thread_lines-1 :
loop_line=0
else:
loop_line+=1
if maxid >= maxiddata:
break
for number_line in xrange(start_line,thread_lines):
thread[number_line].exit()
if __name__ == '__main__':
main()
执行结果:
.........以上线程省略
Thread-18486 affect rows:12635:184850000~184859999
Thread-18487 affect rows:12630:184860000~184869999
Thread-18488 affect rows:12865:184870000~184879999
Thread-18489 affect rows:782:184880000~184880596
Thread-18490 affect rows:782:184880000~184880596
4个疑问:
1.最后2个线程读取的数据重复了,不知道BUG在哪里?
2.我愿意是想开10个线程,然后复用老线程的,从结果看出是不段的在新增加线程,没有达到我想到的目的,该如何优化?
3.多线程里读取数据库的数据然后写文件,如果多个线程同时刻在写同一个文件,会引起阻塞现象吗?现在发现此脚本执行起来效率不如任意。
4.以上代码还有其他该优化的地方,请大家提出来?
本人初学PYTHON,有很多疑问,望各位大侠指点迷津!小女子跪拜了,谢谢!
|
|