免费注册 查看新帖 |

Chinaunix

  平台 论坛 博客 文库
最近访问板块 发新帖
查看: 5054 | 回复: 2
打印 上一主题 下一主题

linux2.6 RCU [复制链接]

论坛徽章:
0
跳转到指定楼层
1 [收藏(0)] [报告]
发表于 2010-03-07 15:27 |只看该作者 |倒序浏览
本帖最后由 xiaoqiangnk 于 2010-03-07 15:31 编辑

RCU框架分析
         Read-Copy-Update的基本思想是在多读者少写者的情况下,读者可以无负担读取数据,读者与读者可以并行,读者与写者可以并行,从而提高系统的实时性。写者首先复制旧的数据,然后修改,然后用新数据替换旧的数据,最后再删除旧的数据。
        对于一个受RCU保护的资源,能够删除旧数据的状态称为静止状态(quiecstate);等待读者全部退出的时间段称为grace period。对于一个线程释放所有持有的受RCU保护的资源时,称线程进入静止状态。
Linux2.6 RCU实现
1        主要数据结构
1.        rcu_ctrlblk   全局一个变量
struct rcu_ctrlblk {
        long        cur;                /* Current batch number.                      */
        long        completed;        /* Number of the last completed batch         */
        int        next_pending;        /* Is the next batch already waiting?         */

        int        signaled;

        spinlock_t        lock        ____cacheline_internodealigned_in_smp;
        cpumask_t        cpumask; /* CPUs that need to switch in order    */
                                 /* for current batch to proceed.        */
}



rcu_data     每个cpu一个变量
struct rcu_data {
        /* 1) quiescent state handling : */
        long                quiescbatch;     /* Batch # for grace period */
        int                passed_quiesc;         /* User-mode/idle loop etc. */
        int                qs_pending;         /* core waits for quiesc state */

        /* 2) batch handling */
        long                         batch;           /* Batch # for current RCU batch */
        struct rcu_head *nxtlist;
        struct rcu_head **nxttail;
        long            qlen;                   /* # of queued callbacks */
        struct rcu_head *curlist;
        struct rcu_head **curtail;
        struct rcu_head *donelist;
        struct rcu_head **donetail;
        long                blimit;                 /* Upper limit on a processed batch */
        int cpu;
        struct rcu_head barrier;
};

rcu_head     写者一次操作一个变量
struct rcu_head {
        struct rcu_head *next;
        void (*func)(struct rcu_head *head);
};

rcu_synchronize  用于等待在临界区内的所有读者退出
struct rcu_synchronize {
        struct rcu_head head;
        struct completion completion;
};
2        初始化
1        __rcu_init

初始化每个cpu变量rcu_data,初始函数为rcu_init_percpu_data
注册RCU_SOFTIRQ软中断,其处理函数为rcu_process_callbacks
2.        Grace period计数
1.        rcu_check_callbacks    idle  时钟中断
Linux中一次上下文切换作为一个静止状态,cpu在用户态时被中断,或者在idle时被中断并且中断不是嵌套的,并且不在软中断中。
2.        schedule
3        清理
在RCU_SOFTIRQ软中断中处理。实际执行函数为__rcu_process_callbacks,该函数操作如下:
1.        如果本cpu的curlist非空且本cpu的batch#不在上次完成的batch#之后,则将curlist移到donelist,以处理这些callbacks
2.        如果nxlist非空且curlist空,
a)        将nxlist移到curlist。本cpu的batch号设置为cur+1
b)        如果next_pending没有设置,则将next_pending置位,并开始新的batch
3.        Check quiescstate
4.        如果donelist非空,则处理callbacks

Linux2.6 RCU源码分析
next_pending    下一个batch是否已经等待,将nxlist移到curlist之后,设置本cpu的batch号为当前cur的下一个,并且置位next_pending。当所有cpu已经经过静止状态的batch处理完并且至少其中一个cpu有batch需要处理,   rcu_start_batch将next_pending设为0,并且cur加1.

Rcuclassic.c
force_quiescent_state请求所有cpu调度,实现一次上下文切换,从而进入下一个静止状态。
static void force_quiescent_state(struct rcu_data *rdp,
                        struct rcu_ctrlblk *rcp)
{
        int cpu;
        cpumask_t cpumask;
        set_need_resched();     //设置本cpu当前线程调度标志
        if (unlikely(!rcp->signaled)) {
                rcp->signaled = 1;
                cpus_and(cpumask, rcp->cpumask, cpu_online_map);  //
                cpu_clear(rdp->cpu, cpumask); //本cpu已经设置,不需要通知
                for_each_cpu_mask_nr(cpu, cpumask) //给其他online cpu发调度请求
                        smp_send_reschedule(cpu);
        }
}


call_rcu  将一个call back函数排队,等待下一个grace period执行
void call_rcu(struct rcu_head *head,
                                void (*func)(struct rcu_head *rcu))
{
        unsigned long flags;
        struct rcu_data *rdp;

        head->func = func;   
        head->next = NULL;
        local_irq_save(flags);
        rdp = &__get_cpu_var(rcu_data);
        *rdp->nxttail = head;    //将次callback排队nxtlist
        rdp->nxttail = &head->next;
        if (unlikely(++rdp->qlen > qhimark)) {  //如果此cpu上的callback队列长度高于qhimark,强制进入静止状态
                rdp->blimit = INT_MAX;
                force_quiescent_state(rdp, &rcu_ctrlblk);
        }
        local_irq_restore(flags);
}

