免费注册 查看新帖 |

Chinaunix

  平台 论坛 博客 文库
最近访问板块 发新帖
查看: 4088 | 回复: 1
打印 上一主题 下一主题

最近写的一个mysql读写分离的,python 小工具:angel mysql proxy [复制链接]

论坛徽章:
0
跳转到指定楼层
1 [收藏(0)] [报告]
发表于 2010-02-01 09:43 |只看该作者 |倒序浏览
mysql 读写分离在面对一定的数据库压力下,的确有一定的帮助,读写分离方面的程序似乎也很多,mysql 官方也有个读写分离产品叫

MySQL Proxy 。  按照我理解,似乎实现基本的读写分离也不是很难。所以我按照自己的想法写了一个。目前在并发600的情况下是成功的。并发超过600后就有收割僵尸进程不完成的情况,导致进程假死状态。这个目前有个解决想法,就是把收割程序独立出来用一个进程来专门收割,这样可以防止,收割程序在并发大阻塞下响应不及时问题。~_~ 只是个想法。。

我的BLOG: blog.csdn.net/huithe  欢迎大家踩踩

下载地址:我放phchina.com上了。 CSDN不知道咋传的郁闷


http://bbs.phpchina.com/viewthread.php?tid=173892

目前流程:

php客户端  -->   angel.py -- ---读写分离:读均衡 --->  mysql

angel mysql proxy 在并发模型上选择了 forking 。在均衡读上的算法目前只是随机,因为开发当时遇到个问题就是,如何把子进程使用

mysql 的状态数给保存下来传给主进程,然后主进程按照目前的各mysql 连接数均衡分配客户端读。 那会第一反应就是使用 file 来进行 进程间共享。但存在个锁问题。担心效率受损。 进而想到了 共享内存。不过找不到合适的py库。 今天才确定pipe 应该能解决这个问题 ~_~ 目前也只是个想法。。

在收割上,循环收割和信号收割。经过测试最后选择了信号收割,在并发下,似乎收割情况更好点

先看下配置文件,我直接使用了PY文件了。贪图方便

view plaincopy to clipboardprint?
#!/usr/bin/python   
# -*- coding:utf-8 -*-   
#读mysql   
rd = {'host':'127.0.0.1',   
      'user':'root',   
      'passwd':'1',   
      'database':'beihai365'}   
#写mysql   
wt = {'host':'127.0.0.1',   
      'user':'root',   
      'passwd':'1',   
      'database':'beihai365'}   
  
read_mysql_server     = (rd,wt)   
write_mysql_server    = wt   
  
charset = 'gbk'  
error_log = '/var/log/angel.log'  
#!/usr/bin/python
# -*- coding:utf-8 -*-
#读mysql
rd = {'host':'127.0.0.1',
      'user':'root',
      'passwd':'1',
      'database':'beihai365'}
#写mysql
wt = {'host':'127.0.0.1',
      'user':'root',
      'passwd':'1',
      'database':'beihai365'}

read_mysql_server     = (rd,wt)
write_mysql_server    = wt

charset = 'gbk'
error_log = '/var/log/angel.log'


定义日志:默认日志是 angel.log   里面主要记录了

查询mysql 的出错信息等。可以扩展

采集部分:

You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'ELECT * FROM salary' at line 1
You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'ELECT * FROM salary' at line 1

一看大概就明白了吧, 两台mysql 一台是读,一台是写,不过写的当然我们也可以读,那么等于是 read 两台  write 一台,当然按照配置文件可以放更多的read 服务器,写 mysql 目前我只允许一台,实现多台写也是比较简单的。

目前看这个配置,我只是是使用同一台mysql。。等改天有环境了再真的分离开吧。不过目前我在程序上加了调试,可以看得出,两台mysql轮流服务的效果。

目前查询的语句是这样的:select * from cdb_posts limit 20

目前这个表的数据情况是这样的:

beihai365# mysqlshow -u root -p beihai365 cdb_posts --count
Enter password:
Database: beihai365  Wildcard: cdb_posts
+-----------+----------+------------+
|  Tables   | Columns  | Total Rows |
+-----------+----------+------------+
| cdb_posts |       21 |   11922687 |
+-----------+----------+------------+
1 row in set.

