- 论坛徽章:
- 4
|
最近在看这块,说说我的理解,请大家指点。。。
1.先说说几个概念
锁只是同步方法的一种,同步分阻塞同步和非阻塞同步。
lockfree就可以保证非阻塞同步,同时也尽量保证了waitfree,但依然做不到完全的waitfree。
2.既然讨论无锁,就得说说有锁会导致的问题
a)死锁
多个锁不按顺序的话
b)优先级反转
可能高优先级的等待低优先级的
c)影响实时性
等锁时间不定
d)信号安全
信号处理中不能用锁
e)崩溃处理
崩溃时可能占着锁
f)抢占的影响
被抢占时可能还占着锁
g)影响整体性能
切换进程影响性能
最后,锁的实现是跟硬件相关的,自然影响移植性。
3.无锁首先得根据业务逻辑,这个是首要的。
脱离业务去有锁或无锁是没有意义的。
根据业务流程去简化,一般可以将锁的存在压缩到最小。另外锁只是同步机制的一种,应该还有其它选择。
4.如果已经将有锁的存在压缩到最小的数据结构了,就可以考虑无锁算法了。
通用的lock free算法都是针对基本的数据结构的:buffer,list,queue,map
基本原理就是CAS机制了。现在几乎所有的CPU指令都支持CAS的原子操作,X86下对应的是CMPXCHG汇编指令。(可见,lockfree是依赖于机器体系结构的,而且lockfree其实还是有锁的,只是CAS给压缩含义了)
一般CAS会封装成下面的形式:(以32bit机为例)
bool cas32( int * pVal, int oldVal, int newVal );
pVal 表示要比较和替换数值的地址,oldVal表示期望的值,newVal表示希望替换成的值。
gcc中:
http://gcc.gnu.org/onlinedocs/gcc-4.1.1/gcc/Atomic-Builtins.html- bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...)
- type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...)
- These builtins perform an atomic compare and swap. That is, if the current value of *ptr is oldval, then write newval into *ptr.
- The “bool” version returns true if the comparison is successful and newval was written. The “val” version returns the contents of *ptr before the operation.
复制代码 另外,gcc的对于内核中内存屏障也有封装:(内核的kfifo实现可以看到,内核在kfifo时有比CAS机制更加地lock free,用mb即可。当然mb实现也跟机器架构有关)- __sync_synchronize (...)
- This builtin issues a full memory barrier.
复制代码- #define LOCK_PREFIX "lock;"
- #define __sync_bool_compare_and_swap(mem, oldval, newval) \
- ({ __typeof (*mem) ret; \
- __asm __volatile (LOCK_PREFIX "cmpxchgl %2, %1;sete %%al; movzbl %%al,%%eax" \
- : "=a" (ret), "=m" (*mem) \
- : "r" (newval), "m" (*mem), "a" (oldval)\
- :"memory"); \
- ret; })
复制代码 returns true if the comparison is successful and newval was written.
5.下面以说说无锁队列的实现,说下我的理解- EnQueue(x)
- {
- q = newrecord();
- q->value = x;
- q->next = NULL;
-
- do{
- p = tail;
- }while( CAS(p->next, NULL, q) != TRUE);
-
- CAS(tail, p, q);
- }
- DeQueue()
- {
- do{
- p = head;
- if(p->next == NULL){
- returnERR_EMPTY_QUEUE;
- }
- while( CAS(head, p, p->next) != TRUE );
- returnp->next->value;
- }
复制代码 由于是多线程编程,自然要考虑多些。
考虑加入队列的处理:如果thread 1 EnQueue(x)在成功完成第一个CAS,还没开始第二个CAS时挂掉了。那么other threads就会全死循环了:都在等着next字段为NUL的tail,但其实这个tail的next恒指向T1 在第一个CAS中新增的节点了。
解决此问题的修改后的:- EnQueue(x)
- {
- q = newrecord();
- q->value = x;
- q->next = NULL;
-
- p = tail;
- oldp = p
- do{
- while(p->next != NULL)
- p = p->next;
- }while( CAS(p.next, NULL, q) != TRUE);
-
- CAS(tail, oldp, q);
- }
复制代码 这样既保证了即使T1 挂掉,但是T1新增的节点内存还能继续使用。
回复 1# send_linux |
评分
-
查看全部评分
|