免费注册 查看新帖 |

ChinaUnix.net

  平台 论坛 博客 文库
最近访问板块 发新帖
查看: 5003 | 回复: 2

在PostgreSQL+plproxy中通过修改数据路由的方法实现的高可用方案 [复制链接]

论坛徽章:
3
数据库技术版块每日发帖之星
日期:2015-06-18 22:20:00数据库技术版块每日发帖之星
日期:2015-06-21 22:20:00数据库技术版块每日发帖之星
日期:2015-08-27 06:20:00
发表于 2011-11-20 17:10 |显示全部楼层
本帖最后由 osdba 于 2011-11-20 17:17 编辑

此文章也可见我的blog: http://blog.osdba.net/?post=72

    我们知道plproxy本身并不提供高可用方案。需要使用第三方的软件来实现高可用方案。这里提供了一种通过写一个简单的python脚本的来探测后面的数据库的状态,如果发现后面的数据库坏了,就让其修改数据路由,将其切换到备数据库的方法。这里提供的方案数据并不做水平拆分,而是做垂直拆分的方案。也就是按业务拆分数据,A业务的数据放在第一台数据库上,B业务的数据放到第二台数据库上。当然,如果想做数据水平拆分,只要修改一下数据路由的hash函数,也是很容易做到的。

机器说明:

下面两台机器做plproxy代理库

192.168.10.31: proxydb,安装plproxy,做函数代理

192.168.10.32: proxydb,安装plproxy,做函数代理

下面4台机器放真实的业务数据
192.168.10.33: db01

192.168.10.34: db01

192.168.10.35: db01

192.168.10.36: db01


按plproxy的标准方法,在plproxy代理库,建plproxy的3个标准函数:

create or replace function plproxy.get_cluster_version(cluster_name text)

returns integer as $$

begin

    if cluster_name = 'cluster01' then

         return 1;

    end if;

    raise exception 'no such cluster: %', cluster_name;

end;

$$ language plpgsql;



create or replace function plproxy.get_cluster_config(cluster_name text, out key text, out val text)

returns setof record as $$

begin

    key := 'statement_timeout';

    val := 60;

    return next;

    return;

end; $$ language plpgsql;


CREATE OR REPLACE FUNCTION plproxy.get_cluster_partitions(cluster_name text)

RETURNS SETOF text AS $$

BEGIN

    IF cluster_name = 'cluster01' THEN

        RETURN NEXT 'dbname=db01 host=192.168.10.33 user=buser password=buser';

        RETURN NEXT 'dbname=db01 host=192.168.10.34 user=buser password=buser';

        RETURN NEXT 'dbname=db01 host=192.168.10.35 user=buser password=buser';

        RETURN NEXT 'dbname=db01 host=192.168.10.36 user=buser password=buser';

        RETURN;

    END IF;

    RAISE EXCEPTION 'Unknown cluster';

END;

$$ LANGUAGE plpgsql;

在这个集群中,有4台机器:

192.168.10.33 192.168.10.34 :  放业务A的数据,192.168.10.33为主库,192.168.10.34为备库

192.168.10.35 192.168.10.36:放业务B的数据,192.168.10.35为主库,192.168.10.36为备库。

在proxydb代理库上建主备库的关系表:

create table ha_config(

ha_groupid int primary key, --每一对主备库,分配一个组id,

primary_ip text, --HA组中主数据库的IP

standby_ip text,--HA组中备数据库的IP

primary_hostid int,--HA组中主数据库的ID,也就是在plproxy.get_cluster_partitions函数中主机的顺序号

standby_hostid int,--HA组中备数据库的ID,

current_hostid int,--当前HA在哪台机器上

primary_hearttime timestamp, --主数据库的心跳时间,表明探测程序在这个时间探测时,数据库是好的。

standby_hearttime timestamp);--备数据库的心跳时间,表明探测程序在这个时间探测时,数据库是好的。

数据节点有两组互为主备的节点,插入两行数据:

insert into ha_config values(1,'192.168.10.33','192.168.10.34',0,1,0,now(),now());

insert into ha_config values(2,'192.168.10.35','192.168.10.36',2,3,2,now(),now());



建一个函数路由表,表明这个这个函数应该被路由到哪个HA组上的主机上:

create table function_route(

funcname text primary key,

ha_groupid int);

建hauser用户,此用户用来在proxydb上修改路由:

create user hauser password 'hauser';

grant ALL on ha_config to hauser;

grant ALL on function_route to hauser;



