- 论坛徽章:
- 0
|
本帖最后由 xiaoqiangnk 于 2010-03-07 15:31 编辑
RCU框架分析
Read-Copy-Update的基本思想是在多读者少写者的情况下,读者可以无负担读取数据,读者与读者可以并行,读者与写者可以并行,从而提高系统的实时性。写者首先复制旧的数据,然后修改,然后用新数据替换旧的数据,最后再删除旧的数据。
对于一个受RCU保护的资源,能够删除旧数据的状态称为静止状态(quiecstate);等待读者全部退出的时间段称为grace period。对于一个线程释放所有持有的受RCU保护的资源时,称线程进入静止状态。
Linux2.6 RCU实现
1 主要数据结构
1. rcu_ctrlblk 全局一个变量
struct rcu_ctrlblk {
long cur; /* Current batch number. */
long completed; /* Number of the last completed batch */
int next_pending; /* Is the next batch already waiting? */
int signaled;
spinlock_t lock ____cacheline_internodealigned_in_smp;
cpumask_t cpumask; /* CPUs that need to switch in order */
/* for current batch to proceed. */
}
rcu_data 每个cpu一个变量
struct rcu_data {
/* 1) quiescent state handling : */
long quiescbatch; /* Batch # for grace period */
int passed_quiesc; /* User-mode/idle loop etc. */
int qs_pending; /* core waits for quiesc state */
/* 2) batch handling */
long batch; /* Batch # for current RCU batch */
struct rcu_head *nxtlist;
struct rcu_head **nxttail;
long qlen; /* # of queued callbacks */
struct rcu_head *curlist;
struct rcu_head **curtail;
struct rcu_head *donelist;
struct rcu_head **donetail;
long blimit; /* Upper limit on a processed batch */
int cpu;
struct rcu_head barrier;
};
rcu_head 写者一次操作一个变量
struct rcu_head {
struct rcu_head *next;
void (*func)(struct rcu_head *head);
};
rcu_synchronize 用于等待在临界区内的所有读者退出
struct rcu_synchronize {
struct rcu_head head;
struct completion completion;
};
2 初始化
1 __rcu_init
初始化每个cpu变量rcu_data,初始函数为rcu_init_percpu_data
注册RCU_SOFTIRQ软中断,其处理函数为rcu_process_callbacks
2. Grace period计数
1. rcu_check_callbacks idle 时钟中断
Linux中一次上下文切换作为一个静止状态,cpu在用户态时被中断,或者在idle时被中断并且中断不是嵌套的,并且不在软中断中。
2. schedule
3 清理
在RCU_SOFTIRQ软中断中处理。实际执行函数为__rcu_process_callbacks,该函数操作如下:
1. 如果本cpu的curlist非空且本cpu的batch#不在上次完成的batch#之后,则将curlist移到donelist,以处理这些callbacks
2. 如果nxlist非空且curlist空,
a) 将nxlist移到curlist。本cpu的batch号设置为cur+1
b) 如果next_pending没有设置,则将next_pending置位,并开始新的batch
3. Check quiescstate
4. 如果donelist非空,则处理callbacks
Linux2.6 RCU源码分析
next_pending 下一个batch是否已经等待,将nxlist移到curlist之后,设置本cpu的batch号为当前cur的下一个,并且置位next_pending。当所有cpu已经经过静止状态的batch处理完并且至少其中一个cpu有batch需要处理, rcu_start_batch将next_pending设为0,并且cur加1.
Rcuclassic.c
force_quiescent_state请求所有cpu调度,实现一次上下文切换,从而进入下一个静止状态。
static void force_quiescent_state(struct rcu_data *rdp,
struct rcu_ctrlblk *rcp)
{
int cpu;
cpumask_t cpumask;
set_need_resched(); //设置本cpu当前线程调度标志
if (unlikely(!rcp->signaled)) {
rcp->signaled = 1;
cpus_and(cpumask, rcp->cpumask, cpu_online_map); //
cpu_clear(rdp->cpu, cpumask); //本cpu已经设置,不需要通知
for_each_cpu_mask_nr(cpu, cpumask) //给其他online cpu发调度请求
smp_send_reschedule(cpu);
}
}
call_rcu 将一个call back函数排队,等待下一个grace period执行
void call_rcu(struct rcu_head *head,
void (*func)(struct rcu_head *rcu))
{
unsigned long flags;
struct rcu_data *rdp;
head->func = func;
head->next = NULL;
local_irq_save(flags);
rdp = &__get_cpu_var(rcu_data);
*rdp->nxttail = head; //将次callback排队nxtlist
rdp->nxttail = &head->next;
if (unlikely(++rdp->qlen > qhimark)) { //如果此cpu上的callback队列长度高于qhimark,强制进入静止状态
rdp->blimit = INT_MAX;
force_quiescent_state(rdp, &rcu_ctrlblk);
}
local_irq_restore(flags);
}
rcu_do_batch 处理已经过静止状态的callbacks
static void rcu_do_batch(struct rcu_data *rdp)
{
struct rcu_head *next, *list;
int count = 0;
//处理已经经过静止状态的callbacks
list = rdp->donelist;
while (list) {
next = list->next;
prefetch(next);
list->func(list);
list = next;
if (++count >= rdp->blimit) //如果处理的callbacks已达到这次要求的数目,停止处理
break;
}
rdp->donelist = list; //删除已处理的callbacks
local_irq_disable();
rdp->qlen -= count;
local_irq_enable();
if (rdp->blimit == INT_MAX && rdp->qlen <= qlowmark)
rdp->blimit = blimit;
if (!rdp->donelist)
rdp->donetail = &rdp->donelist;
else
raise_rcu_softirq();
}
rcu_start_batch 启动新的batch
static void rcu_start_batch(struct rcu_ctrlblk *rcp)
{
if (rcp->next_pending &&
rcp->completed == rcp->cur) { //
rcp->next_pending = 0;
smp_wmb();
rcp->cur++;
smp_mb();
cpus_andnot(rcp->cpumask, cpu_online_map, nohz_cpu_mask);
rcp->signaled = 0;
}
}
cpu_quiet 表示本cpu此次batch的callbacks已经处理完
static void cpu_quiet(int cpu, struct rcu_ctrlblk *rcp)
{
cpu_clear(cpu, rcp->cpumask); //没有callbacks等待本cpu处理
if (cpus_empty(rcp->cpumask)) { //如果任何cpu上没有callbacks等待处理,当前batch号几位complete batch
/* batch completed ! */
rcp->completed = rcp->cur;
rcu_start_batch(rcp);
}
}
rcu_check_quiescent_state
static void rcu_check_quiescent_state(struct rcu_ctrlblk *rcp,
struct rcu_data *rdp)
{
if (rdp->quiescbatch != rcp->cur) { //如果本cpu的grace period号与全局batch号不一致,开始一个新的grace period
/* start new grace period: */
rdp->qs_pending = 1;
rdp->passed_quiesc = 0;
rdp->quiescbatch = rcp->cur;
return;
}
if (!rdp->qs_pending) //如果本cpu没有等待quiescstate
return;
if (!rdp->passed_quiesc) //如果本cpu没有经过quiescstate
return;
rdp->qs_pending = 0;
spin_lock(&rcp->lock);
if (likely(rdp->quiescbatch == rcp->cur)) //如果本cpu的quiescbatch与全局的当前batch号相同,则本cpu上这次batch处理的callbacks已处理完
cpu_quiet(rdp->cpu, rcp);
spin_unlock(&rcp->lock);
}
__rcu_process_callbacks
static void __rcu_process_callbacks(struct rcu_ctrlblk *rcp,
struct rcu_data *rdp)
{
//如果curlist不空,并且本cpu的batch号不迟于已完成的batch,说明curlist上的callbacks可以处理了,则将curlist移到donelist去处理
if (rdp->curlist && !rcu_batch_before(rcp->completed, rdp->batch)) {
*rdp->donetail = rdp->curlist;
rdp->donetail = rdp->curtail;
rdp->curlist = NULL;
rdp->curtail = &rdp->curlist;
}
//如果curlist空且nxlist非空,将nxlist移到curlist
if (rdp->nxtlist && !rdp->curlist) {
local_irq_disable();
rdp->curlist = rdp->nxtlist;
rdp->curtail = rdp->nxttail;
rdp->nxtlist = NULL;
rdp->nxttail = &rdp->nxtlist;
local_irq_enable();
//开始下一个batch
rdp->batch = rcp->cur + 1;
smp_rmb();
if (!rcp->next_pending) { //如果下一个batch没有等待标志,
/* and start it/schedule start if it's a new batch */
spin_lock(&rcp->lock);
rcp->next_pending = 1;
rcu_start_batch(rcp);
spin_unlock(&rcp->lock);
}
}
rcu_check_quiescent_state(rcp, rdp);
if (rdp->donelist) //如果有已经经过静止状态的callbacks,则处理
rcu_do_batch(rdp);
} |
|