免费注册 查看新帖 |

Chinaunix

  平台 论坛 博客 文库
最近访问板块 发新帖
查看: 2450 | 回复: 3
打印 上一主题 下一主题

lkd2中读-写信号量问题请教,高手们进 [复制链接]

论坛徽章:
0
跳转到指定楼层
1 [收藏(0)] [报告]
发表于 2011-11-17 14:34 |只看该作者 |倒序浏览
本帖最后由 snowboy9859 于 2011-11-17 15:05 编辑

lkd2中有一段描述很含糊:
    所有的读-写信号量都是互斥信号量(也就是说,他们的引用计数等于1)。只要没有写者,并发持有读锁的读者数不限。相反,只有唯一的写者(在没有读者时)可以获得读锁。

    引用计数等于1,那么只要没有写者,并发持有读锁的读者数不限是怎么实现的,内核是怎么做到的,请指教,多谢。
计数为1,如果每次获取读锁都计数减1的话,怎么能够保证读者数不限

论坛徽章:
0
2 [报告]
发表于 2011-11-17 14:50 |只看该作者
check_flags(flags);?是否每次获取信号量时都检查这个标记,来查看当前是读锁还是写锁?

论坛徽章:
0
3 [报告]
发表于 2011-11-17 17:31 |只看该作者
我以前总结的,参考一下读写自旋锁。

自旋锁
普通自旋锁
一般说的自旋锁就是特指普通自旋锁。
自旋锁实现比较简单,每一把锁都有一个核心属性slock表示锁的状态,根据这个属性的低两个字节是否相同表示是否锁住,相同的话表示未锁,不同的话表示锁住。注意:ulk3说是1表示未锁,根据我看内核代码,最近的内核(31)已经改掉了,下面会具体分析。
已经取到自旋锁的内核路径自动的禁止了内核抢占。自旋锁保护的代码应该少并且很快的执行,当然不能允许被调度出cpu的。
没有取到锁的内核路径cpu会忙等,也会禁止内核抢占。

每一把自旋锁都有spinlock_t的数据结构。
typedef struct {
        raw_spinlock_t raw_lock;
        unsigned int break_lock;
    。。。
} spinlock_t;