建可以根据ha_config表中current_hostid来计算路由的hash函数plp_route_by_funcname:

CREATE OR REPLACE FUNCTION public.plp_route_by_funcname(key text) returns int

AS $$

declare

ret int;

begin

     SELECT current_hostid INTO ret FROM function_route a,ha_config b where a.funcname=key and a.ha_groupid=b.ha_groupid;

     return ret;

end;

$$ LANGUAGE plpgsql;


在每个数据节点的数据库上:

建hauser,后面的python程序用此用户来探测这个数据库是否还活着:

create user hauser password 'hauser';

建立心跳表,hauser用户来更新这个表,如果不能更新,则说明这个数据库坏了:

create table xdual(x timestamp);

insert into xdual values(now());

grant ALL on xdual to hauser;



在proxydb上建一个测试函数:get_username,此函数根据userid得到用户名:

CREATE OR REPLACE FUNCTION public.get_username(userid int) returns text

AS $$

CLUSTER 'cluster01';

RUN ON plp_route_by_funcname('get_username');

$$ LANGUAGE plproxy;



在function_route中插入记录:

insert into function_route values('get_username',1);

这条记录是把函数get_username的调用路由到了HA组1中,也就是“192.168.10.33,192.168.10.34”的HA组中,具体是路由到192.168.10.33还是192.168.10.34,则是由ha_config表中的“current_hostid”来指定的,初使用化时,current_hostid为0,所以会路由到192.168.10.33上。当python程序(后面的内容会讲这个python程序)当检查到192.168.10.33出现问题时,会把current_hostid改为1,这时这个函数的调用就会被路由到192.168.10.34上,从而通过这种方法实现了HA。

在数据节点192.168.10.33和192.168.10.34上:

create table myuser(id int primary key, name text);

insert into myuser select generate_series(1,10000),'user'||generate_series(1,10000);

create or replace function public.get_username(userid int) returns text as $$

declare

   ret text;

begin

     SELECT name INTO ret FROM myuser where id=userid;

     return ret;

end;

$$ language plpgsql;


后面讲解如何通过python脚本来探测后面的数据库的状态,并自动进行切换的问题。
python为每一个后面的数据节点启动一个线程,每隔一段时间就更新一个数据节点上的xdual心跳表,如果更新成功,则表ha_config表中primary_hearttime字段或standby_hearttime字段更新成当前时间,这样再启动一个线程,如果发现primary_hearttime和standby_hearttime时间与当前时间对比时超过心跳时间时,就说明这个数据库出问题了,如果primary_hearttime和standby_hearttime时间与当前时间对比进都超过了心跳时间,则说明主备库出现问题了,则不切换,反之则应该切换到对端数据库上。我写的python程序如下:
#!/usr/bin/env python
# osdba 2011.11.20
# -*- coding:UTF-8

import psycopg2
import threading
import datetime
import time
import random
import signal
import sys
import traceback

g_hauser="hauser"
g_hapass="hauser"
g_proxydb="proxydb"

#g_logfile="/home/postgres/log/pg_error.log"
g_logfile="/home/postgres/plpha/myerror.log"

#心跳时间,也就是更新心跳表xdual的周期
g_heartbeat_interval = 10

#当ha_config表中primary_hearttime和standby_hearttime的值与当前时间超过下面的秒数后,就切换,此值应该要大于g_heartbeat_interval
g_switch_timedelay = 20

#连接数据库的超时时间
g_connect_timeout = 10


g_running = True




def myhandle(signum=0, e=0):
    """处理信号的函数"""
    global g_running
    print "recv sig %d" % (signum)
    g_running = False
   
def errlog(errinfo):
    global g_logfile
    f=open(g_logfile,'a')
    outinfo= time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(time.time()))+" : plpcluster : "+errinfo+"\n"
    f.write(outinfo)
    print outinfo
    f.close()


def connect_proxydb():
    global g_hauser
    global g_hapass
    global g_proxydb
    global g_connect_timeout
   
    conn = psycopg2.connect("host=127.0.0.1 dbname=%s user=%s password=%s connect_timeout=%s" % (g_proxydb,g_hauser,g_hapass,g_connect_timeout) )
    conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
    return conn

