- 论坛徽章:
- 3
|
本帖最后由 osdba 于 2011-11-20 17:17 编辑
此文章也可见我的blog: http://blog.osdba.net/?post=72
我们知道plproxy本身并不提供高可用方案,需要使用第三方的软件来实现高可用。这里提供了一种方法:写一个简单的python脚本来探测后端数据库的状态,如果发现某台数据库坏了,就修改数据路由,将访问切换到备数据库上。这里提供的方案并不做数据水平拆分,而是做垂直拆分,也就是按业务拆分数据:A业务的数据放在第一台数据库上,B业务的数据放到第二台数据库上。当然,如果想做数据水平拆分,只要修改一下数据路由的hash函数,也是很容易做到的。
机器说明:
下面两台机器做plproxy代理库
192.168.10.31: proxydb,安装plproxy,做函数代理
192.168.10.32: proxydb,安装plproxy,做函数代理
下面4台机器放真实的业务数据
192.168.10.33: db01
192.168.10.34: db01
192.168.10.35: db01
192.168.10.36: db01
按plproxy的标准方法,在plproxy代理库,建plproxy的3个标准函数:
-- PL/Proxy hook: returns the configuration version of a cluster.
-- Bump the returned number whenever get_cluster_partitions changes,
-- so PL/Proxy reloads the partition list.
CREATE OR REPLACE FUNCTION plproxy.get_cluster_version(cluster_name text)
RETURNS integer AS $$
BEGIN
    IF cluster_name = 'cluster01' THEN
        RETURN 1;
    END IF;
    RAISE EXCEPTION 'no such cluster: %', cluster_name;
END;
$$ LANGUAGE plpgsql;
-- PL/Proxy hook: per-cluster connection settings as key/value pairs.
CREATE OR REPLACE FUNCTION plproxy.get_cluster_config(cluster_name text, OUT key text, OUT val text)
RETURNS SETOF record AS $$
BEGIN
    -- Abort any remote call that runs longer than 60 seconds.
    key := 'statement_timeout';
    val := '60';  -- OUT param is text; was the bare integer 60, which relies on implicit coercion
    RETURN NEXT;
    RETURN;
END; $$ LANGUAGE plpgsql;
-- PL/Proxy hook: connection strings for every partition of the cluster,
-- in partition order 0..3 (PL/Proxy requires a power-of-two count).
CREATE OR REPLACE FUNCTION plproxy.get_cluster_partitions(cluster_name text)
RETURNS SETOF text AS $$
BEGIN
    IF cluster_name = 'cluster01' THEN
        -- unnest() yields array elements in order (PostgreSQL 8.4+)
        RETURN QUERY SELECT unnest(ARRAY[
            'dbname=db01 host=192.168.10.33 user=buser password=buser',
            'dbname=db01 host=192.168.10.34 user=buser password=buser',
            'dbname=db01 host=192.168.10.35 user=buser password=buser',
            'dbname=db01 host=192.168.10.36 user=buser password=buser'
        ]);
        RETURN;
    END IF;
    RAISE EXCEPTION 'Unknown cluster';