把自旋锁的slock设置为0未锁状态(低两个字节当然相同了)。
# define spin_lock_init(lock)                                        \
do {                                                                \
        static struct lock_class_key __key;                        \  //key是debug用的
        __spin_lock_init((lock), #lock, &__key);                \
} while (0)

循环尝试取锁,一直到获取到自旋锁。
#define spin_lock(lock)                        _spin_lock(lock)

释放锁,将slock置为0.
# define spin_unlock(lock) \
do {__raw_spin_unlock(&(lock)->raw_lock); __release(lock); } while (0)

等待,直到自旋锁变成未锁状态。
#define spin_unlock_wait(lock)        __raw_spin_unlock_wait(&(lock)->raw_lock)

自旋锁是锁住的吗?锁住的话返回1,未锁的话返回0
#define spin_is_locked(lock)        __raw_spin_is_locked(&(lock)->raw_lock)

尝试取锁,如果取到锁了返回1,如果没有取到锁返回0。
#define spin_trylock(lock)                __cond_lock(lock, _spin_trylock(lock))

Spin_lock分析
#define spin_lock(lock)                        _spin_lock(lock)

void __lockfunc _spin_lock(spinlock_t *lock)
{
        preempt_disable(); //首先要禁止抢占了。刚才说了
    //下面两句都和debug有关,现在只关心核心的_raw_spin_lock
        spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
        LOCK_CONTENDED(lock, _raw_spin_trylock, _raw_spin_lock);
}

static __always_inline void __raw_spin_lock(raw_spinlock_t *lock)
{
        __ticket_spin_lock(lock);
}

static __always_inline void __ticket_spin_lock(raw_spinlock_t *lock)
{
        short inc = 0x0100;
    //假设lock当前没有被锁,那么就是*lock=0x0
        asm volatile (
        //交换x,y并且y=x+y。于是inc=0x0,lock=0x0100(非0已锁)
                LOCK_PREFIX "xaddw %w0, %1\n"
                "1:\t"
        //比较inc的低字节和高字节,现在当然相等了。Inc=0x0。
                "cmpb %h0, %b0\n\t"
                "je 2f\n\t"  //相等得话,就是取到锁了,返回了啊。(如果已经被锁了,inc可能等于0x0100,就不相等了)。
                "rep ; nop\n\t" //没有取到锁啊
                "movb %1, %b0\n\t" //看看有没有解锁呢
                /* don't need lfence here, because loads are in-order */
                "jmp 1b\n"  //循环
                "2:"
                : "+Q" (inc), "+m" (lock->slock)
                :
                : "memory", "cc");
}
什么是解锁:
static __always_inline void __ticket_spin_unlock(raw_spinlock_t *lock)
{
        asm volatile(UNLOCK_LOCK_PREFIX "incb %0" //就是将lock最低字节加1
                     : "+m" (lock->slock)
                     :
                     : "memory", "cc");
}
思考一下加锁解锁的情况:
初始:lock = 0x0000
加锁后:0x0100
解锁后:0x0101
加锁后:0x0201
解锁后:0x0202
加锁后:0x0302
解锁后:0x0303
。。。。。如此下去

是的,很明显下面一些api是为了应对内核的各种同步需求,将禁止中断、禁止可延迟同自旋锁的功能封装在一起简化了代码编写。
#define spin_lock_irq(lock)                _spin_lock_irq(lock)
# define spin_unlock_irq(lock)                _spin_unlock_irq(lock)

#define spin_lock_bh(lock)                _spin_lock_bh(lock)
#define write_unlock_bh(lock)                _write_unlock_bh(lock)

#define spin_lock_irqsave(lock, flags)                。。。
#define spin_unlock_irqrestore(lock, flags)                。。。
        例子
__do_IRQ()
  Spin_lock(&(irq_desc[irq].lock)); //获取自旋锁防止多cpu同时访问irq_desc
  Irq_desc[irq].handler->ack(irq); //发送ack通知pic以便pic可以进一步产生中断,同时禁用该IRQ线。(但是此时中断时禁止的,中断门在硬件处理时已经禁止了中断)
  Irq_desc[irq].status &= ~(IRQ_REPLAY  |  IRQ_WAITING); //清除这两个标志
  Irq_desc[irq].status |= IRQ_PENDING; //irq线已经pending,但是还没有处理
  If (!(irq_desc[irq].status & (IRQ_DISABLED | IRQ_INPROGRESS)
     && irq_desc[irq].action) { //如果irq线没有被驱动禁止也没有正在执行。
Irq_desc[irq].status |= IRQ_INPROGRESS;//irq相应的ISR正在执行。
do {
irq_desc[irq].status &= ~IRQ_PENDING;//取消pending标志,在handle_IRQ_event中其他cpu可能会再次设置该IRQ线的pending标志。
Spin_unlock(&irq_desc[irq].lock);
Handle_IRQ_event(irq, regs, irq_desc[irq].action); //调用中断服务例程,这个时候其他cpu可以产生同样的中断,并且有了spinlock,所以又会设置IRQ_PENDING位。
Spin_lock(&irq_desc[irq].lock);
} while (irq_desc[irq].statu & IRQ_PENDING)  //如果有pending调用handle_IRQ_event
Irq_desc[irq].status &= ~IRQ_INPROGRESS;//清除irq正在执行标志。
}
  Irq_desc[irq].handler->end(irq); //发送end通知pic,以便pic重新开启本地的该IRQ线中断
  Spin_lock(&irq_desc[irq].lock);  

读写自旋锁
如果共享数据读的次数明显高于写的次数话,使用读写自旋锁是比自选锁更好的选择。假设没有写锁的情况下,多个读锁可以同时获得,增加了并发性。当然读锁和写锁仍然是互斥的,有了写锁就没法获取读锁,同样有了读锁也没法获取写锁。

        原理:
每个读写自旋锁都维护一个lock变量。初始化为0x01000000。表示未锁状态。
获取读锁的时候,将lock-1,之后如果lock仍然>0表示获取读锁成功。如果小于0(例如-1),表示有写锁正在使用,获取读锁失败的话将lock+1,然后进入循环不断检测是否写锁已经释放(lock>0),然后跟开始一样重新lock-1尝试获取读锁。
释放读锁就是将lock+1。
获取写锁的时候,将lock-0x01000000,之后如果lock为0表示获取写锁成功,就是说只有未锁的状态下才能获取到写锁。如果获取写锁失败,lock<0,同样的lock+0x01000000,然后检测进入循环检测是否所有读锁已经释放(lock==0x01000000),然后跟开始一样重新lock-0x01000000尝试获取写锁。
释放写锁就是lock+0x01000000
小结一下lock意义,lock等于0x00000000表示有写锁状态。Lock等于 0x00000001~0x00fffff表示有一个或多个读锁状态。尝试获取锁的失败时Lock<0。

        API
每个读写自旋锁都有一个rwlock_t的结构。
typedef struct {
        raw_rwlock_t raw_lock;
        unsigned int break_lock;
} rwlock_t;

初始化为未锁状态
# define rwlock_init(lock)                                        \

获取、释放写锁
#define write_lock(lock)                _write_lock(lock)
#define write_lock_bh(lock)                _write_lock_bh(lock)
#define write_lock_irq(lock)                _write_lock_irq(lock)
#define write_lock_irqsave(lock, flags)                        \

# define write_unlock(lock)                _write_unlock(lock)
#define write_unlock_bh(lock)                _write_unlock_bh(lock)
# define write_unlock_irq(lock)                        \
#define spin_unlock_irqrestore(lock, flags)                \

获取、释放读锁
#define read_lock(lock)                        _read_lock(lock)
#define read_lock_bh(lock)                _read_lock_bh(lock)
#define write_lock_irq(lock)                _write_lock_irq(lock)
#define read_lock_irqsave(lock, flags)                        \

# define read_unlock(lock)                _read_unlock(lock)
#define read_unlock_bh(lock)                _read_unlock_bh(lock)
# define read_unlock_irq(lock)                _read_unlock_irq(lock)
#define read_unlock_irqrestore(lock, flags)                \

        例子
/**
* sys_sched_getscheduler - get the policy (scheduling class) of a thread
* @pid: the pid in question.
*/
SYSCALL_DEFINE1(sched_getscheduler, pid_t, pid)
{
        struct task_struct *p;
        int retval;

        if (pid < 0)
                return -EINVAL;

        retval = -ESRCH;
        read_lock(&tasklist_lock);
        p = find_process_by_pid(pid);
        if (p) {
                retval = security_task_getscheduler(p);
                if (!retval)
                        retval = p->policy;
        }
        read_unlock(&tasklist_lock);
        return retval;
}


/*
* This needs some heavy checking ...
* I just haven't the stomach for it. I also don't fully
* understand sessions/pgrp etc. Let somebody who does explain it.
*
* OK, I think I have the protection semantics right.... this is really
* only important on a multi-user system anyway, to make sure one user
* can't send a signal to a process owned by another.  -TYT, 12/12/91
*
* Auch. Had to add the 'did_exec' flag to conform completely to POSIX.
* LBT 04.03.94
*/
SYSCALL_DEFINE2(setpgid, pid_t, pid, pid_t, pgid)
{
        struct task_struct *p;
        struct task_struct *group_leader = current->group_leader;
        struct pid *pgrp;
        int err;

        if (!pid)
                pid = task_pid_vnr(group_leader);
        if (!pgid)
                pgid = pid;
        if (pgid < 0)
                return -EINVAL;

        /* From this point forward we keep holding onto the tasklist lock
         * so that our parent does not change from under us. -DaveM
         */
        write_lock_irq(&tasklist_lock);
。。。
顺序锁
顺序锁与读写自旋锁非常相似,只是它为写者赋予了较高的优先级,实际上,即使在读者正在读者正在读的时候也允许写者继续运行,这种策略的好处就是让写者永不会等待(除非有另外一个写者正在写),缺点就是有些时候读者不得不反复多次读相同的数据直到它获得有效的副本。

        原理
每个顺序锁都包含两个字段的seqlock_t结构,一个spinlock_t的lock,另外一个是整形的sequence。
写者,取lock锁,并且增加sequence。
读者并不取lock锁,但是需要反复读取sequence直到读到有效的数据。

        API
每个顺序锁都有一个seqlock_t结构。
typedef struct {
        unsigned sequence;
        spinlock_t lock;
} seqlock_t;

初始化顺序锁,lock未锁状态,sequence设置为0.
#define seqlock_init(x)                                        \
        do {                                                \
                (x)->sequence = 0;                        \
                spin_lock_init(&(x)->lock);                \
        } while (0)

循环,获取写顺序锁。
static inline void write_seqlock(seqlock_t *sl)
{
        spin_lock(&sl->lock);//循环获取自旋锁
        ++sl->sequence;
        smp_wmb();
}

释放写顺序锁
static inline void write_sequnlock(seqlock_t *sl)
{
        smp_wmb();
        sl->sequence++;
        spin_unlock(&sl->lock);
}

下面这些api明显封装了禁止中断、可延迟的获取、释放写顺序锁
#define write_seqlock_irqsave(lock, flags)                                \
        do { local_irq_save(flags); write_seqlock(lock); } while (0)
#define write_seqlock_irq(lock)                                                \
        do { local_irq_disable();   write_seqlock(lock); } while (0)
#define write_seqlock_bh(lock)                                                \
        do { local_bh_disable();    write_seqlock(lock); } while (0)

#define write_sequnlock_irqrestore(lock, flags)                                \
        do { write_sequnlock(lock); local_irq_restore(flags); } while(0)
#define write_sequnlock_irq(lock)                                        \
        do { write_sequnlock(lock); local_irq_enable(); } while(0)
#define write_sequnlock_bh(lock)                                        \
        do { write_sequnlock(lock); local_bh_enable(); } while(0)

读者,读取sequence
/* Start of read using pointer to a sequence counter only.  */
static inline unsigned read_seqcount_begin(const seqcount_t *s)
{
        unsigned ret;
repeat:
        ret = s->sequence;
        smp_rmb();
        if (unlikely(ret & 1)) {
                cpu_relax();
                goto repeat;
        }
        return ret;
}

读者,验证sequence是否有效
static inline int read_seqcount_retry(const seqcount_t *s, unsigned start)
{
        smp_rmb();
        return s->sequence != start;
}

        使用
并不是所有资源都可以使用顺序锁来保护。一般来说必须满足下面条件时才能使用顺序锁。
被保护的数据结构不包含被写者修改和被读者间接调用的指针,读者临界区代码没有副作用,读者的临界区应该尽量简短,写者的次数应该比较少。内核使用顺序锁的典型例子是在时间系统相关的数据结构。

        例子
内核或者驱动调用该函数来获取64位的jiffies值
u64 get_jiffies_64(void)
{
        unsigned long seq;
        u64 ret;
        do {
                seq = read_seqbegin(&xtime_lock);
                ret = jiffies_64;
        } while (read_seqretry(&xtime_lock, seq));
        return ret;
}

在hrtimer模拟的tick的回调函数中tick_sched_timer()调用下面这个函数来更新jiffies。
static void tick_do_update_jiffies64(ktime_t now)
{
        unsigned long ticks = 0;
        ktime_t delta;

        /*
         * Do a quick check without holding xtime_lock:
         */
        delta = ktime_sub(now, last_jiffies_update);
        if (delta.tv64 < tick_period.tv64)
                return;

        /* Reevalute with xtime_lock held */
        write_seqlock(&xtime_lock);

        delta = ktime_sub(now, last_jiffies_update);
        if (delta.tv64 >= tick_period.tv64) {

                delta = ktime_sub(delta, tick_period);
                last_jiffies_update = ktime_add(last_jiffies_update,
                                                tick_period);

                /* Slow path for long timeouts */
                if (unlikely(delta.tv64 >= tick_period.tv64)) {
                        s64 incr = ktime_to_ns(tick_period);

                        ticks = ktime_divns(delta, incr);

                        last_jiffies_update = ktime_add_ns(last_jiffies_update,
                                                           incr * ticks);
                }
                do_timer(++ticks);

                /* Keep the tick_next_period variable up to date */
                tick_next_period = ktime_add(last_jiffies_update, tick_period);
        }
        write_sequnlock(&xtime_lock);
}

void do_timer(unsigned long ticks)
{
        jiffies_64 += ticks;
        update_wall_time();
        calc_global_load();
}
内核保证正确读写呢。现在假设第一个内核路径正在调用get_jiffies_64获取jiffies。
1.        第一个路径read_seqbegin完成,seq =0
2.        第二个内核路径调用tick_do_update_jiffies64更新时间,write_seqlock之后,seq=1
3.        第一个路径读完jiffies,read_seqretry检测发现seq不一致。重新调用read_seqbegin发现seq为奇数所以不断重复read_seqbegin
4.        第二个内核路径更新完成,write_sequnlock之后,seq=2
5.        第一个内核路径read_seqretry,seq=2
6.        第一个路径读完jiffies,read_seqretry验证seq=2成功。
7.        结束。

好吧,其实我也看到了这里有很多smp_wmb,就拿write_seqlock里面的来说吧,不用想就知道大概也是为了保证多个cpu上同时读写的顺序一致性问题的。仍然上面的例子,如果write_seqlock里面没有smp_wmb的话:
1.        第一个路径read_seqbegin完成,seq =0
2.        第二个内核路径调用tick_do_update_jiffies64更新时间,write_seqlock之后,seq=0。什么seq=0?是的因为你没有smp_wmb,但是关键jiffies_64 += ticks才执行了一部分,好吧,就认为jiffies_64现在的值是bad_value把。
3.        不幸的第一个路径刚巧读到了jiffies_64=bad_value,更不幸的是read_seqretry验证seq=0居然成功了。好吧既然这样,bad_value就拿去用吧:)
4.        第二个路径,seq=1终于被cpu写执行了,为迟已晚了。
话说回来,如果让我自己实现顺序锁代码,还真的难以考虑全面。Smp上没有保护的情况下读写共享变量确实需要仔细考虑后再使用。

论坛徽章:
0
4 [报告]
发表于 2011-11-17 18:58 |只看该作者
回复 3# ruslin


    读写信号量和读写自旋锁不一样吧
您需要登录后才可以回帖 登录 | 注册

本版积分规则 发表回复

  

北京盛拓优讯信息技术有限公司. 版权所有 京ICP备16024965号-6 北京市公安局海淀分局网监中心备案编号:11010802020122 niuxiaotong@pcpop.com 17352615567
未成年举报专区
中国互联网协会会员  联系我们:huangweiwei@itpub.net
感谢所有关心和支持过ChinaUnix的朋友们 转载本站内容请注明原作者名及出处

清除 Cookies - ChinaUnix - Archiver - WAP - TOP