def connect_proxydb():
    global g_runningplpha.py
    global g_heartbeat_interval
    global g_connect_timeout
    errcnt = 0
    while g_running:
        try:
            connstr = "host=127.0.0.1 dbname=%s user=%s password=%s connect_timeout=%s" % (g_proxydb,g_hauser,g_hapass,g_connect_timeout)
            conn = psycopg2.connect(connstr)
            conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
            if errcnt > 0:
                errlog("NOTICE : %s : reconnect successful." % (connstr) )
            break
        except Exception,e:
            errlog("CONNECT ERROR : %s : %s " % (connstr, traceback.format_exc()) )
            errcnt = errcnt + 1
            time.sleep(g_connect_timeout)
    try:
        return conn
    except:
        return None

#探测数据库的状态的线程
class CMonitorHost(threading.Thread):
    def __init__(self, ha_groupid, hostip, isprimary, dbname, user, password):
        threading.Thread.__init__(self)
        self.ha_groupid = ha_groupid
        self.hostip = hostip
        self.isprimary = isprimary
        self.dbname = dbname
        self.user = user
        self.password =  password

    def connectbackend(self):
        global g_running
        global g_heartbeat_interval
        global g_connect_timeout
        errcnt = 0
        connstr = ""
        while g_running:
            try:
                connstr = "host=%s dbname=%s user=%s password=****** connect_timeout= %d" % (self.hostip,self.dbname,self.user,g_connect_timeout)
                conn = psycopg2.connect("host=%s dbname=%s user=%s password=%s connect_timeout=%s" % (self.hostip,self.dbname,self.user,self.password,g_connect_timeout))
                conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
                if errcnt > 0:
                    errlog("NOTICE : %s : reconnect successful." % (connstr) )
                break
            except Exception,e:
                errcnt = errcnt + 1
                if errcnt < 3:
                    errlog("CONNECT ERROR : %s : %s " % (connstr, traceback.format_exc()) )
                time.sleep(g_connect_timeout)
        try:
            return conn
        except:
            return None

    def run(self):
        global g_running
        global g_heartbeat_interval
        
        conn = self.connectbackend()
        if not conn:
            return
        cur = conn.cursor()
        
        while g_running :
            try:
                runsql = "update xdual set x=now()"
                cur.execute(runsql)
            except Exception,e:
                errlog("RUN SQL ERROR : %s : %s " % (runsql,traceback.format_exc()))
                try:
                    cur.close()
                    conn.close()
                except :
                    pass
               
                conn = self.connectbackend()
                if  not g_running:
                    break

                if conn:
                    cur = conn.cursor()
                else:
                    break
                time.sleep(g_heartbeat_interval)
                continue
            self.update_cl_state()
            time.sleep(g_heartbeat_interval)

        try:
            cur.close()
            conn.close()
        except:
            pass

    def update_cl_state(self):
        global g_hauser
        global g_hapass
        global g_proxydb
        
        try:
            conn = connect_proxydb()
            cur = conn.cursor()
            if self.isprimary:
                runsql = "update ha_config set primary_hearttime=now() where ha_groupid = %d" % (self.ha_groupid)
                cur.execute("update ha_config set primary_hearttime=now() where ha_groupid = %s", (self.ha_groupid,))
            else:
                runsql = "update ha_config set standby_hearttime=now() where ha_groupid = %s" % (self.ha_groupid)
                cur.execute("update ha_config set standby_hearttime=now() where ha_groupid = %s", (self.ha_groupid,))
            cur.close()
            conn.close()
        except Exception,e:
            errlog("RUN SQL ERROR : %s : %s" % (runsql,traceback.format_exc()))

signal.signal(signal.SIGINT, myhandle)
signal.signal(signal.SIGTERM, myhandle)

try:
    conn = connect_proxydb()
except Exception,e:
    errlog("CONNECT ERROR : %s " % (traceback.format_exc()) )
    g_running = True
    time.sleep(1)
    sys.exit()

cur = conn.cursor()
try:
    runsql = "select ha_groupid,primary_ip from ha_config"
    cur.execute(runsql)
    res = cur.fetchone()
    while res :
        t = CMonitorHost(res[0], res[1], True, "db01", g_hauser, g_hapass)
        t.start()
        res = cur.fetchone()

    runsql = "select ha_groupid,standby_ip from ha_config"
    cur.execute(runsql)
    res = cur.fetchone()
    while res :
        t = CMonitorHost(res[0], res[1], False, "db01", g_hauser, g_hapass)
        t.start()
        res = cur.fetchone()

except Exception,e:
    errlog(e.pgcode+" : RUN SQL ERROR : "+runsql+" : "+ e.pgerror)
    g_running = True
    time.sleep(1)
    sys.exit()

time.sleep(2)

