免费注册 查看新帖 |

Chinaunix

  平台 论坛 博客 文库
最近访问板块 发新帖
查看: 812 | 回复: 0
打印 上一主题 下一主题

Heartbeat 通信层结构分析 [复制链接]

论坛徽章:
0
跳转到指定楼层
1 [收藏(0)] [报告]
发表于 2008-11-17 14:30 |只看该作者 |倒序浏览
前言
  Heartbeat是Linux-HA开源项目发布的用于关键应用环境的HA软件名称。从1999以来到现在, 历经1.2.x,
2.0.x等多个版本,在全球开源HA领域具有举足轻重的知名度, 应用日益广泛, 并且得到了一些主流Linux操作系统厂商的支持。
而通信层实现无疑是集群软件运行的最基本底层支撑。本文对通过分析Heartbeat源码,对其通信层基本结构和机制进行了分析和阐述。给出了基本数据结构和实现流程。
  所有分析基于Heartbeat 2.0.4版本。
相关源码:
http://www.linux-ha.org/download/heartbeat-2.0.4.tar.gz
Heartbeat通信结构概述
  主要分2种:
  1.HBcomm 通信层PLUGIN  (节点之间的进程通信)
实现主要是在各个媒介的Plugin里,通过PILS动态连接库加载。比如支持多播,单播,串口等通信方式。所有节点间通信PLUGIN模块放在lib/plugins/hbcomm/路径下。
  2.Unix Domain Socket  (节点内的进程通信)
  /include/clplumbing/Ipc.h,         IPC抽象层数据结构定义
  /lib/clplumbing/ocf_ipc.c,         IPC底层抽象实现
  /lib/clplumbing/ipcsocket.c,         IPC的unix域套接字具体实现
交叉点:
  节点间和节点内2种通信方式的交接点在heartbeat.c的read_child(), write_child()等函数中, 在这里实现消息的转发。
Heartbeat API
  是基于ipc抽象层的Unix域实现基础上,用于满足heartbeat和client子模块之间的应用层通信需求。:
  client_lib.c实现了Heartbeat API 客户端部分。
  hb_api.c实现了heartbeat API服务器端部分。


  
  图1:Heartbeat通信结构概图
    上图描述了一个client子模块把消息通过Heartbeat通信机制发送到另一个节点相同模块的过程。
  1.
client子模块通过FIFO管道把消息发送到FIFO子进程fifo_child。为什么使用FIFO来进行通信呢,应该是有些进程不能很方便的和
Heartbeat主进程建立Unix域IPC通道的关系,比如执行的脚本和集群管理程序, 集群状态查询程序。
  2.        FIFO子进程通过msgfromstream()从fifo管道收到消息后,利用事先建立好的和Heartbeat之间的IPC通道转发给Heartbeat主进程
  3.        主进程判断消息是发给自己的则调用process_msg()进行处理,否则调用send_to_all_media()通过各个媒介的wchan通道发送给write_child子进程。
  4.        write_child子进程通过ipcmsgfromIPC()从主进程收到消息,调用各个媒介结构hb_media的write函数把消息发送到集群其他节点。
  5.        其他节点的read_child子进程通过各个媒介结构hb_media的read函数读到消息后,使用事先和Heartbeat主进程建立的IPC通道发送消息到Heartbeat主进程
  6.
Heartbeat主进程通过msgfromIPC()收到消息后,调用process_clustermsg()函数进行处理。具体为,如果是主进程处
理的消息调用HBDoMsgCallback进行处理,否则通过newstartha_monitor发送到各个client子进程
节点间通信Plugin
  代码在lib/plugins/hbcomm/目录中
  bcast.c                        /* 广播 */
  mcast.c                        /* 多播 */
  ucast.c                        /* 单播 */
  openais.c                        /* openais */
  serial.c                        /* 串口 */
  ping.c                        /* icmp */
  ping_group.c                /* ping一组主机 */
  hbaping.c                        /* 光纤总线适配器ping */