END;
$$ LANGUAGE plpgsql;
在这个集群中,有4台机器:
192.168.10.33 192.168.10.34 : 放业务A的数据,192.168.10.33为主库,192.168.10.34为备库
192.168.10.35 192.168.10.36:放业务B的数据,192.168.10.35为主库,192.168.10.36为备库。
在proxydb代理库上建主备库的关系表:
-- One row per HA pair: maps a group id to its primary/standby hosts and
-- records the last successful heartbeat the watchdog saw on each of them.
create table ha_config(
ha_groupid int primary key, --one id per primary/standby pair
primary_ip text, --IP of the group's primary database
standby_ip text,--IP of the group's standby database
primary_hostid int,--partition number of the primary, i.e. its position in plproxy.get_cluster_partitions
standby_hostid int,--partition number of the standby
current_hostid int,--host currently serving this HA group
primary_hearttime timestamp, --last time the probe found the primary alive
standby_hearttime timestamp);--last time the probe found the standby alive
数据节点有两组互为主备的节点,插入两行数据:
-- Register the two HA groups.  Host ids are the 0-based positions of the
-- hosts in plproxy.get_cluster_partitions(); current_hostid starts at the
-- primary.  Explicit column lists keep these inserts safe against schema
-- evolution.
insert into ha_config (ha_groupid, primary_ip, standby_ip, primary_hostid, standby_hostid, current_hostid, primary_hearttime, standby_hearttime)
values (1, '192.168.10.33', '192.168.10.34', 0, 1, 0, now(), now());
insert into ha_config (ha_groupid, primary_ip, standby_ip, primary_hostid, standby_hostid, current_hostid, primary_hearttime, standby_hearttime)
values (2, '192.168.10.35', '192.168.10.36', 2, 3, 2, now(), now());
建一个函数路由表,表明每个函数应该被路由到哪个HA组的主机上:
-- Maps each proxied function name to the HA group that owns its data.
create table function_route(
    funcname text primary key,  -- proxied function name, e.g. 'get_username'
    ha_groupid int not null,    -- owning HA group; must exist in ha_config
    constraint function_route_ha_groupid_fk
        foreign key (ha_groupid) references ha_config (ha_groupid)
);
建hauser用户,此用户用来在proxydb上修改路由:
-- Routing account: the python watchdog connects as hauser to update routes.
create user hauser password 'hauser';
-- NOTE(review): ALL is broader than needed -- the watchdog only SELECTs and
-- UPDATEs these tables; consider narrowing to least privilege.
grant ALL on ha_config to hauser;
grant ALL on function_route to hauser;
建可以根据ha_config表中current_hostid来计算路由的hash函数plp_route_by_funcname:
-- Routing function used by RUN ON: maps a proxied function name to the
-- partition number (ha_config.current_hostid) of the HA group it belongs to.
-- Returns NULL when the function is not registered in function_route, which
-- makes PL/Proxy reject the call.
CREATE OR REPLACE FUNCTION public.plp_route_by_funcname(key text) RETURNS int
AS $$
DECLARE
    ret int;
BEGIN
    -- ANSI join instead of the implicit comma join of the original
    SELECT b.current_hostid
      INTO ret
      FROM function_route AS a
      INNER JOIN ha_config AS b
          ON a.ha_groupid = b.ha_groupid
     WHERE a.funcname = key;
    RETURN ret;
END;
$$ LANGUAGE plpgsql;
在每个数据节点的数据库上:
建hauser,后面的python程序用此用户来探测这个数据库是否还活着:
-- Probe account: the python watchdog logs in as hauser to test each data node.
create user hauser password 'hauser';
建立心跳表,hauser用户来更新这个表,如果不能更新,则说明这个数据库坏了:
-- Heartbeat table: the watchdog periodically updates the single row; a
-- failed update marks this data node as dead.
create table xdual(x timestamp);
insert into xdual (x) values (now());  -- explicit column list
grant ALL on xdual to hauser;
在proxydb上建一个测试函数:get_username,此函数根据userid得到用户名:
-- PL/Proxy stub: forwards get_username(userid) to the partition chosen by
-- plp_route_by_funcname('get_username'), i.e. the currently-live host
-- (ha_config.current_hostid) of the HA group that owns this function.
CREATE OR REPLACE FUNCTION public.get_username(userid int) returns text
AS $$
CLUSTER 'cluster01';
RUN ON plp_route_by_funcname('get_username');
$$ LANGUAGE plproxy;
在function_route中插入记录:
-- Route get_username to HA group 1 (192.168.10.33 / 192.168.10.34).
insert into function_route (funcname, ha_groupid) values ('get_username', 1);
这条记录是把函数get_username的调用路由到了HA组1中,也就是“192.168.10.33,192.168.10.34”这个HA组中。具体是路由到192.168.10.33还是192.168.10.34,则由ha_config表中的“current_hostid”来指定:初始化时current_hostid为0,所以会路由到192.168.10.33上。当python程序(后面的内容会讲这个python程序)检查到192.168.10.33出现问题时,会把current_hostid改为1,这时这个函数的调用就会被路由到192.168.10.34上,从而通过这种方法实现了HA。
在数据节点192.168.10.33和192.168.10.34上:
-- Demo data on the group-1 data nodes: 10000 users named user1..user10000.
create table myuser(id int primary key, name text);
-- explicit column list + single generate_series in FROM instead of two
-- parallel set-returning calls in the SELECT list
insert into myuser (id, name)
select g, 'user' || g from generate_series(1, 10000) as g;
-- Data-node implementation of get_username: look up a user's name by id.
-- Returns NULL when the id does not exist (same as SELECT INTO with no row).
create or replace function public.get_username(userid int) returns text as $$
begin
    return (select name from myuser where id = userid);
