免费注册 查看新帖 |

Chinaunix

  平台 论坛 博客 文库
最近访问板块 发新帖
查看: 9466 | 回复: 7
打印 上一主题 下一主题

蛋疼的无锁编程 [复制链接]

论坛徽章:
0
跳转到指定楼层
1 [收藏(0)] [报告]
发表于 2010-11-15 17:02 |只看该作者 |倒序浏览
本帖最后由 ydfgic 于 2010-11-16 14:24 编辑

哥不知道哪根筋抽了,突然想把这样的代码改成无锁的。
do_with_lock()
{
lock();
/*
aha, I'm safe()
*/
unlock();
}

直接的起源是多线程里打log,对着printf还要加锁感到很刺眼。如果几十线程一起上,都得在这里给wait一下,实在是感觉不划算。
(其实我后来对着测试数据想想,如果程序没有要求到那么变态,这里效率低点也没什么)
正好要写一个log模块,不过当时没怎么想,觉得蛮好写的,就是原子写,然后处理缓冲区满了的时候的情况。
第一版很快写好了,用的是条件变量通知,唤醒等待flush 缓存的线程,继续写。很快发现不是那么简单,时间差可以导致很多微妙的情况,
比如:要等待条件变量的线程,还没来得及休眠,条件变量的通知就过来了,这就导致了这些线程要等下一次的flush唤醒。
后面想了想实在没法解决这个时间差,只有放弃条件变量通知,还是用等待原子变量。
........
经过多次的debug(很多次),终于写好了,再看看,有点复杂,不过也掌握了一些pattern和手法。

测试了一下,输入条件 30线程, 打印100000, buffer长度 409600
无锁版本:
time ./lockless_buffer 30 100000 409600
real    0m2.000s
user    0m12.363s
sys     0m3.029s
直接用mutex版本:
time ./lockless_buffer_lock 30 100000 409600
real    0m6.158s
user    0m5.129s
sys     0m27.524s
time ./lockless_buffer_lock 30 100000 409600
real    0m4.678s
user    0m4.981s
sys     0m27.193s
time ./lockless_buffer_lock 30 100000 409600
real    0m4.103s
user    0m5.111s
sys     0m24.762s

长时间运行:无锁版系统负载达到7,而有锁版的系统负载达到15。
相差还是比较大的,不过值不值得去优化就看情况了。

更新加更正:
printf是不用加锁的,我测试验证了。
但一般说来,printf的效率不高,不是带大缓存,会频繁的调write,我测试结果如下,但是稍差于mutex版本
time ./lockless_buffer 30 100000 409600
real    0m5.372s
user    0m3.792s
sys     0m37.504s

结论:
1.修改的无锁版本很复杂,效率确实有提高,但是复杂度不是一般的高。
2.如果是简单的打log,用printf足够了。
3.如果你要控制log文件大小或者按时间截取log,那就用mutex锁吧
这个例子是个抛砖引玉,不光是打log这个应用场景,将锁变成原子编程,尽量的避免锁冲突,对于对效率优先的一些应用意义很大:比如一个繁忙的任务队列
我再试试把我写的一个任务队列的锁去掉


lockless_buffer.h

  1. #ifndef _LOCKLESS_BUFFER_H
  2. #define _LOCKLESS_BUFFER_H

  3. typedef void * (* clear_buf_handle)(void *);
  4. typedef struct LL_buffer_t LL_buffer_t;

  5. #ifdef __cplusplus
  6. extern "C" {
  7. #endif

  8. int LL_buf_create(LL_buffer_t ** pbuf, size_t buf_len, clear_buf_handle clear_callback, void * callback_arg, int fd);
  9. int LL_buf_write(LL_buffer_t * pbuf, char * puts, size_t put_len);
  10. int LL_buf_flush(LL_buffer_t * pbuf); //call in callback_func
  11. int LL_buf_destory(LL_buffer_t * pbuf);

  12. #ifdef __cplusplus
  13. }
  14. #endif
  15. #endif