测试我使用了 webbench

1.运行 angel.py ,这个是 读写分离的服务端,只有一个文件。

beihai365# python angel.py

2. 开始运行 客户端脚本,我用php简单的封装了一个使用 angel.py 的类。 有了这个,我们就可以把angel mysql proxy 融入 PHP开发的项目中了。

angel.py 返回给客户端的是: 串行化的 array  。 这样PHP操作就方便了

beihai365# webbench -c 300 http://192.168.18.2/angel_proxy.php

angel.py 服务端在并发下显示(我在程序中加入的调试信息):

mysqld is 1  (使用了第二台mysql)
mysqld is 0 (使用了第一台mysql)
complete its work (子进程完成了查询任务)
reap child sucess 15258(收割了15258进程)
reap child sucess 15257...
complete its work..
complete its work...
reap child sucess 15261...
reap child sucess 15260..


webbench 显示:

Webbench - Simple Web Benchmark 1.5
Copyright (c) Radim Kolar 1997-2004, GPL Open Source Software.

Benchmarking: GET http://192.168.18.2/angel_proxy.php
300 clients, running 30 sec.

Speed=43020 pages/min, 2738034 bytes/sec.
Requests: 21510 susceed, 0 failed.


OK。没有比较就不知道这样的结果是否是满意的。那么我就去掉angel.py 这个服务,直接使用php来对mysql 进行读操作。然后看下效率如何

beihai365# webbench -c 300 http://192.168.18.2/test.php
Webbench - Simple Web Benchmark 1.5
Copyright (c) Radim Kolar 1997-2004, GPL Open Source Software.

Benchmarking: GET http://192.168.18.2/test.php
300 clients, running 30 sec.

Speed=47556 pages/min, 27228021 bytes/sec.
Requests: 23778 susceed, 0 failed.

相差不多。但别忘了。我这可是读写分离了,在真的读写分离情况下,angel.py 性能应该能提高很多。呵呵 这个以后搭建环境后测试,在公布结果。

贴代码:

angel.py

view plaincopy to clipboardprint?
#!/usr/local/bin/python   
# -*- coding:utf-8 -*-   
  
import socket,os,sys,time,MySQLdb,traceback,string,random,fcntl,signal   
from angel_conf import *   
  
#定义读写语句   
read_sql_conf   =   ('select')   
write_sql_conf  =   ('delete','update','alter')   
mysql_read_switch = mysql_write_switch = 0  
cluster = {}   
def pre_match_sql(data):   
    return string.lower(data[:data.index(' ')].strip())   
  
def seekreadmysql():   
    random.seed()   
    randmysqld = random.choice([i for i in range(len(read_mysql_server))])   
    print 'mysqld is %s' % randmysqld   
    return read_mysql_server[randmysqld]   
  
def seekwritemysql():   
    return write_mysql_server   
  
def strlen(strs):   
    return len(str(strs))   
  
def reap(signum,stackframe):   
    '''''reap subfork'''  
    while True:   
        try:   
            result = os.waitpid(-1,os.WNOHANG)   
            if not result[0]: break  
            print 'reap child sucess %d' % result[0]   
        except:   
            break  
    signal.signal(signal.SIGCHLD,reap)   
  
def serialize(rows):   
    sqltostr = '';i = 0;j = 1  
    for row in rows:   
        if j == 1:   
            bracket = ''  
        else:   
            bracket = '}'  
        sqltostr += '%si:%d;a:%d:{' % (bracket,j,len(row))   
        j += 1  
        for key in row:   
            sqltostr += 's:%d:"%s";s:%d:"%s";' % (strlen(key),key,strlen(row[key]),row[key])   
            i += 1  
    heads = 'a:%d:{' % int(j-1)   
    sqltostr = heads + sqltostr + '}}'  
    return sqltostr   
  
