- 论坛徽章:
- 0
|
mysql 读写分离在面对一定的数据库压力下,的确有一定的帮助,读写分离方面的程序似乎也很多,mysql 官方也有个读写分离产品叫
MySQL Proxy 。 按照我理解,似乎实现基本的读写分离也不是很难。所以我按照自己的想法写了一个。目前在并发600的情况下是成功的。并发超过600后就有收割僵尸进程不完成的情况,导致进程假死状态。这个目前有个解决想法,就是把收割程序独立出来用一个进程来专门收割,这样可以防止,收割程序在并发大阻塞下响应不及时问题。~_~ 只是个想法。。
我的BLOG: blog.csdn.net/huithe 欢迎大家踩踩
下载地址:我放phchina.com上了。 CSDN不知道咋传的郁闷
http://bbs.phpchina.com/viewthread.php?tid=173892
目前流程:
php客户端 --> angel.py -- ---读写分离:读均衡 ---> mysql
angel mysql proxy 在并发模型上选择了 forking 。在均衡读上的算法目前只是随机,因为开发当时遇到个问题就是,如何把子进程使用
mysql 的状态数给保存下来传给主进程,然后主进程按照目前的各mysql 连接数均衡分配客户端读。 那会第一反应就是使用 file 来进行 进程间共享。但存在个锁问题。担心效率受损。 进而想到了 共享内存。不过找不到合适的py库。 今天才确定pipe 应该能解决这个问题 ~_~ 目前也只是个想法。。
在收割上,循环收割和信号收割。经过测试最后选择了信号收割,在并发下,似乎收割情况更好点
先看下配置文件,我直接使用了PY文件了。贪图方便
view plaincopy to clipboardprint?
#!/usr/bin/python
# -*- coding:utf-8 -*-
#读mysql
rd = {'host':'127.0.0.1',
'user':'root',
'passwd':'1',
'database':'beihai365'}
#写mysql
wt = {'host':'127.0.0.1',
'user':'root',
'passwd':'1',
'database':'beihai365'}
read_mysql_server = (rd,wt)
write_mysql_server = wt
charset = 'gbk'
error_log = '/var/log/angel.log'
#!/usr/bin/python
# -*- coding:utf-8 -*-
#读mysql
rd = {'host':'127.0.0.1',
'user':'root',
'passwd':'1',
'database':'beihai365'}
#写mysql
wt = {'host':'127.0.0.1',
'user':'root',
'passwd':'1',
'database':'beihai365'}
read_mysql_server = (rd,wt)
write_mysql_server = wt
charset = 'gbk'
error_log = '/var/log/angel.log'
定义日志:默认日志是 angel.log 里面主要记录了
查询mysql 的出错信息等。可以扩展
采集部分:
You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'ELECT * FROM salary' at line 1
You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'ELECT * FROM salary' at line 1
一看大概就明白了吧, 两台mysql 一台是读,一台是写,不过写的当然我们也可以读,那么等于是 read 两台 write 一台,当然按照配置文件可以放更多的read 服务器,写 mysql 目前我只允许一台,实现多台写也是比较简单的。
目前看这个配置,我只是是使用同一台mysql。。等改天有环境了再真的分离开吧。不过目前我在程序上加了调试,可以看得出,两台mysql轮流服务的效果。
目前查询的语句是这样的:select * from cdb_posts limit 20
目前这个表的数据情况是这样的:
beihai365# mysqlshow -u root -p beihai365 cdb_posts --count
Enter password:
Database: beihai365 Wildcard: cdb_posts
+-----------+----------+------------+
| Tables | Columns | Total Rows |
+-----------+----------+------------+
| cdb_posts | 21 | 11922687 |
+-----------+----------+------------+
1 row in set.
测试我使用了 webbench
1.运行 angel.py ,这个是 读写分离的服务端,只有一个文件。
beihai365# python angel.py
2. 开始运行 客户端脚本,我用php简单的封装了一个使用 angel.py 的类。 有了这个,我们就可以把angel mysql proxy 融入 PHP开发的项目中了。
angel.py 返回给客户端的是: 串行化的 array 。 这样PHP操作就方便了
beihai365# webbench -c 300 http://192.168.18.2/angel_proxy.php
angel.py 服务端在并发下显示(我在程序中加入的调试信息):
mysqld is 1 (使用了第二台mysql)
mysqld is 0 (使用了第一台mysql)
complete its work (子进程完成了查询任务)
reap child sucess 15258(收割了15258进程)
reap child sucess 15257...
complete its work..
complete its work...
reap child sucess 15261...
reap child sucess 15260..
webbench 显示:
Webbench - Simple Web Benchmark 1.5
Copyright (c) Radim Kolar 1997-2004, GPL Open Source Software.
Benchmarking: GET http://192.168.18.2/angel_proxy.php
300 clients, running 30 sec.
Speed=43020 pages/min, 2738034 bytes/sec.
Requests: 21510 susceed, 0 failed.
OK。没有比较就不知道这样的结果是否是满意的。那么我就去掉angel.py 这个服务,直接使用php来对mysql 进行读操作。然后看下效率如何
beihai365# webbench -c 300 http://192.168.18.2/test.php
Webbench - Simple Web Benchmark 1.5
Copyright (c) Radim Kolar 1997-2004, GPL Open Source Software.
Benchmarking: GET http://192.168.18.2/test.php
300 clients, running 30 sec.
Speed=47556 pages/min, 27228021 bytes/sec.
Requests: 23778 susceed, 0 failed.
相差不多。但别忘了。我这可是读写分离了,在真的读写分离情况下,angel.py 性能应该能提高很多。呵呵 这个以后搭建环境后测试,在公布结果。
贴代码:
angel.py
view plaincopy to clipboardprint?
#!/usr/local/bin/python
# -*- coding:utf-8 -*-
import socket,os,sys,time,MySQLdb,traceback,string,random,fcntl,signal
from angel_conf import *
#定义读写语句
read_sql_conf = ('select')
write_sql_conf = ('delete','update','alter')
mysql_read_switch = mysql_write_switch = 0
cluster = {}
def pre_match_sql(data):
return string.lower(data[:data.index(' ')].strip())
def seekreadmysql():
random.seed()
randmysqld = random.choice([i for i in range(len(read_mysql_server))])
print 'mysqld is %s' % randmysqld
return read_mysql_server[randmysqld]
def seekwritemysql():
return write_mysql_server
def strlen(strs):
return len(str(strs))
def reap(signum,stackframe):
'''''reap subfork'''
while True:
try:
result = os.waitpid(-1,os.WNOHANG)
if not result[0]: break
print 'reap child sucess %d' % result[0]
except:
break
signal.signal(signal.SIGCHLD,reap)
def serialize(rows):
sqltostr = '';i = 0;j = 1
for row in rows:
if j == 1:
bracket = ''
else:
bracket = '}'
sqltostr += '%si:%d;a:%d:{' % (bracket,j,len(row))
j += 1
for key in row:
sqltostr += 's:%d:"%s";s:%d:"%s";' % (strlen(key),key,strlen(row[key]),row[key])
i += 1
heads = 'a:%d:{' % int(j-1)
sqltostr = heads + sqltostr + '}}'
return sqltostr
class nettpl_content:
def __init__(self,clientsk):
self._clientsk = clientsk
def myhandler(self,rcmysql):
action = pre_match_sql(rcmysql)
if not len(action):
sys.exit(4)
#allow read
global mysql_read_switch;
global mysql_write_switch;
global cluster
if action in read_sql_conf:
try:
if not mysql_read_switch:
rl = seekreadmysql()
cluster['readconn'] = MySQLdb.connect(host=rl['host'],user=rl['user'],passwd=rl['passwd'],db=rl['database'])
mysql_read_switch = 1
except MySQLdb.Error:
print 'mysql connect error'
sys.exit(1)
try:
rdcursor = cluster['readconn'].cursor(MySQLdb.cursors.DictCursor)
rdcursor.execute("SET NAMES %s" % charset)
rdcursor.execute(rcmysql)
rows = rdcursor.fetchall()
except MySQLdb.Error,e:
errormsg = str(e.args[1])
self._clientsk.sendall(errormsg)
self.halt(rmsg)
sys.exit(1)
sqltostr = serialize(rows)
self._clientsk.sendall(sqltostr)
#self._clientsk.close()
rdcursor.close()
else:
try:
if not mysql_write_switch:
wl = seekwritemysql()
cluster['writecon'] = MySQLdb.connect(host=wl['host'],user=wl['user'],passwd=wl['passwd'],db=wl['database'])
mysql_write_switch = 1
except MySQLdb.Error:
print 'mysql connect error'
sys.exit(1)
print rcmysql
wrcursor = cluster['writecon'].cursor()
wrcursor.execute("SET NAMES %s" % charset)
wrcursor.execute(rcmysql)
wrcursor.close()
def halt(self,errorcontent):
hd = open(error_log,'a+')
fcntl.flock(hd,fcntl.LOCK_EX)
hd.write('%s\t%s%s' % (errorcontent,time.ctime(),os.linesep))
fcntl.flock(hd,fcntl.LOCK_UN)
hd.close()
class nettpl:
'''''forking network model'''
def __init__(self,host,port):
self.__host = host
self.__port = port
def server(self):
sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
sk.bind((self.__host,self.__port))
sk.listen(1)
global mysql_switch
while True:
try:
clientsk,clientaddr = sk.accept()
except socket.error:
continue
except KeyboardInterrupt:
print "[yhm]angel stop"
sys.exit(4)
signal.signal(signal.SIGCHLD,reap)
father = os.fork()
if father:
clientsk.close()
continue
else:
sk.close()
try:
while True:
clientdata = clientsk.recv(4096)
if not len(clientdata):break
#callback class
myhandler = nettpl_content(clientsk)
myhandler.myhandler(clientdata)
#except:
#traceback.print_exc()
finally:
print 'complete its work'
sys.exit(1)
network = nettpl('localhost',9009)#端口默认定义是9009
network.server()
#!/usr/local/bin/python
# -*- coding:utf-8 -*-
import socket,os,sys,time,MySQLdb,traceback,string,random,fcntl,signal
from angel_conf import *
#定义读写语句
read_sql_conf = ('select')
write_sql_conf = ('delete','update','alter')
mysql_read_switch = mysql_write_switch = 0
cluster = {}
def pre_match_sql(data):
return string.lower(data[:data.index(' ')].strip())
def seekreadmysql():
random.seed()
randmysqld = random.choice([i for i in range(len(read_mysql_server))])
print 'mysqld is %s' % randmysqld
return read_mysql_server[randmysqld]
def seekwritemysql():
return write_mysql_server
def strlen(strs):
return len(str(strs))
def reap(signum,stackframe):
'''reap subfork'''
while True:
try:
result = os.waitpid(-1,os.WNOHANG)
if not result[0]: break
print 'reap child sucess %d' % result[0]
except:
break
signal.signal(signal.SIGCHLD,reap)
def serialize(rows):
sqltostr = '';i = 0;j = 1
for row in rows:
if j == 1:
bracket = ''
else:
bracket = '}'
sqltostr += '%si:%d;a:%d:{' % (bracket,j,len(row))
j += 1
for key in row:
sqltostr += 's:%d:"%s";s:%d:"%s";' % (strlen(key),key,strlen(row[key]),row[key])
i += 1
heads = 'a:%d:{' % int(j-1)
sqltostr = heads + sqltostr + '}}'
return sqltostr
class nettpl_content:
def __init__(self,clientsk):
self._clientsk = clientsk
def myhandler(self,rcmysql):
action = pre_match_sql(rcmysql)
if not len(action):
sys.exit(4)
#allow read
global mysql_read_switch;
global mysql_write_switch;
global cluster
if action in read_sql_conf:
try:
if not mysql_read_switch:
rl = seekreadmysql()
cluster['readconn'] = MySQLdb.connect(host=rl['host'],user=rl['user'],passwd=rl['passwd'],db=rl['database'])
mysql_read_switch = 1
except MySQLdb.Error:
print 'mysql connect error'
sys.exit(1)
try:
rdcursor = cluster['readconn'].cursor(MySQLdb.cursors.DictCursor)
rdcursor.execute("SET NAMES %s" % charset)
rdcursor.execute(rcmysql)
rows = rdcursor.fetchall()
except MySQLdb.Error,e:
errormsg = str(e.args[1])
self._clientsk.sendall(errormsg)
self.halt(rmsg)
sys.exit(1)
sqltostr = serialize(rows)
self._clientsk.sendall(sqltostr)
#self._clientsk.close()
rdcursor.close()
else:
try:
if not mysql_write_switch:
wl = seekwritemysql()
cluster['writecon'] = MySQLdb.connect(host=wl['host'],user=wl['user'],passwd=wl['passwd'],db=wl['database'])
mysql_write_switch = 1
except MySQLdb.Error:
print 'mysql connect error'
sys.exit(1)
print rcmysql
wrcursor = cluster['writecon'].cursor()
wrcursor.execute("SET NAMES %s" % charset)
wrcursor.execute(rcmysql)
wrcursor.close()
def halt(self,errorcontent):
hd = open(error_log,'a+')
fcntl.flock(hd,fcntl.LOCK_EX)
hd.write('%s\t%s%s' % (errorcontent,time.ctime(),os.linesep))
fcntl.flock(hd,fcntl.LOCK_UN)
hd.close()
class nettpl:
'''forking network model'''
def __init__(self,host,port):
self.__host = host
self.__port = port
def server(self):
sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
sk.bind((self.__host,self.__port))
sk.listen(1)
global mysql_switch
while True:
try:
clientsk,clientaddr = sk.accept()
except socket.error:
continue
except KeyboardInterrupt:
print "[yhm]angel stop"
sys.exit(4)
signal.signal(signal.SIGCHLD,reap)
father = os.fork()
if father:
clientsk.close()
continue
else:
sk.close()
try:
while True:
clientdata = clientsk.recv(4096)
if not len(clientdata):break
#callback class
myhandler = nettpl_content(clientsk)
myhandler.myhandler(clientdata)
#except:
#traceback.print_exc()
finally:
print 'complete its work'
sys.exit(1)
network = nettpl('localhost',9009)#端口默认定义是9009
network.server()
php封装好的客户端:
view plaincopy to clipboardprint?
<?php
class angel{
const SEND_FLTAG = MSG_DONTROUTE;
private $socket;
public function connect($host,$port){
$this->socket = socket_create(AF_INET,SOCK_STREAM,SOL_TCP);
socket_connect($this->socket,$host,$port);
}
public function query($sql){
$sqldata = '';
$sqlreturnlen = 999999999;
socket_send($this->socket,$sql,strlen($sql),MSG_WAITALL);
$data = socket_read($this->socket,$sqlreturnlen);
$sqldata .= $data;
return $sqldata;
}
public function fetch_array($query){
if(($unquery = @unserialize($query))){
return $unquery;
}else{
exit($query);
}
}
public function close(){
socket_close($this->socket);
}
}
$sql = "select * from cdb_posts limit 20";
$angel = new angel();
$angel->connect('127.0.0.1',9009);
print_r($angel->fetch_array($angel->query($sql)));
print_r($angel->fetch_array($angel->query($sql)));
print_r($angel->fetch_array($angel->query($sql)));
print_r($angel->fetch_array($angel->query($sql)));
print_r($angel->fetch_array($angel->query($sql)));
print_r($angel->fetch_array($angel->query($sql)));
print_r($angel->fetch_array($angel->query($sql)));
?>
<?php
class angel{
const SEND_FLTAG = MSG_DONTROUTE;
private $socket;
public function connect($host,$port){
$this->socket = socket_create(AF_INET,SOCK_STREAM,SOL_TCP);
socket_connect($this->socket,$host,$port);
}
public function query($sql){
$sqldata = '';
$sqlreturnlen = 999999999;
socket_send($this->socket,$sql,strlen($sql),MSG_WAITALL);
$data = socket_read($this->socket,$sqlreturnlen);
$sqldata .= $data;
return $sqldata;
}
public function fetch_array($query){
if(($unquery = @unserialize($query))){
return $unquery;
}else{
exit($query);
}
}
public function close(){
socket_close($this->socket);
}
}
$sql = "select * from cdb_posts limit 20";
$angel = new angel();
$angel->connect('127.0.0.1',9009);
print_r($angel->fetch_array($angel->query($sql)));
print_r($angel->fetch_array($angel->query($sql)));
print_r($angel->fetch_array($angel->query($sql)));
print_r($angel->fetch_array($angel->query($sql)));
print_r($angel->fetch_array($angel->query($sql)));
print_r($angel->fetch_array($angel->query($sql)));
print_r($angel->fetch_array($angel->query($sql)));
?>
php客户端返回的结果是数组,PHP使用十分方便:
view plaincopy to clipboardprint?
PHP客户端返回的结果:
Array ( [1] => Array ( [status] => 0 [parseurloff] => 0 [htmlon] => 0 [author] => 大海在呼唤 [attachment] => 0 [dateline] => 1052838851 [useip] => 219.159.158.122 [pid] => 1482 [rate] => 0
本文来自CSDN博客,转载请标明出处:http://blog.csdn.net/huithe/archive/2010/01/30/5273086.aspx |
|