/* 这个结构的每个函数对应Plugin里的具体函数。*/
struct hb_media_fns {
        struct hb_media*(*new)        (const char * token);                                /*  建立媒介 */
        int                        (*parse)        (const char * options);                                /*  读取配置文件参数 */
        int                        (*open)        (struct hb_media *mp);                                /*  打开 */
        int                        (*close)        (struct hb_media *mp);                                /* 关闭 */
        void*                (*read)        (struct hb_media *mp, int *len );                /* 读 */
        int                        (*write)        (struct hb_media *mp ,        void *msg, int len);                /* 写 */
        int                        (*mtype)        (char **buffer);                                        /* 获取媒介类型 */
        int                        (*descr)        (char **buffer);                                        /* 获取媒介描述 */
        int                        (*isping)        (void);                                                        /* 是否ping类型媒介 */
};
hb_media_fns各功能函数调用之处:
new():                  config.c的add_option函数
parse():          config.c的parse_config函数
open():                  heartbeat.c的initialize_heartbeat函数
close():                  heartbeat.c的initialize_heartbeat函数
read():                  heartbeat.c的read_child函数
write():                  heartbeat.c的write_child函数
mtype():          config.c的parse_config函数
descr():          config.c的parse_config函数
isping():          在config.c和hb_api.c被调用
节点内IPC通信
IPC通信抽象层(include\clplumbing\ipc.h)
IPC抽象层数据结构概述:
(注:缩进的为该数据结构所属元素)
IPC_AUTH                                                /* 安全认证数据结构 */
IPC_WAIT_CONNECTION                        /* 等待连接数据结构 */
IPC_WAIT_OPS                                /* 等待连接函数集 */
IPC_CHANNEL                                        /* 通信管道数据结构 */
IPC_OPS                                                /* 通信管道函数集 */
IPC_QUEUE                                        /* 信息队列 */
ipc_bufpool                                        /* 接收缓冲池,经处理转化为接收队列 */
IPC_MESSAGE                                        /* IPC通信信息数据结构 */
        IPC_CHANNEL                                /* 信息所属通信管道 */
SOCKET_MSG_HEAD                                /* 信息头数据结构 */
其中2种主要抽象数据结构:
/* server端等待客户端的连接 */
struct IPC_WAIT_CONNECTION{
        int                                ch_status;                        /* wait conn. status.*/
        void *                        ch_private;                /* wait conn. private data. */
        IPC_WaitOps        *ops;                        /* wait conn. function table .*/
};
/* 活动的通信管道结构 */
struct IPC_CHANNEL{
        int                        ch_status;                        /* 通道状态 */
        pid_t                farside_pid;                /* 远端 pid */
        void*                ch_private;                /* channel private data. (may contain conn. info.) */
        IPC_Ops*        ops;                                /* 通道函数集 */
        unsigned int        msgpad;                        /* 信息前缀字节数 */
        unsigned int        bytes_remaining;        /* 剩余未发送的字节数 */
        gboolean        should_send_block;                /*  */
        /* private: */
        IPC_Queue*        send_queue;                 /* 发送缓冲 */
        IPC_Queue*        recv_queue;                 /* 接收缓冲 */
        /* 接收缓冲池, 经处理后转化为接收信息队列recv_queue */
        struct ipc_bufpool* pool;                /* buffer pool */
        /* 发送的流量控制 */
        int                                high_flow_mark;
        int                                low_flow_mark;
        void*                        high_flow_userdata;
        void*                        low_flow_userdata;
        flow_callback_t        high_flow_callback;
        flow_callback_t        low_flow_callback;
       