复制代码
lockless_buffer.c

  1. #include<stdlib.h>
  2. #include <string.h>
  3. #include<pthread.h>
  4. #include<unistd.h>
  5. #include <sys/types.h>
  6. #include <sys/stat.h>
  7. #include <sched.h> //sched_yield


  8. #include"atomic.h"
  9. #include"lockless_buffer.h"


  10. #include<err.h>

  11. //gcc -g -Wall lockless_buffer.c -o lockless_buffer -lpthread
  12. //gcc -g -Wall lockless_buffer.c -o lockless_buffer_lock -lpthread -DUSE_LOCK

  13. struct LL_buffer_t{
  14.         size_t buf_sz;
  15.         int fd;
  16.         volatile int flag_run;
  17.         atomic_t offset_at;
  18.         atomic_t working_at;
  19.         atomic_t done_offset_at;
  20.         atomic_t flag_at;
  21.        
  22.         clear_buf_handle clear_callback;
  23.         void * cb_arg;
  24.         char buf[0];
  25. };

  26. int LL_buf_create(LL_buffer_t ** ppbuf, size_t buf_len, clear_buf_handle clear_callback, void * cb_arg, int fd)
  27. {
  28.         struct stat st;
  29.         if(fd <= 0 || fstat(fd, &st) != 0 || !(st.st_mode & (S_IWUSR | S_IWGRP | S_IWOTH)))
  30.                 return -1;

  31.         *ppbuf = (LL_buffer_t *)calloc(1, sizeof(LL_buffer_t) + buf_len - 1);
  32.         if(! *ppbuf)
  33.                 return -1;

  34.         (*ppbuf)->buf_sz = buf_len;
  35.         (*ppbuf)->fd = fd;
  36.         (*ppbuf)->clear_callback = clear_callback;
  37.         (*ppbuf)->cb_arg = cb_arg != NULL ? cb_arg : *ppbuf;

  38.         atomic_set(&(*ppbuf)->offset_at, 0);
  39.         atomic_set(&(*ppbuf)->working_at, 0);
  40.         atomic_set(&(*ppbuf)->done_offset_at, 0);
  41.         atomic_set(&(*ppbuf)->flag_at, 0);

  42.         (*ppbuf)->flag_run = 1;
  43.         return 0;
  44. }

  45. int LL_buf_write(LL_buffer_t * pbuf, char * puts, size_t put_len)
  46. {
  47.         int offset;
  48.         if(unlikely(put_len > pbuf->buf_sz))
  49.                 return -1;

  50. #define TEST_OVER_LEN() ((atomic_read(&pbuf->offset_at) + put_len) > pbuf->buf_sz)
  51.         while(pbuf->flag_run)
  52.         {
  53.                 atomic_dec(&pbuf->working_at); //设置 临界区域1 等待 标志
  54.                 if(likely(!TEST_OVER_LEN() &&        //条件a
  55.                                   atomic_read(&pbuf->flag_at) == 0 &&                //条件b        , 临界区域2 等待 标志
  56.                                   pbuf->buf_sz >= (offset = atomic_add_return(put_len, &pbuf->offset_at)))) //条件c, 不符合则表示发生改变临界条件的冲突
  57.                 { // 可以添加后在buf范围内
  58.                         //XXX 必须完成所有写工作才能,dump,要不会打乱写的操作,比如一个在后面的写完成了,而前面的写没完成,造成了空洞
  59.                         //(因为,dump是按照done_offset写的)
  60.                         //while(atomic_read(&pbuf->flag_at) > 0); //死锁

  61.                         //XXX 临界区域1
  62.                         memcpy(&pbuf->buf[offset - put_len], puts, put_len);
  63.                         atomic_add(put_len, &pbuf->done_offset_at);
  64.                         atomic_inc(&pbuf->working_at);  //通知 临界区域1 完成
  65.                         return 0;
  66.                 }
  67.                 else
  68.                 {
  69.                         atomic_inc(&pbuf->working_at);  //解除不符合条件的等待1 的标志
  70.                         if(!TEST_OVER_LEN())
  71.                         {
  72.                                 //条件b,不符合等待2 条件
  73.                                 while(atomic_read(&pbuf->flag_at) != 0)
  74.                                         sched_yield();
  75.                         }
  76.                         else
  77.                         {
  78.                                 //不符合条件a 的 和 临界区2 条件不符合
  79.                                 if(0 == atomic_xchg(&pbuf->flag_at,1))  //设置临界区2 条件b 为假
  80.                                 { //double check
  81.                                         if(TEST_OVER_LEN())
  82.                                         {
  83.                                                 //XXX 临界区域2

  84.                                                 //等待之前的copy操作完成 , 阻止了这样的情况:比如这里有个长字符串超界后,但是又可以写一个短字符串
  85.                                                 while(atomic_read(&pbuf->working_at) < 0);                //等待 临界区域1 完成

  86.                                                 if(pbuf->flag_run == 0) //must
  87.                                                         break;
  88.                                                 //只有一个线程进入
  89.                                                 if(pbuf->clear_callback)
  90.                                                         pbuf->clear_callback(pbuf->cb_arg);
  91.                                                 else
  92.                                                         LL_buf_flush(pbuf);

  93.                                                 //set to begin
  94.                                                 atomic_set(&pbuf->done_offset_at, 0);
  95.                                                 atomic_set(&pbuf->offset_at, 0);        //条件a  OK
  96.                                         }
  97.                                         atomic_set(&pbuf->flag_at, 0);                        //通知 临界区域2 完成
  98.                                 }
  99.                                 else
  100.                                 {
  101.                                         while(TEST_OVER_LEN()) //等待条件a
  102.                                                 sched_yield();  //must
  103.                                 }
  104.                         }
  105.                 }
  106.         }
  107.         return 0;
  108. #undef TEST_OVER_LEN
  109. }

  110. int LL_buf_flush(LL_buffer_t * pbuf)
  111. {
  112.         return write(pbuf->fd, pbuf->buf, atomic_read(& pbuf->done_offset_at)) >= 0 ? 0 : -1;       
  113. }
  114. int LL_buf_destory(LL_buffer_t * pbuf)
  115. {
  116.         pbuf->flag_run = 0;
  117.         while(atomic_read(&pbuf->working_at) != 0); //for safe
  118.         if(atomic_read(& pbuf->done_offset_at) > 0)
  119.         {
  120.                 write(pbuf->fd, pbuf->buf, atomic_read(& pbuf->done_offset_at));
  121.         }
  122.         free(pbuf);
  123.         return 0;
  124. }


  125. #ifndef TEST

  126. #include<err.h>
  127. #include<fcntl.h>
  128. #include<stdio.h>

  129. void * flush_file(void * pvar)
  130. {
  131.         LL_buf_flush((LL_buffer_t *)pvar);
  132.         return 0;
  133. }

  134. #define THREAD_NR_MAX 100
  135. const char * ptext = "The quick brown fox jumps over the lazy dog";
  136. struct LL_buffer_t *lbuf ;
  137. int write_nr = 1000;
  138. int buf_len = 4096 * 4;
  139. pthread_t tid[THREAD_NR_MAX];
  140. int gfd;
  141. pthread_barrier_t barrier;

  142. void * thread_func(void * arg)
  143. {
  144.         int id = *(int *)arg;
  145. //        pthread_detach(pthread_self());
  146.         //fprintf(stderr,"id:%d\t",id);
  147.         pthread_barrier_wait(&barrier);
  148.         char buf[256];
  149.         char text[128];
  150.         int i =0;
  151.         strcpy(text,ptext);
  152.         text[strlen(ptext) - id + 1]  = 0;
  153.         for(;i < write_nr; i++)
  154.         {
  155.                 sprintf(buf,"thread %d, time %d:%s\n", id, i, text);
  156.                 LL_buf_write(lbuf, buf, strlen(buf));
  157.         }
  158.         //fprintf(stderr," id:%d,cmplete %d\n",id, i);

  159.         return 0;
  160. }

  161. //use lock for test
  162. char *buf_for_lock;
  163. volatile int buf_offset;
  164. pthread_mutex_t mutex_for_lock = PTHREAD_MUTEX_INITIALIZER;               


  165. void * thread_func_for_lock(void * arg)
  166. {
  167. //        pthread_detach(pthread_self());
  168.         pthread_barrier_wait(&barrier);
  169.         int id = *(int *)arg;
  170.         char buf[256];
  171.         char text[128];
  172.         int i =0, wlen;
  173.         strcpy(text,ptext);
  174.         text[strlen(ptext) - id + 1]  = 0;
  175.         for(;i < write_nr; i++)
  176.         {
  177.                 sprintf(buf,"thread %d, time %d:%s\n", id, i, text);
  178.                 wlen = strlen(buf);
  179.                 pthread_mutex_lock(&mutex_for_lock);
  180.                 if((wlen + buf_offset) > buf_len)
  181.                 {
  182.                         //write in file
  183.                         write(gfd, buf_for_lock, buf_offset);
  184.                         buf_offset = 0;
  185.                 }
  186.                 memcpy(buf_for_lock + buf_offset, buf, wlen);
  187.                 buf_offset += wlen;
  188.                 pthread_mutex_unlock(&mutex_for_lock);
  189.         }
  190.         return 0;
  191. }

  192. int main(int c, char * s[])
  193. {
  194.         int thr_id[THREAD_NR_MAX];
  195.         int i, thr_nr;
  196.         if(c != 4)
  197.         {
  198.                 errx(-1, "usage: %s [thread_num] [write_line_num] [buf_length]", s[0]);
  199.         }
  200.         thr_nr = atoi(s[1]);
  201.         write_nr = atoi(s[2]);
  202.         buf_len = atoi(s[3]);
  203.         buf_len = (buf_len + 4096) & ~(4096 -1);


  204.         pthread_barrier_init(&barrier,NULL,thr_nr+1);
  205.        
  206.         if( 0 > (gfd = open("./test_llbuf.output", O_WRONLY | O_CREAT | O_TRUNC , S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH)))
  207.                 err(-1,"open");
  208.        
  209. #ifndef USE_LOCK
  210.         if(0 > LL_buf_create(&lbuf, buf_len, flush_file, lbuf, gfd))
  211.                 err(-1,"create LLBuffer");
  212. #else
  213.         buf_for_lock = (char*)malloc(buf_len);
  214. #endif

  215.         for(i= 0; i< thr_nr; i++)
  216.         {
  217.                 thr_id[i] = i;
  218. #ifndef USE_LOCK
  219.                 pthread_create(tid+i, NULL, thread_func, &thr_id[i]);
  220. #else
  221.                 pthread_create(tid+i, NULL, thread_func_for_lock, &thr_id[i]);
  222. #endif
  223.         }

  224.         pthread_barrier_wait(&barrier);
  225.         for(i= 0; i< thr_nr; i++)
  226.         {
  227.                 pthread_join(tid[i],NULL);
  228. //                warnx("join thr:%d", i);
  229.         }

  230. #ifndef USE_LOCK
  231.         LL_buf_destory(lbuf);
  232. #else
  233.         write(gfd, buf_for_lock, buf_offset);
  234.         free(buf_for_lock);
  235. #endif
  236.         return 0;
  237. }
  238. #endif
