- 论坛徽章:
- 0
|
第8章. 加锁速度
在考虑使用锁的代码的速度问题上,主要有三件事需要关注。首先是并发性:当锁被别人持有时,有多少事情是我们需要等待的。其次是,需要花多少时间来获取与释放一把无争议的锁。第三呢,是尽量使用更少的──或曰更灵活的──锁。当然,这里我已经假定锁的使用非常频繁,否则你都不用担心效率问题的。
并发性依赖于锁被持有多长时间:你持有锁的时间,就应该同确实需要持有它的时间一样长,不要更长。在上面那个关于cache的例子中,我们在创建对象时是不持有锁的,只有在已经准备好把它插入到链表中时才去获取锁。
获取锁的时间依赖于加锁操作对于管道线(pipeline stalls)做了多少破坏性工作,以及本CPU是最后一个试图获取该锁的CPU(亦即:对本CPU来说,锁是否为缓存密集的(cache-hot))的可能性有多大:在具有很多CPU的机器上,这种可能性迅速降低。考虑一个700MHz的Intel Pentium III处理器:一条指令大约花费0.7纳秒,一次原子增加操作大约花费58纳秒,一次缓存密集的的加锁操作大约花费160纳秒,而从其他CPU上的一次缓存行转换还要额外花费170到360纳秒。(这些数据来自于Paul McKenney的Linux Journal RCU article )
这两个目标会发生冲突:通过把锁分成不同的的部分,可以减少持有锁的时间(例如上面例子中“每个对象一把的锁”),但是这增加了获取锁这一操作的数量,因而结果一般是比只有一把锁的情况要慢一些。这也是推崇加锁简洁性的一个理由。
第三个考虑点放到了下面:的确存在一些方法能够减少锁的使用。
8.1. 读/写锁变体
自旋锁与信号量都具有读/写变体:rwlock_t和struct rw_semaphore。它们把使用者分成了两类:读者和写者。如果你只需要读数据,你该获取读锁;但如果你需要写数据,你需要获取写锁。可以有很多使用者同时获取读锁,但写锁却只能有单个使用者持有。
如果你的代码可以简洁的划分出读和写的界限(就象我们上面的cache代码那样),而且读者通常需要很长时间的持有读锁,使用读写锁会更好。它们要比普通的锁要稍微慢点儿,所以在实践中rwlock_t通常并不很值得使用。
8.2. 避免锁的使用:RCU
有一种特殊的读/写锁定方法叫做RCU(Read Copy Update),读者可以完全避免使用锁:由于我们希望我们的cache被读取远比被更新来的频繁(否则的话,还引入cache干什么?),使用RCU就是一个可选择的优化。
如何移除读锁呢?移除读锁意味着写者可能在存在读者的情况下更新链表。这很简单:要向链表中添加元素的代码足够小心,我们就可以在它添加的同时读取链表。例如,把new元素添加到list链表中:
- new->next = list->next;
- wmb();
- list->next = new;
-
复制代码
wmb()是一个写内存屏障。它保证了第一个操作(设置新元素的next指针)是完整的,而且立即为其它CPU所见,这发生在第二个操作(把新元素添加到链表中)之前。这是很重要的,因为现代CPU和编译器可能会改变指令的执行顺序,除非明确告诉它们不要这样:我们希望一个读者要么完全看不到新元素的插入,要么能够看到新元素插入之后的next指针正确指向了链表的剩余部分。
幸运的是,有一个函数专门添加标准的struct list_head链表:list_add_rcu() (include/linux/list.h)。
从链表中删除元素更简单:假设我们要删除的元素为A,我们让指向A的指针重新指向A的后继者,读者要么完全看到了它,要么完全忽略了它。
list->next = old->next;
有一个list_del_rcu()函数(include/linux/list.h)来做该工作(而普通版本的删除操作会毒害(poison)要移除的对象,这是我们不愿看到的)。
读者也一定要小心:有些CPU会观察next指针以便预取下一个元素,但这样的话,如果next指针恰恰在这时候发生了变化,那么这些CPU预取的(pre-fetched)内容就是错误的。还好,有一个list_fro_each_entry_rcu()函数(include/linux/list.h)可以帮助你。当然,写者可以只使用list_for_each_entry() (译者案:这里的“只使用”,是说不用但是是否需要一个XXX_rcu()这样的函数),因为不可能同时有两个写者。
我们的困难最终是:什么时候我们可以真正的删除元素?记住,一个读者可能正在访问要被删除的元素:如果我们释放掉这个元素,而让next指针指向新元素,读者将立即访问到垃圾内存并崩溃。我们必须等待到所有正在遍历链表的读者都结束了遍历,才能真正释放掉元素。我们用call_rcu()函数注册一个回调函数,当所有读者结束遍历时,它会真正销毁对象。
但是,RCU如何得知什么时候读者结束遍历了呢?首先,读者总是在rcu_read_lock()/rcu_read_unlock()之间执行遍历:这会禁止抢占(而且只是禁止抢占,其它什么都不做),因此读者在遍历链表期间不会睡眠。
RCU一直等待到其他的CPU至少睡眠了一次:因为读者不会在读取过程中睡眠,此时我们就知道我们等待其结束的那些读者终于都结束了。于是回调函数被调用。真实的RCU代码要比这里讲述的更优化些,但这里讲的是它的基础。
- --- cache.c.perobjectlock 2003-12-11 17:15:03.000000000 +1100
- +++ cache.c.rcupdate 2003-12-11 17:55:14.000000000 +1100
- @@ -1,15 +1,18 @@
- #include <linux/list.h>
- #include <linux/slab.h>
- #include <linux/string.h>
- +#include <linux/rcupdate.h>
- #include <asm/semaphore.h>
- #include <asm/errno.h>
-
- struct object
- {
- - /* 下面两个域由cache_lock保护 */
- + /* 下面由RCU保护 */
- struct list_head list;
- int popularity;
-
- + struct rcu_head rcu;
- +
- atomic_t refcnt;
-
- /* 一旦创建就不要更改. */
- @@ -40,7 +43,7 @@
- {
- struct object *i;
-
- - list_for_each_entry(i, &cache, list) {
- + list_for_each_entry_rcu(i, &cache, list) {
- if (i->id == id) {
- i->popularity++;
- return i;
- @@ -49,19 +52,25 @@
- return NULL;
- }
-
- +/* 一旦我们知道没有读者了,立即最终丢弃对象. */
- +static void cache_delete_rcu(void *arg)
- +{
- + object_put(arg);
- +}
- +
- /* 必须在持有cache_lock的情况下调用 */
- static void __cache_delete(struct object *obj)
- {
- BUG_ON(!obj);
- - list_del(&obj->list);
- - object_put(obj);
- + list_del_rcu(&obj->list);
- cache_num--;
- + call_rcu(&obj->rcu, cache_delete_rcu, obj);
- }
-
- /* 必须在持有cache_lock的情况下调用 */
- static void __cache_add(struct object *obj)
- {
- - list_add(&obj->list, &cache);
- + list_add_rcu(&obj->list, &cache);
- if (++cache_num > MAX_CACHE_SIZE) {
- struct object *i, *outcast = NULL;
- list_for_each_entry(i, &cache, list) {
- @@ -85,6 +94,7 @@
- obj->popularity = 0;
- atomic_set(&obj->refcnt, 1); /* cache本身占一个引用计数 */
- spin_lock_init(&obj->lock);
- + INIT_RCU_HEAD(&obj->rcu);
-
- spin_lock_irqsave(&cache_lock, flags);
- __cache_add(obj);
- @@ -104,12 +114,11 @@
- struct object *cache_find(int id)
- {
- struct object *obj;
- - unsigned long flags;
-
- - spin_lock_irqsave(&cache_lock, flags);
- + rcu_read_lock();
- obj = __cache_find(id);
- if (obj)
- object_get(obj);
- - spin_unlock_irqrestore(&cache_lock, flags);
- + rcu_read_unlock();
- return obj;
- }
复制代码
注意读者会在__cache_find()中改变popularity域的值,但是没有使用锁。我们的方案应该把它变成atomic_t类型的,但是对本例来说我们不会导致竞态条件:近似的结果就已经不错了,所以我没做改变。
结果是,cache_find()函数并不需要与其它函数同步,因此它在SMP上几乎跟在UP上一样快。
此处还存在一个更纵深的可能的优化:想一想我们最初的cache代码,那时没有引用计数,无论何时调用者想访问对象,就要持有锁。这仍然是可能的:如果你持有锁,就没人能够删掉对象,因此你就不必读取并写回引用计数了。
由于RCU中的“读锁”会禁止抢占(而且只是禁止抢占,其它什么都不做),调用者如果调用cache_find()或object_put()之前已经禁止抢占了,就并不需要读取并写回引用计数:我们可以把__cache_find()变成非static的从而暴露给其它文件,这时候调用者就可以直接调用这些函数了。
此处的优势是:引用计数并不被写入,对象并不被更改,这在SMP机器上会非常快──由于CPU的caching的原因。
8.3. Per-CPU数据
另一种使用相当广泛的避免锁的技术是,为每个CPU使用数据的一份拷贝。例如,如果你想为一个普通的条件保持一个计数,你可能会使用自旋锁和一个计数器,这简单而漂亮。
如果它表现的太慢了(通常不会,但是如果你的确测试到这种情况发生了),你就该改变一下策略:为每个CPU保持一个计数器,这样,没有一个计数器需要互斥的锁来保护。参考DEFINE_PER_CPU(),get_cpu_var()和put_cpu_var() (include/linux/percpu.h)。
Per-CPU计数器的另一个用场是local_t数据类型,和cpu_local_inc()以及其它的相关函数。这在某些体系结构上是比简单的加锁代码更高效的。(include/asm/local.h)
注意,对计数器来说,根本不存在简单而可靠的方法可以精确的得到它的值而不需要引入锁。只不过某些情况下这不是问题罢了。
8.4. 中断服务例程通常使用哪些数据
如果数据只是在中断服务例程内部访问,你根本不需要锁:内核本身已经保证了中断服务例程不会同时在多个CPU上运行。
Manfred Spraul指出,即使数据会偶尔被用户上下文或softirqs/tasklets访问,你仍然可以这样。中断服务例程自身不需要锁,其它所有的访问者这样做:
- spin_lock(&lock);
- disable_irq(irq);
- ...
- enable_irq(irq);s
- spin_unlock(&lock);
复制代码
disable_irq()禁止了中断服务例程的运行(如果此刻已经在别的CPU上运行了,那就等待它的结束)。自旋锁禁止了同时刻其它任何可能的访问。自然,这比单个spin_lock_irq()调用要慢些,所以只有这种访问很少发生时这种用法才是合理的。 |
|