end;
$$ language plpgsql;
后面讲解如何通过python脚本来探测后面的数据库的状态,并自动进行切换的问题。
python为每一个后端数据节点启动一个线程,每隔一段时间就更新一次该数据节点上的xdual心跳表:如果更新成功,就把ha_config表中的primary_hearttime字段或standby_hearttime字段更新成当前时间。主程序再检查这些时间戳:如果只有primary_hearttime(或standby_hearttime)与当前时间的差超过了切换阈值,说明当前库出问题而对端正常,应该切换到对端数据库上;如果两个时间与当前时间的差都超过了阈值,则说明主备库都出了问题,此时不做切换。我写的python程序如下:
#!/usr/bin/env python
# osdba 2011.11.20
# -*- coding:UTF-8
import psycopg2
import threading
import datetime
import time
import random
import signal
import sys
import traceback
# --- watchdog configuration ---
g_hauser="hauser"    # proxydb account used to read/update the routing tables
g_hapass="hauser"
g_proxydb="proxydb"
#g_logfile="/home/postgres/log/pg_error.log"
g_logfile="/home/postgres/plpha/myerror.log"
# heartbeat interval in seconds, i.e. how often the xdual heartbeat table is updated
g_heartbeat_interval = 10
# fail over when primary_hearttime / standby_hearttime in ha_config lags now()
# by more than this many seconds; must be larger than g_heartbeat_interval
g_switch_timedelay = 20
# database connect timeout in seconds
g_connect_timeout = 10
g_running = True     # cleared by the signal handler to stop all loops/threads
def myhandle(signum=0, e=0):
    """Signal handler: request a clean shutdown of all loops and threads."""
    global g_running
    print("recv sig %d" % signum)
    g_running = False
def errlog(errinfo):
    """Append a timestamped record to the watchdog log file and echo it.

    errinfo: message text (without timestamp/newline).
    """
    global g_logfile
    stamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
    outinfo = stamp + " : plpcluster : " + errinfo + "\n"
    # 'with' guarantees the handle is closed even if write() raises;
    # the original leaked the handle on exception
    with open(g_logfile, 'a') as f:
        f.write(outinfo)
    print(outinfo)
def connect_proxydb():
    """Open an autocommit connection to the proxy database on localhost."""
    global g_hauser
    global g_hapass
    global g_proxydb
    global g_connect_timeout
    dsn = "host=127.0.0.1 dbname=%s user=%s password=%s connect_timeout=%s" % (
        g_proxydb, g_hauser, g_hapass, g_connect_timeout)
    connection = psycopg2.connect(dsn)
    connection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
    return connection
def connect_proxydb():
    """Connect to the proxy database, retrying until successful or shutdown.

    Returns an autocommit connection, or None when g_running went False
    before a connection could be made.

    NOTE(review): this intentionally redefines the simpler connect_proxydb
    above; this retry-loop version is the one the rest of the script uses.
    Fixes vs. original: the corrupted "global g_runningplpha.py" line (a
    paste artifact of the filename plpha.py) is restored to "global
    g_running"; the password is masked in the loggable conninfo (consistent
    with CMonitorHost.connectbackend); and conn is initialised so the
    original's try/return-NameError dance is unnecessary.
    """
    global g_running
    global g_heartbeat_interval
    global g_connect_timeout
    errcnt = 0
    conn = None  # stays None when we give up before connecting
    connstr = ""
    while g_running:
        try:
            # loggable copy of the conninfo with the password masked
            connstr = "host=127.0.0.1 dbname=%s user=%s password=****** connect_timeout=%s" % (g_proxydb, g_hauser, g_connect_timeout)
            conn = psycopg2.connect("host=127.0.0.1 dbname=%s user=%s password=%s connect_timeout=%s" % (g_proxydb, g_hauser, g_hapass, g_connect_timeout))
            conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
            if errcnt > 0:
                errlog("NOTICE : %s : reconnect successful." % (connstr))
            break
        except Exception as e:
            errlog("CONNECT ERROR : %s : %s " % (connstr, traceback.format_exc()))
            errcnt = errcnt + 1
            time.sleep(g_connect_timeout)
    return conn