        int                                conntype;       
        char                                failreason[MAXFAILREASON];
};
IPC抽象层通信
server端:
1.  调用ipc_wait_conn_constructor()建立等待连接管道,成功则返回IPC_WaitConnection.  
2.  通过poll/select来轮询客户请求。使用accept_connection接受连接,返回IPC_Channel。
client端:
调用ipc_channel_constructor()连接server, 返回IPC_Channel。
IPC抽象层的UNIX Domain Socket实现
static struct IPC_OPS socket_ops = {
        destroy:                        socket_destroy_channel,                   /* 删除通信管道 */
        initiate_connection:        socket_initiate_connection,                /* 从client端建立连接 */
        verify_auth:                socket_verify_auth,                                /* 客户端认证信息 */
        assert_auth:                socket_assert_auth,                                /* 断言认证, (未用)*/
        send:                        socket_send,                                        /* 向管道发送信息 */
        recv:                                socket_recv,                                        /* 从管道接收信息*/
        waitin:                        socket_waitin,                                        /* 等待输入信息, (然后读取) */
        waitout:                        socket_waitout,                                /* 等待信息输出结束 */
        is_message_pending:        socket_is_message_pending,        /* 有信息可读或挂断 */
        is_sending_blocked:        socket_is_output_pending,        /* 输出是否阻塞 */
        resume_io:                        socket_resume_io,                        /* 恢复所有可能的ipc操作 */
        get_send_select_fd:                socket_get_send_fd,                /* 取得发送fd */
        get_recv_select_fd:                socket_get_recv_fd,                /* 取得接收fd */
        set_send_qlen:                        socket_set_send_qlen,                /* 设置最大发送缓冲长度 */
        set_recv_qlen:                        socket_set_recv_qlen,                /* 设置最大接收缓冲长度*/
        set_high_flow_callback:        socket_set_high_flow_callback,   /* 高流量callback函数 */
        set_low_flow_callback:        socket_set_low_flow_callback,         /* 低流量callback函数 */
        new_ipcmsg:                socket_new_ipcmsg,                        /* 返回一个新建立的IPC信息 */
        get_chan_status:        socket_get_chan_status,                        /* 返回管道状态 */
        is_sendq_full:                socket_is_sendq_full,                        /* 发送缓冲是否已满 */
        is_recvq_full:                socket_is_recvq_full,                        /* 接收缓冲是否已满 */
        get_conntype:                socket_get_conntype,                        /* 返回管道类型 */
                                        /* 可以是IPC_SERVER , IPC_CLIENT , IPC_PEER */
};
节点间通信Plugin / 节点内通信交叉点
主要实现代码在heartbeart.c中
Heartbeat通信媒介结构
struct hb_media {
        void *                pd;                                /* 自定义数据结构 */
        const char *        name;                        /* 媒介名 */
        char*                type;                                /* 媒介类型 */
        char*                description;                /* 媒介描述 */
        const struct hb_media_fns*vf;        /* hbcomm媒介处理函数集 */
        IPC_Channel*        wchan[2];                /* Unix域写子进程通信管道 */
        IPC_Channel*        rchan[2];                        /* Unix域读子进程通信管道 */
};
/* heartbeat发送信息集群 */
/* 1.发送消息到write_child子进程 */
send_cluster_msg{                /* 发送信息到集群 */
        …
        process_outbound_packet{                  /* 带包重传控制 */
                send_to_all_media{                /* 发送到所有媒介 */
                        for (j=0; j wchan[P_WRITEFD];
                                …
                                /* 发送到特定传送媒介的写子进程 */
                                wrc=wch->ops->send(wch, outmsg);
                        }
                }
        }
}
/* 2. write_child写子进程发送消息到集群 */
write_child(){
        IPC_Channel* ourchan =        mp->wchan[P_READFD];
        for(;;){
                /* write_child通过Unix Domain Socket 接收heartbeat信息 */
                IPC_Message*        ipcmsg = ipcmsgfromIPC(ourchan);  /* 调用ops->recv() */
                …
                /* 发送到集群其他节点 */
                if (mp->vf->write(mp, ipcmsg->msg_body, ipcmsg->msg_len) != HA_OK) {
……
                }
        }
}
/* 从集群接收信息 */
/* 1. read_child读子进程从集群接收消息 */
Read_child(){
        IPC_Channel* ourchan =        mp->rchan[P_READFD];
        For(;;){
                /* 从hbcomm PLUGIN接收 */
                if ((pkt=mp->vf->read(mp, &pktlen)) == NULL) {
                ……
                }
                if (NULL != imsg){
                        /* read_child子进程通过UNIX Domain Socket, 发送到heartbeat */
                        rc = ourchan->ops->send(ourchan, imsg);
                        rc2 = ourchan->ops->waitout(ourchan);
                        …
                }
        }
}
/* 2. heartbeat从read_child子进程接收信息并进行处理 */
s = G_main_add_IPC_Channel(PRI_READPKT
                ,        sysmedia[j]->rchan[P_WRITEFD], FALSE
                ,        read_child_dispatch, sysmedia+j, NULL);