class nettpl_content:   
    def __init__(self,clientsk):   
        self._clientsk = clientsk   
           
    def myhandler(self,rcmysql):   
        action = pre_match_sql(rcmysql)   
        if not len(action):   
            sys.exit(4)   
        #allow read   
        global mysql_read_switch;   
        global mysql_write_switch;   
        global cluster   
        if action in read_sql_conf:   
            try:   
                if not mysql_read_switch:   
                    rl = seekreadmysql()   
                    cluster['readconn'] = MySQLdb.connect(host=rl['host'],user=rl['user'],passwd=rl['passwd'],db=rl['database'])   
                    mysql_read_switch = 1  
            except MySQLdb.Error:   
                print 'mysql connect error'  
                sys.exit(1)   
  
            try:   
                rdcursor = cluster['readconn'].cursor(MySQLdb.cursors.DictCursor)   
                rdcursor.execute("SET NAMES %s" % charset)   
                rdcursor.execute(rcmysql)   
                rows = rdcursor.fetchall()   
            except MySQLdb.Error,e:   
                errormsg = str(e.args[1])   
                self._clientsk.sendall(errormsg)   
                self.halt(rmsg)   
                sys.exit(1)   
                  
            sqltostr = serialize(rows)   
            self._clientsk.sendall(sqltostr)   
            #self._clientsk.close()   
            rdcursor.close()   
        else:   
            try:   
                if not mysql_write_switch:   
                    wl = seekwritemysql()   
                    cluster['writecon'] = MySQLdb.connect(host=wl['host'],user=wl['user'],passwd=wl['passwd'],db=wl['database'])   
                    mysql_write_switch = 1  
            except MySQLdb.Error:   
                print 'mysql connect error'  
                sys.exit(1)   
            print rcmysql   
            wrcursor = cluster['writecon'].cursor()   
            wrcursor.execute("SET NAMES %s" % charset)   
            wrcursor.execute(rcmysql)   
            wrcursor.close()   
      
    def halt(self,errorcontent):   
        hd = open(error_log,'a+')   
        fcntl.flock(hd,fcntl.LOCK_EX)   
        hd.write('%s\t%s%s' % (errorcontent,time.ctime(),os.linesep))   
        fcntl.flock(hd,fcntl.LOCK_UN)   
        hd.close()   
  
class nettpl:   
    '''''forking network model'''  
    def __init__(self,host,port):   
        self.__host = host   
        self.__port = port   
      
    def server(self):   
        sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM)   
        sk.bind((self.__host,self.__port))   
        sk.listen(1)   
        global mysql_switch   
           
        while True:   
            try:      
                clientsk,clientaddr = sk.accept()   
            except socket.error:   
                continue  
            except KeyboardInterrupt:   
                print "[yhm]angel stop"  
                sys.exit(4)   
            signal.signal(signal.SIGCHLD,reap)   
               
            father = os.fork()   
            if father:   
                clientsk.close()   
                continue  
            else:   
                sk.close()   
                try:   
                    while True:   
                        clientdata = clientsk.recv(4096)   
                        if not len(clientdata):break  
                        #callback class   
  
                        myhandler = nettpl_content(clientsk)   
                        myhandler.myhandler(clientdata)   
                #except:   
                    #traceback.print_exc()   
                finally:   
                    print 'complete its work'  
                    sys.exit(1)   
                       
network = nettpl('localhost',9009)#端口默认定义是9009   
network.server()  
#!/usr/local/bin/python
# -*- coding:utf-8 -*-

import socket,os,sys,time,MySQLdb,traceback,string,random,fcntl,signal
from angel_conf import *

#定义读写语句
read_sql_conf   =   ('select')
write_sql_conf  =   ('delete','update','alter')
mysql_read_switch = mysql_write_switch = 0
cluster = {}
def pre_match_sql(data):
    return string.lower(data[:data.index(' ')].strip())

def seekreadmysql():
    random.seed()
    randmysqld = random.choice([i for i in range(len(read_mysql_server))])
    print 'mysqld is %s' % randmysqld
    return read_mysql_server[randmysqld]

def seekwritemysql():
    return write_mysql_server

def strlen(strs):
    return len(str(strs))