class CMonitorHost(threading.Thread):
    """Heartbeat thread for one data node.

    Periodically runs "update xdual set x=now()" on the monitored node; on
    success it refreshes this node's heartbeat timestamp in the proxydb
    ha_config table, so the main loop can detect dead nodes and fail over.
    """

    def __init__(self, ha_groupid, hostip, isprimary, dbname, user, password):
        threading.Thread.__init__(self)
        self.ha_groupid = ha_groupid  # HA group this node belongs to
        self.hostip = hostip          # IP of the monitored data node
        self.isprimary = isprimary    # True when this node is the group's primary
        self.dbname = dbname
        self.user = user
        self.password = password

    def connectbackend(self):
        """Connect to the monitored data node, retrying every
        g_connect_timeout seconds.  Returns an autocommit connection, or
        None when g_running went False before a connection was made."""
        global g_running
        global g_heartbeat_interval
        global g_connect_timeout
        errcnt = 0
        connstr = ""
        conn = None  # fixes UnboundLocalError when the loop never connects
        while g_running:
            try:
                # loggable copy of the conninfo with the password masked
                connstr = "host=%s dbname=%s user=%s password=****** connect_timeout= %d" % (self.hostip, self.dbname, self.user, g_connect_timeout)
                conn = psycopg2.connect("host=%s dbname=%s user=%s password=%s connect_timeout=%s" % (self.hostip, self.dbname, self.user, self.password, g_connect_timeout))
                conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
                if errcnt > 0:
                    errlog("NOTICE : %s : reconnect successful." % (connstr))
                break
            except Exception as e:
                errcnt = errcnt + 1
                # only log the first failures to avoid flooding the log file
                if errcnt < 3:
                    errlog("CONNECT ERROR : %s : %s " % (connstr, traceback.format_exc()))
                time.sleep(g_connect_timeout)
        return conn

    def run(self):
        """Heartbeat loop; exits when g_running goes False."""
        global g_running
        global g_heartbeat_interval
        conn = self.connectbackend()
        if not conn:
            return
        cur = conn.cursor()
        while g_running:
            try:
                runsql = "update xdual set x=now()"
                cur.execute(runsql)
            except Exception as e:
                errlog("RUN SQL ERROR : %s : %s " % (runsql, traceback.format_exc()))
                # the node looks dead: drop the connection and rebuild it
                try:
                    cur.close()
                    conn.close()
                except:
                    pass
                conn = self.connectbackend()
                if not g_running:
                    break
                if conn:
                    cur = conn.cursor()
                else:
                    break
                time.sleep(g_heartbeat_interval)
                continue
            # heartbeat succeeded: publish it to the proxy database
            self.update_cl_state()
            time.sleep(g_heartbeat_interval)
        try:
            cur.close()
            conn.close()
        except:
            pass

    def update_cl_state(self):
        """Refresh this node's heartbeat column in proxydb.ha_config."""
        global g_hauser
        global g_hapass
        global g_proxydb
        runsql = ""  # fixes NameError in the except block when connect fails
        conn = None
        try:
            conn = connect_proxydb()
            cur = conn.cursor()
            if self.isprimary:
                # runsql is only for logging; the execute() is parameterized
                runsql = "update ha_config set primary_hearttime=now() where ha_groupid = %s" % (self.ha_groupid)
                cur.execute("update ha_config set primary_hearttime=now() where ha_groupid = %s", (self.ha_groupid,))
            else:
                runsql = "update ha_config set standby_hearttime=now() where ha_groupid = %s" % (self.ha_groupid)
                cur.execute("update ha_config set standby_hearttime=now() where ha_groupid = %s", (self.ha_groupid,))
            cur.close()
            conn.close()
        except Exception as e:
            errlog("RUN SQL ERROR : %s : %s" % (runsql, traceback.format_exc()))
            # avoid leaking the proxy connection when the update failed
            try:
                if conn:
                    conn.close()
            except:
                pass
# ---------------------------------------------------------------------------
# Main program: spawn one heartbeat thread per monitored host, then watch the
# heartbeat timestamps in ha_config and flip current_hostid when the active
# host of a group stops beating while its peer is still alive.
# ---------------------------------------------------------------------------
signal.signal(signal.SIGINT, myhandle)
signal.signal(signal.SIGTERM, myhandle)

try:
    conn = connect_proxydb()