# 检查是否要切换
while g_running:
    try:
        runsql = "SELECT ha_groupid, extract(epoch from (now() - standby_hearttime)) shtime from ha_config where extract(epoch from (now() - primary_hearttime)) > %d and current_hostid=primary_hostid" % (g_switch_timedelay)
        cur.execute(runsql)
        res = cur.fetchone()
        while res :
            if res[1] < g_switch_timedelay:
                errlog("ERROR: ha_groupid(%d) switch to standby database!" % (res[0]))
                cur2 = conn.cursor()
                runsql = "update ha_config set current_hostid=standby_hostid where ha_groupid=%s" % (res[0])
                cur2.execute("update ha_config set current_hostid=standby_hostid where ha_groupid=%s", (res[0],))
                cur2.close()
            else:
                errlog("ERROR: ha_groupid(%d) primary and standby all failed!!!" % (res[0]))
            res = cur.fetchone()

        runsql = "SELECT ha_groupid, extract(epoch from (now() - primary_hearttime)) shtime from ha_config where extract(epoch from (now() - standby_hearttime)) > %d and current_hostid=standby_hostid" % (g_switch_timedelay)
        cur.execute(runsql)
        res = cur.fetchone()
        while res :
            if res[1] < g_switch_timedelay:
                errlog("ERROR: ha_groupid: %d switch to primary database!" % (res[0]))
                cur2 = conn.cursor()
                runsql = "update ha_config set current_hostid=primary_hostid where ha_groupid=%s" % (res[0])
                cur2.execute("update ha_config set current_hostid=primary_hostid where ha_groupid=%s", (res[0],))
                cur2.close()
            else:
                errlog("ERROR: ha_groupid: %d primary and standby all failed!!!" % (res[0]))
            res = cur.fetchone()
            
    except Exception,e:
        errlog("RUN SQL ERROR : %s : %s" % (runsql,traceback.format_exc()))
        try:
            cur.close()
            conn.close()
        except :
            pass
        
        conn = connect_proxydb()
        cur = conn.cursor()

    time.sleep(g_heartbeat_interval)

cur.close()
conn.close()

这本文之中,没有讲解HA组中主备库之间的数据如何同步,实践中主备库之间的数据同步可以使用bucardo做数据双向同步, 也可以使用slony来同步,但由于slony不支持双master架构,备库是不能写的,所以使用slony时,还需要修改我写的这个plpha.py脚本,让其可以把slony的备库提升成主库。当然还有一些其它的方案。

另,虽然有两台plproxy,但当一台plproxy出现问题时,应用如何切换到另一台plproxy,也需要考虑,在我们公司是使用F5负载均衡器来实现的。

论坛徽章:
0
发表于 2011-11-22 12:32 |显示全部楼层
收藏一份

论坛徽章:
53
2015七夕节徽章
日期:2015-08-24 11:17:25ChinaUnix专家徽章
日期:2015-07-20 09:19:30每周论坛发贴之星
日期:2015-07-20 09:19:42ChinaUnix元老
日期:2015-07-20 11:04:38荣誉版主
日期:2015-07-20 11:05:19巳蛇
日期:2015-07-20 11:05:26CU十二周年纪念徽章
日期:2015-07-20 11:05:27IT运维版块每日发帖之星
日期:2015-07-20 11:05:34操作系统版块每日发帖之星
日期:2015-07-20 11:05:36程序设计版块每日发帖之星
日期:2015-07-20 11:05:40数据库技术版块每日发帖之星
日期:2015-07-20 11:05:432015年辞旧岁徽章
日期:2015-07-20 11:05:44
发表于 2011-12-21 21:45 |显示全部楼层
收藏了。没有做过双机
您需要登录后才可以回帖 登录 | 注册

本版积分规则 发表回复

基于案例的 SQL 优化实战训练营

讲师:中电福富特级专家梁敬彬,参与本次课程培训,你将收获:
1. 能编写出较为高效的 SQL;
2. 能解决70%以上的数据库常见优化问题;
3. 能得到老师提供的高效的相关工具和解决方案;
4. 能举一反三,收获不仅仅是 SQL 优化。
现在购票享受8.8折优惠!
----------------------------------------
优惠时间:2019年3月20日前

大会官网>>
  

北京盛拓优讯信息技术有限公司. 版权所有 16024965号-6 北京市公安局海淀分局网监中心备案编号:11010802020122
中国互联网协会会员  联系我们:huangweiwei@it168.com
感谢所有关心和支持过ChinaUnix的朋友们 转载本站内容请注明原作者名及出处

清除 Cookies - ChinaUnix - Archiver - WAP - TOP