复制代码
lockless_buffer.tar.gz (2.91 KB, 下载次数: 92)
奇怪atomic.h怎么是乱码,补上
atomic.h.zip (2.55 KB, 下载次数: 93)

论坛徽章:
0
2 [报告]
发表于 2010-11-15 17:10 |只看该作者
排版真差啊,一行怎么那么短{:3_191:}

论坛徽章:
1
申猴
日期:2014-02-11 14:50:31
3 [报告]
发表于 2010-11-15 20:35 |只看该作者
告诉你,printf不需要锁

论坛徽章:
324
射手座
日期:2013-08-23 12:04:38射手座
日期:2013-08-23 16:18:12未羊
日期:2013-08-30 14:33:15水瓶座
日期:2013-09-02 16:44:31摩羯座
日期:2013-09-25 09:33:52双子座
日期:2013-09-26 12:21:10金牛座
日期:2013-10-14 09:08:49申猴
日期:2013-10-16 13:09:43子鼠
日期:2013-10-17 23:23:19射手座
日期:2013-10-18 13:00:27金牛座
日期:2013-10-18 15:47:57午马
日期:2013-10-18 21:43:38
4 [报告]
发表于 2010-11-15 21:06 |只看该作者
附件中atomic.h文件是乱码{:3_198:}