except Exception as e:
    errlog("CONNECT ERROR : %s " % (traceback.format_exc()))
    g_running = False  # was "= True": signal worker threads to stop before exiting
    time.sleep(1)
    sys.exit()
cur = conn.cursor()

try:
    # one monitor thread per primary host ...
    runsql = "select ha_groupid,primary_ip from ha_config"
    cur.execute(runsql)
    res = cur.fetchone()
    while res:
        t = CMonitorHost(res[0], res[1], True, "db01", g_hauser, g_hapass)
        t.start()
        res = cur.fetchone()
    # ... and one per standby host
    runsql = "select ha_groupid,standby_ip from ha_config"
    cur.execute(runsql)
    res = cur.fetchone()
    while res:
        t = CMonitorHost(res[0], res[1], False, "db01", g_hauser, g_hapass)
        t.start()
        res = cur.fetchone()
except Exception as e:
    # e.pgcode / e.pgerror can be None (the original "+" concat then raised
    # TypeError inside the handler); format_exc() is always a safe string.
    errlog("RUN SQL ERROR : " + runsql + " : " + traceback.format_exc())
    # was "= True": without this the already-started non-daemon threads keep
    # the process alive after sys.exit()
    g_running = False
    time.sleep(1)
    sys.exit()
time.sleep(2)

# failover loop: check whether any HA group needs to be switched
while g_running:
    try:
        # groups whose *primary* is active but has a stale heartbeat
        runsql = "SELECT ha_groupid, extract(epoch from (now() - standby_hearttime)) shtime from ha_config where extract(epoch from (now() - primary_hearttime)) > %d and current_hostid=primary_hostid" % (g_switch_timedelay)
        cur.execute(runsql)
        res = cur.fetchone()
        while res:
            if res[1] < g_switch_timedelay:
                # the standby is still beating: fail over to it
                errlog("ERROR: ha_groupid(%d) switch to standby database!" % (res[0]))
                cur2 = conn.cursor()
                runsql = "update ha_config set current_hostid=standby_hostid where ha_groupid=%s" % (res[0])
                cur2.execute("update ha_config set current_hostid=standby_hostid where ha_groupid=%s", (res[0],))
                cur2.close()
            else:
                # both sides are stale: do not switch, just complain
                errlog("ERROR: ha_groupid(%d) primary and standby all failed!!!" % (res[0]))
            res = cur.fetchone()
        # groups whose *standby* is active but stale: switch back to primary
        runsql = "SELECT ha_groupid, extract(epoch from (now() - primary_hearttime)) shtime from ha_config where extract(epoch from (now() - standby_hearttime)) > %d and current_hostid=standby_hostid" % (g_switch_timedelay)
        cur.execute(runsql)
        res = cur.fetchone()
        while res:
            if res[1] < g_switch_timedelay:
                errlog("ERROR: ha_groupid: %d switch to primary database!" % (res[0]))
                cur2 = conn.cursor()
                runsql = "update ha_config set current_hostid=primary_hostid where ha_groupid=%s" % (res[0])
                cur2.execute("update ha_config set current_hostid=primary_hostid where ha_groupid=%s", (res[0],))
                cur2.close()
            else:
                errlog("ERROR: ha_groupid: %d primary and standby all failed!!!" % (res[0]))
            res = cur.fetchone()
    except Exception as e:
        errlog("RUN SQL ERROR : %s : %s" % (runsql, traceback.format_exc()))
        try:
            cur.close()
            conn.close()
        except:
            pass
        conn = connect_proxydb()
        # connect_proxydb() returns None when shutdown was requested while
        # reconnecting; the original crashed on conn.cursor() in that case
        if conn:
            cur = conn.cursor()
        else:
            break
    time.sleep(g_heartbeat_interval)

try:
    cur.close()
    conn.close()
except:
    pass
在本文之中,没有讲解HA组中主备库之间的数据如何同步。实践中主备库之间的数据同步可以使用bucardo做双向同步,也可以使用slony来同步;但由于slony不支持双master架构,备库是不能写的,所以使用slony时,还需要修改我写的这个plpha.py脚本,让其可以把slony的备库提升成主库。当然还有一些其它的方案。
另,虽然有两台plproxy,但当一台plproxy出现问题时,应用如何切换到另一台plproxy,也需要考虑,在我们公司是使用F5负载均衡器来实现的。 |
|