def reap(signum,stackframe):
    '''reap subfork'''
    while True:
        try:
            result = os.waitpid(-1,os.WNOHANG)
            if not result[0]: break
            print 'reap child sucess %d' % result[0]
        except:
            break
    signal.signal(signal.SIGCHLD,reap)

def serialize(rows):
    sqltostr = '';i = 0;j = 1
    for row in rows:
        if j == 1:
            bracket = ''
        else:
            bracket = '}'
        sqltostr += '%si:%d;a:%d:{' % (bracket,j,len(row))
        j += 1
        for key in row:
            sqltostr += 's:%d:"%s";s:%d:"%s";' % (strlen(key),key,strlen(row[key]),row[key])
            i += 1
    heads = 'a:%d:{' % int(j-1)
    sqltostr = heads + sqltostr + '}}'
    return sqltostr

class nettpl_content:
    def __init__(self,clientsk):
        self._clientsk = clientsk
        
    def myhandler(self,rcmysql):
        action = pre_match_sql(rcmysql)
        if not len(action):
            sys.exit(4)
        #allow read
        global mysql_read_switch;
        global mysql_write_switch;
        global cluster
        if action in read_sql_conf:
            try:
                if not mysql_read_switch:
                    rl = seekreadmysql()
                    cluster['readconn'] = MySQLdb.connect(host=rl['host'],user=rl['user'],passwd=rl['passwd'],db=rl['database'])
                    mysql_read_switch = 1
            except MySQLdb.Error:
                print 'mysql connect error'
                sys.exit(1)

            try:
                rdcursor = cluster['readconn'].cursor(MySQLdb.cursors.DictCursor)
                rdcursor.execute("SET NAMES %s" % charset)
                rdcursor.execute(rcmysql)
                rows = rdcursor.fetchall()
            except MySQLdb.Error,e:
                errormsg = str(e.args[1])
                self._clientsk.sendall(errormsg)
                self.halt(rmsg)
                sys.exit(1)
               
            sqltostr = serialize(rows)
            self._clientsk.sendall(sqltostr)
            #self._clientsk.close()
            rdcursor.close()
        else:
            try:
                if not mysql_write_switch:
                    wl = seekwritemysql()
                    cluster['writecon'] = MySQLdb.connect(host=wl['host'],user=wl['user'],passwd=wl['passwd'],db=wl['database'])
                    mysql_write_switch = 1
            except MySQLdb.Error:
                print 'mysql connect error'
                sys.exit(1)
            print rcmysql
            wrcursor = cluster['writecon'].cursor()
            wrcursor.execute("SET NAMES %s" % charset)
            wrcursor.execute(rcmysql)
            wrcursor.close()
   
    def halt(self,errorcontent):
        hd = open(error_log,'a+')
        fcntl.flock(hd,fcntl.LOCK_EX)
        hd.write('%s\t%s%s' % (errorcontent,time.ctime(),os.linesep))
        fcntl.flock(hd,fcntl.LOCK_UN)
        hd.close()

class nettpl:
    '''forking network model'''
    def __init__(self,host,port):
        self.__host = host
        self.__port = port
   
    def server(self):
        sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
        sk.bind((self.__host,self.__port))
        sk.listen(1)
        global mysql_switch
        
        while True:
            try:   
                clientsk,clientaddr = sk.accept()
            except socket.error:
                continue
            except KeyboardInterrupt:
                print "[yhm]angel stop"
                sys.exit(4)
            signal.signal(signal.SIGCHLD,reap)
            
            father = os.fork()
            if father:
                clientsk.close()
                continue
            else:
                sk.close()
                try:
                    while True:
                        clientdata = clientsk.recv(4096)
                        if not len(clientdata):break
                        #callback class

                        myhandler = nettpl_content(clientsk)
                        myhandler.myhandler(clientdata)
                #except:
                    #traceback.print_exc()
                finally:
                    print 'complete its work'
                    sys.exit(1)
                    
network = nettpl('localhost',9009)#端口默认定义是9009
network.server()




php封装好的客户端:

view plaincopy to clipboardprint?
<?php   
    class angel{   
        const SEND_FLTAG = MSG_DONTROUTE;   
        private $socket;   
           
           
        public function connect($host,$port){   
            $this->socket = socket_create(AF_INET,SOCK_STREAM,SOL_TCP);   
            socket_connect($this->socket,$host,$port);   
        }   
           
        public function query($sql){   
            $sqldata = '';   
            $sqlreturnlen = 999999999;   
            socket_send($this->socket,$sql,strlen($sql),MSG_WAITALL);   
            $data = socket_read($this->socket,$sqlreturnlen);   
            $sqldata .= $data;   
            return $sqldata;   
        }   
           
        public function fetch_array($query){   
            if(($unquery = @unserialize($query))){   
                return $unquery;   
            }else{   
                exit($query);   
            }   
        }   
           
        public function close(){   
            socket_close($this->socket);   
        }   
    }   
      
    $sql = "select * from cdb_posts limit 20";   
    $angel = new angel();   
    $angel->connect('127.0.0.1',9009);   
    print_r($angel->fetch_array($angel->query($sql)));   
    print_r($angel->fetch_array($angel->query($sql)));   
    print_r($angel->fetch_array($angel->query($sql)));   
    print_r($angel->fetch_array($angel->query($sql)));   
    print_r($angel->fetch_array($angel->query($sql)));   
    print_r($angel->fetch_array($angel->query($sql)));   
    print_r($angel->fetch_array($angel->query($sql)));   
?>  
<?php
        class angel{
                const SEND_FLTAG = MSG_DONTROUTE;
                private $socket;
               
               
                public function connect($host,$port){
                        $this->socket = socket_create(AF_INET,SOCK_STREAM,SOL_TCP);
                        socket_connect($this->socket,$host,$port);
                }
               
                public function query($sql){
                        $sqldata = '';
                        $sqlreturnlen = 999999999;
                        socket_send($this->socket,$sql,strlen($sql),MSG_WAITALL);
                        $data = socket_read($this->socket,$sqlreturnlen);
                        $sqldata .= $data;
                        return $sqldata;
                }
               
                public function fetch_array($query){
                        if(($unquery = @unserialize($query))){
                                return $unquery;
                        }else{
                                exit($query);
                        }
                }
               
                public function close(){
                        socket_close($this->socket);
                }
        }
       
        $sql = "select * from cdb_posts limit 20";
        $angel = new angel();
        $angel->connect('127.0.0.1',9009);
        print_r($angel->fetch_array($angel->query($sql)));
        print_r($angel->fetch_array($angel->query($sql)));
        print_r($angel->fetch_array($angel->query($sql)));
        print_r($angel->fetch_array($angel->query($sql)));
        print_r($angel->fetch_array($angel->query($sql)));
        print_r($angel->fetch_array($angel->query($sql)));
        print_r($angel->fetch_array($angel->query($sql)));
?>

php客户端返回的结果是数组,PHP使用十分方便:

view plaincopy to clipboardprint?
PHP客户端返回的结果:   
  
  
  
Array ( [1] => Array ( [status] => 0 [parseurloff] => 0 [htmlon] => 0 [author] => 大海在呼唤 [attachment] => 0 [dateline] => 1052838851 [useip] => 219.159.158.122 [pid] => 1482 [rate] => 0



本文来自CSDN博客,转载请标明出处:http://blog.csdn.net/huithe/archive/2010/01/30/5273086.aspx

论坛徽章:
0
2 [报告]
发表于 2010-02-02 19:18 |只看该作者
居然不是透明的,失望
您需要登录后才可以回帖 登录 | 注册

本版积分规则 发表回复

  

北京盛拓优讯信息技术有限公司. 版权所有 京ICP备16024965号-6 北京市公安局海淀分局网监中心备案编号:11010802020122 niuxiaotong@pcpop.com 17352615567
未成年举报专区
中国互联网协会会员  联系我们:huangweiwei@itpub.net
感谢所有关心和支持过ChinaUnix的朋友们 转载本站内容请注明原作者名及出处

清除 Cookies - ChinaUnix - Archiver - WAP - TOP