论坛徽章:
0
5 [报告]
发表于 2010-11-15 22:14 |只看该作者
回复 4# hellioncu


    补上了,打包出错了,可能

论坛徽章:
7
丑牛
日期:2013-10-18 14:43:21技术图书徽章
日期:2013-11-03 09:58:03辰龙
日期:2014-01-15 22:57:50午马
日期:2014-09-15 07:04:39丑牛
日期:2014-10-16 14:25:222015年亚洲杯之伊朗
日期:2015-03-16 10:24:352015亚冠之城南
日期:2015-05-31 09:52:32
6 [报告]
发表于 2010-11-15 22:28 |只看该作者
回复 3# chenzhanyiczy


    你这是逼LZ跳楼?
printf应该类似fwite的行缓存,而且是带append方式打开的.确实不需要加锁...

论坛徽章:
26
处女座
日期:2016-04-18 14:00:4515-16赛季CBA联赛之深圳
日期:2020-06-02 10:10:5015-16赛季CBA联赛之广夏
日期:2019-07-23 16:59:452016科比退役纪念章
日期:2019-06-26 16:59:1315-16赛季CBA联赛之天津
日期:2019-05-28 14:25:1915-16赛季CBA联赛之青岛
日期:2019-05-16 10:14:082016科比退役纪念章
日期:2019-01-11 14:44:062016科比退役纪念章
日期:2018-07-18 16:17:4015-16赛季CBA联赛之上海
日期:2017-08-22 18:18:5515-16赛季CBA联赛之江苏
日期:2017-08-04 17:00:4715-16赛季CBA联赛之佛山
日期:2017-02-20 18:21:1315-16赛季CBA联赛之天津
日期:2016-12-12 10:44:23
7 [报告]
发表于 2010-11-16 00:20 |只看该作者
多线程很容易出问题,还是单线程好啊 ~

论坛徽章:
0
8 [报告]
发表于 2010-11-16 13:42 |只看该作者
告诉你,printf不需要锁
chenzhanyiczy 发表于 2010-11-15 20:35

回复  chenzhanyiczy


    你这是逼LZ跳楼?
printf应该类似fwite的行缓存,而且是带append方式打开的. ...
smalloc 发表于 2010-11-15 22:28


我跳楼去,printf是不需要加锁,我测试了

我再测测,dup 一下log文件到标准输出的效率
您需要登录后才可以回帖 登录 | 注册

本版积分规则 发表回复

  

北京盛拓优讯信息技术有限公司. 版权所有 京ICP备16024965号-6 北京市公安局海淀分局网监中心备案编号:11010802020122 niuxiaotong@pcpop.com 17352615567
未成年举报专区
中国互联网协会会员  联系我们:huangweiwei@itpub.net
感谢所有关心和支持过ChinaUnix的朋友们 转载本站内容请注明原作者名及出处

清除 Cookies - ChinaUnix - Archiver - WAP - TOP