rcu_do_batch     处理已经过静止状态的callbacks
static void rcu_do_batch(struct rcu_data *rdp)
{
        struct rcu_head *next, *list;
        int count = 0;
        //处理已经经过静止状态的callbacks
        list = rdp->donelist;
        while (list) {
                next = list->next;
                prefetch(next);
                list->func(list);
                list = next;
                if (++count >= rdp->blimit)         //如果处理的callbacks已达到这次要求的数目,停止处理
                        break;
        }
        rdp->donelist = list;  //删除已处理的callbacks

        local_irq_disable();
        rdp->qlen -= count;
        local_irq_enable();
        if (rdp->blimit == INT_MAX && rdp->qlen <= qlowmark)
                rdp->blimit = blimit;

        if (!rdp->donelist)
                rdp->donetail = &rdp->donelist;
        else
                raise_rcu_softirq();
}

rcu_start_batch   启动新的batch
static void rcu_start_batch(struct rcu_ctrlblk *rcp)
{
        if (rcp->next_pending &&
                        rcp->completed == rcp->cur) { //
                rcp->next_pending = 0;
                smp_wmb();
                rcp->cur++;

                smp_mb();
                cpus_andnot(rcp->cpumask, cpu_online_map, nohz_cpu_mask);

                rcp->signaled = 0;
        }
}
cpu_quiet  表示本cpu此次batch的callbacks已经处理完
static void cpu_quiet(int cpu, struct rcu_ctrlblk *rcp)
{
        cpu_clear(cpu, rcp->cpumask);  //没有callbacks等待本cpu处理
        if (cpus_empty(rcp->cpumask)) {  //如果任何cpu上没有callbacks等待处理,当前batch号几位complete batch
                /* batch completed ! */
                rcp->completed = rcp->cur;
                rcu_start_batch(rcp);
        }
}

rcu_check_quiescent_state   
static void rcu_check_quiescent_state(struct rcu_ctrlblk *rcp,
                                        struct rcu_data *rdp)
{
        if (rdp->quiescbatch != rcp->cur) {  //如果本cpu的grace period号与全局batch号不一致,开始一个新的grace period
                /* start new grace period: */
                rdp->qs_pending = 1;
                rdp->passed_quiesc = 0;
                rdp->quiescbatch = rcp->cur;
                return;
        }

        if (!rdp->qs_pending)   //如果本cpu没有等待quiescstate
                return;

        if (!rdp->passed_quiesc) //如果本cpu没有经过quiescstate
                return;

        rdp->qs_pending = 0;

        spin_lock(&rcp->lock);
        if (likely(rdp->quiescbatch == rcp->cur))  //如果本cpu的quiescbatch与全局的当前batch号相同,则本cpu上这次batch处理的callbacks已处理完
                cpu_quiet(rdp->cpu, rcp);

        spin_unlock(&rcp->lock);
}

__rcu_process_callbacks  
static void __rcu_process_callbacks(struct rcu_ctrlblk *rcp,
                                        struct rcu_data *rdp)
{
        //如果curlist不空,并且本cpu的batch号不迟于已完成的batch,说明curlist上的callbacks可以处理了,则将curlist移到donelist去处理
        if (rdp->curlist && !rcu_batch_before(rcp->completed, rdp->batch)) {
                *rdp->donetail = rdp->curlist;
                rdp->donetail = rdp->curtail;
                rdp->curlist = NULL;
                rdp->curtail = &rdp->curlist;
        }
        //如果curlist空且nxlist非空,将nxlist移到curlist
        if (rdp->nxtlist && !rdp->curlist) {
                local_irq_disable();
                rdp->curlist = rdp->nxtlist;
                rdp->curtail = rdp->nxttail;
                rdp->nxtlist = NULL;
                rdp->nxttail = &rdp->nxtlist;
                local_irq_enable();
                //开始下一个batch
                rdp->batch = rcp->cur + 1;
                smp_rmb();

                if (!rcp->next_pending) {    //如果下一个batch没有等待标志,
                        /* and start it/schedule start if it's a new batch */
                        spin_lock(&rcp->lock);
                        rcp->next_pending = 1;
                        rcu_start_batch(rcp);
                        spin_unlock(&rcp->lock);
                }
        }

        rcu_check_quiescent_state(rcp, rdp);
        if (rdp->donelist)     //如果有已经经过静止状态的callbacks,则处理
                rcu_do_batch(rdp);
}

论坛徽章:
0
2 [报告]
发表于 2010-03-07 15:36 |只看该作者
长知识了!

论坛徽章:
0
3 [报告]
发表于 2010-03-07 16:10 |只看该作者
如何使用呢?
我看到 RCU LOCK 的文档,讲述了 n 种情况,不同的场景使用不同的方法,但实际使用中却一头雾水
楼主能给讲讲应该如何使用 rcu_lock 吗?
您需要登录后才可以回帖 登录 | 注册

本版积分规则 发表回复

  

北京盛拓优讯信息技术有限公司. 版权所有 京ICP备16024965号-6 北京市公安局海淀分局网监中心备案编号:11010802020122 niuxiaotong@pcpop.com 17352615567
未成年举报专区
中国互联网协会会员  联系我们:huangweiwei@itpub.net
感谢所有关心和支持过ChinaUnix的朋友们 转载本站内容请注明原作者名及出处

清除 Cookies - ChinaUnix - Archiver - WAP - TOP