read_child_dispatch(){
        …
        msg = msgfromIPC(source, MSG_NEEDAUTH);        /*调用ops->recv()从read_child读 */
        process_clustermsg(msg, lnk);                                        /* 对读到信息进行处理 */
}
heartbeat API Server端
struct api_query_handler query_handler_list[] = {
    {API_SIGNOFF, api_signoff},                                        /* client登陆 */
    {API_SETFILTER, api_setfilter},                                        /* 设置消息过滤 */
    {API_SETSIGNAL, api_setsignal},                                /* 设置消息到达信号通知 */
    {API_NODELIST, api_nodelist},                                        /* 获取节点列表 */
    {API_NODESTATUS, api_nodestatus},                        /* 查询节点状态 */
    {API_NODETYPE, api_nodetype},                                /* 查询节点类型 */
    {API_IFSTATUS, api_ifstatus},                                        /* 查询心跳状态 */
    {API_IFLIST, api_iflist},                                                /* 查询心跳列表 */
    {API_CLIENTSTATUS, api_clientstatus},                        /* 查询client模块状态 */
    {API_NUMNODES, api_num_nodes},                                /* 返回集群普通节点数 */
    {API_GETPARM, api_get_parameter},                                /* 返回特定参数值 */
    {API_GETRESOURCES, api_get_resources},                /* 返回资源状态(兼容1.2.x以前版本) */
    {API_GETUUID, api_get_uuid},                                        /* 取得节点uuid值 */
    {API_GETNAME, api_get_nodename},                        /* 取得节点名 */
    {API_SET_SENDQLEN, api_set_sendqlen}                /* 设置发送队列长度 */
};
heartbeat API client端
static struct llc_ops heartbeat_ops = {
        signon:                                hb_api_signon,                /* 注册新的heartbeat client */
        signoff:                                hb_api_signoff,                /* 注销一个heartbeat client */
        delete:                                hb_api_delete,                        /* 注销结构 */
        set_msg_callback:                set_msg_callback,                /* 设置某信息类型callback */
        set_nstatus_callback:        set_nstatus_callback,        /* 设置节点状态类型callback */
        set_ifstatus_callback:        set_ifstatus_callback,        /* 设置心跳状态类型callback */
        set_cstatus_callback:        set_cstatus_callback,        /* 设置client状态类型callback */
        init_nodewalk:                        init_nodewalk,                        /* 初始化节点遍历 */
        nextnode:                                nextnode,                                /* 下一个节点 */
        end_nodewalk:                end_nodewalk,                /* 结束节点遍历 */
        node_status:                        get_nodestatus,                /* 节点当前状态 */
        node_type:                        get_nodetype,                        /* 节点类型 */
        init_ifwalk:                        init_ifwalk,                        /* 初始化心跳遍历 */
        nextif:                                nextif,                                /* 下一个心跳接口 */
        end_ifwalk:                        end_ifwalk,                        /* 结束心跳遍历 */
        if_status:                                get_ifstatus,                        /* 心跳当前状态 */
        client_status:                        get_clientstatus,                /* client当前状态 */
        get_uuid_by_name:                get_uuid_by_name,                /* 根据名字取得uuid */
        get_name_by_uuid:                get_name_by_uuid,                /* 根据uuid取得名字 */
        sendclustermsg:                sendclustermsg,                /* 发送消息到cluster中所有成员*/
        sendnodemsg:                        sendnodemsg,                        /* 发送消息到特定节点 */
        sendnodemsg_byuuid:        sendnodemsg_byuuid,        /* 发送消息到特定节点(by uuid)*/
        send_ordered_clustermsg:send_ordered_clustermsg, /* 发送顺序集群信息 */
        send_ordered_nodemsg:        send_ordered_nodemsg,        /* 发送顺序节点信息 */
        inputfd:                                get_inputfd,                        /* 返回和检测信息到达*/
        ipcchan:                                get_ipcchan,                        /* 返回IPC_Channel 类型ipc通道 */
        msgready:                        msgready,                        /* 当有信息可读时返回true*/
        setmsgsignal:                        hb_api_setsignal,                /* setmsgsignal */
        rcvmsg:                                rcvmsg,                                /* 接收msg, 交给callback处理 */
        readmsg:                                read_msg_w_callbacks,        /* 返回没有注册callback的msg */
        setfmode:                                setfmode,                                /* setfmode */
        get_parameter:                        get_parameter,               
        get_deadtime:                        get_deadtime,               
        get_keepalive:                        get_keepalive,               
        get_mynodeid:                        get_mynodeid,                        /* 取得本地节点名 */
        get_logfacility:                get_logfacility,                /* suggested logging facility */
        get_resources:                        get_resources,                        /* 取得资源当前分布状态 */
        chan_is_connected:                chan_is_connected,
        set_sendq_len:                        set_sendq_len,                        /* 设置发送缓存区长度 */
        set_send_block_mode:        socket_set_send_block_mode,
        errmsg:                                APIError,               
};
注:
Client端API函数集明显比Server端查询处理函数集要多,是因为有些功能不需要通过Server端查询来得到。
               
               
               

本文来自ChinaUnix博客,如果查看原文请点:http://blog.chinaunix.net/u2/76419/showart_1420297.html
您需要登录后才可以回帖 登录 | 注册

本版积分规则 发表回复

  

北京盛拓优讯信息技术有限公司. 版权所有 京ICP备16024965号-6 北京市公安局海淀分局网监中心备案编号:11010802020122 niuxiaotong@pcpop.com 17352615567
未成年举报专区
中国互联网协会会员  联系我们:huangweiwei@itpub.net
感谢所有关心和支持过ChinaUnix的朋友们 转载本站内容请注明原作者名及出处

清除 Cookies - ChinaUnix - Archiver - WAP - TOP