免费注册 查看新帖 |

Chinaunix

  平台 论坛 博客 文库
123下一页
最近访问板块 发新帖
查看: 52787 | 回复: 23

[C] __sync_xxx 系列函数似乎我的理解有很大问题 [复制链接]

论坛徽章:
11
未羊
日期:2013-12-16 12:45:4615-16赛季CBA联赛之青岛
日期:2016-04-11 19:17:4715-16赛季CBA联赛之广夏
日期:2016-04-06 16:34:012015亚冠之卡尔希纳萨夫
日期:2015-11-10 10:04:522015亚冠之大阪钢巴
日期:2015-07-30 18:29:402015亚冠之城南
日期:2015-06-15 17:56:392015亚冠之卡尔希纳萨夫
日期:2015-05-15 15:19:272015亚冠之山东鲁能
日期:2015-05-14 12:38:13金牛座
日期:2014-12-04 15:34:06子鼠
日期:2014-10-16 13:40:4715-16赛季CBA联赛之八一
日期:2016-07-22 09:41:40
发表于 2013-08-15 19:42 |显示全部楼层
以下代码铁定异常, 概率很大, 但我目前理解理解不了为什么

  1. #include <stddef.h>

  2. struct lognode {
  3.     const char* oplog;
  4.     size_t bytes;
  5.     struct lognode* next;
  6. };

  7. static struct lognode root = {0};
  8. static struct lognode* head = &root;
  9. static struct lognode* tail = &root;

  10. void lognode_put(struct lognode* node)
  11. {
  12.     struct lognode* before = NULL;
  13.     struct lognode* last;
  14.     do {
  15.         last = tail;
  16.         struct lognode* before = __sync_val_compare_and_swap(&(last->next), NULL, node);
  17.     } while (before != NULL);

  18.     __sync_val_compare_and_swap(&tail, last, node);
  19. }

  20. struct lognode* lognode_get()
  21. {
  22.     struct lognode* node = head->next;
  23.     head->next = NULL;
  24.     //tail = head;
  25.     __sync_lock_test_and_set(&tail, head);
  26.     return node;
  27. }


  28. #include <stdio.h>
  29. #include <assert.h>
  30. #include <pthread.h>

  31. #define nrp 10000

  32. void* writer(void* any)
  33. {
  34.     int n = nrp;
  35.     for (int i = 0; i < n; ++i) {
  36.         struct lognode* nodep = (struct lognode *) malloc(sizeof(struct lognode));
  37.         assert(nodep != NULL);
  38.         lognode_put(nodep);
  39.         //usleep(100);
  40.     }   

  41.     printf("done\n");
  42.     return NULL;
  43. }


  44. int main(int argc, char** argv)
  45. {
  46.     int n = 0;
  47.     for (int i = 0; i < 1; ++i) {
  48.         pthread_t tid;
  49.         if(0 == pthread_create(&tid, NULL, writer, NULL)) {
  50.             ++n;
  51.         }   
  52.     }   

  53.     int nr = nrp * n;
  54.     printf("%d writer, nr = %d\n", n, nr);
  55.         
  56.     while (nr > 0) {
  57.         struct lognode* nodep = lognode_get();
  58.         struct lognode* cur = nodep;
  59.         while (cur != NULL) {
  60.             nodep = nodep->next;
  61.             free(cur);
  62.             cur = nodep;
  63.             --nr;
  64.         }   
  65.         if (nr > 100000 && 0 == nr % 100000) {
  66.             printf("%d\n", nr);
  67.         } else if (nr > 10000 && 0 == nr % 10000) {
  68.             printf("%d\n", nr);
  69.         } else if (nr > 1000 && 0 == n % 1000) {
  70.             printf("%d\n", nr);
  71.         }   
  72.     }   

  73.     return 0;
  74. }
  75.   
复制代码

论坛徽章:
0
发表于 2013-08-16 00:02 |显示全部楼层
如果没有理解错,楼主的代码实际上是想实现一个类似半工状态的无锁列表,put操作也就是生产者添加节点到列表尾,get操作也就是消费者把列表整个取下,再等新的列表建立,如此循环。

其中put操作,示例中代码只有一个生产者,但似乎有意使用多个生产者。多个会有竞争,一个的话因为get操作的方式,也仍然会跟消费者竞争。尾节点附加操作,竞争失败循环,看上去似乎没问题,但实际上存在取tail->next操作,这个不是原子操作的一部分。并且成功后替换tail指针仍然存在竞争(多个生产者情况下)。也就是说附加尾节点与替换尾指针整个不是原子操作,并且没有避开多处潜在竞争问题,所以是错误的。

继续看get的操作,示例代码的消费者似乎只有一个,并且对tail的操作是原子化的。但如上述,因为修改tail的操作会跟put的第一步操作取tail->next以及修改tail->next这个不是原子化的操作存有竞争,所以非常错误。另外据文档显示这里使用的__sync_lock_test_and_set可能会有问题,而且一般也不是必要的(对于简单赋值操作,必要的是内存栏栅),不过x86应该是可以的。

可以看看这篇文章http://coolshell.cn/articles/8239.html,注意后面的回复。

论坛徽章:
1
程序设计版块每日发帖之星
日期:2015-09-23 06:20:00
发表于 2013-08-16 00:06 |显示全部楼层
本帖最后由 irp 于 2013-08-16 00:24 编辑

1, #45, malloc之后,nodep->next应该初始化,否则#73,74就会crash, 显然你expect next == NULL是结束条件,但是lognode_put仅仅queue这个node,没有nullize next

论坛徽章:
9
摩羯座
日期:2013-08-15 15:18:48狮子座
日期:2013-09-12 18:07:47金牛座
日期:2013-09-16 13:23:09辰龙
日期:2013-10-09 09:03:27白羊座
日期:2013-10-17 13:32:44子鼠
日期:2014-04-23 15:09:38戌狗
日期:2014-09-17 11:37:542015年亚洲杯之韩国
日期:2015-03-26 10:16:442015亚冠之武里南联
日期:2015-08-18 14:55:52
发表于 2013-08-16 10:02 |显示全部楼层
我觉得具体逻辑先不说,但是19行的before和15、20行的before明显不是同一个

论坛徽章:
11
未羊
日期:2013-12-16 12:45:4615-16赛季CBA联赛之青岛
日期:2016-04-11 19:17:4715-16赛季CBA联赛之广夏
日期:2016-04-06 16:34:012015亚冠之卡尔希纳萨夫
日期:2015-11-10 10:04:522015亚冠之大阪钢巴
日期:2015-07-30 18:29:402015亚冠之城南
日期:2015-06-15 17:56:392015亚冠之卡尔希纳萨夫
日期:2015-05-15 15:19:272015亚冠之山东鲁能
日期:2015-05-14 12:38:13金牛座
日期:2014-12-04 15:34:06子鼠
日期:2014-10-16 13:40:4715-16赛季CBA联赛之八一
日期:2016-07-22 09:41:40
发表于 2013-08-16 22:49 |显示全部楼层
本帖最后由 zylthinking 于 2013-08-16 22:55 编辑
zylthinking 发表于 2013-08-15 19:42
以下代码铁定异常, 概率很大, 但我目前理解理解不了为什么


研究了一天, 现在如果只有  put 没有 get 的情况下, 可以实现正确写了, 但若存在 get, 则会出问题, 似乎哪里还是有问题, 继续研究

  1. #define  rmb()  __asm__ volatile("lfence":::"memory")
  2. #define  wmb()  __asm__ volatile("sfence":::"memory")

  3. struct lognode {
  4.     struct lognode* next;
  5. };

  6. static struct lognode root = {0};
  7. static struct lognode* head = &root;
  8. static struct lognode** tail = &(root.next);

  9. void lognode_put(struct lognode* node)
  10. {
  11.     while (node != __sync_val_compare_and_swap(tail, NULL, node));

  12.     struct lognode** last = tail;
  13.     rmb();
  14.     if (*last != node) {
  15.         return;
  16.     }

  17.     __sync_bool_compare_and_swap(&tail, last, &(node->next));
  18. }

  19. struct lognode* lognode_get()
  20. {
  21.     struct lognode* node = head->next;
  22.     head->next = NULL;
  23.     wmb();
  24.     //tail = &(root.next);
  25.     __sync_lock_test_and_set(&tail, &(root.next));
  26.     return node;
  27. }
复制代码
测试代码

  1. #define nrp 10000
  2. #define nrt 1000

  3. static volatile int run = 0;
  4. static struct lognode* pool = NULL;
  5. static volatile int indx = 0;

  6. void* writer(void* any)
  7. {
  8.     while (run == 0) {
  9.         usleep(100);
  10.     }

  11.     for (int i = 0; i < nrp; ++i) {
  12.         int idx = __sync_fetch_and_add(&indx, 1);
  13.         struct lognode* nodep = &pool[idx];
  14.         nodep->next = NULL;
  15.         lognode_put(nodep);
  16.     }

  17.     __sync_sub_and_fetch(&run, 1);
  18.     return NULL;
  19. }

  20. #define use_get

  21. int main(int argc, char** argv)
  22. {
  23.     int n = 0;
  24.     for (int i = 0; i < nrt; ++i) {
  25.         pthread_t tid;
  26.         if(0 == pthread_create(&tid, NULL, writer, NULL)) {
  27.             ++n;
  28.             pthread_detach(tid);
  29.         }
  30.     }

  31.     int nr = nrp * n;
  32.     pool = (struct lognode *) malloc(sizeof(struct lognode) * nr);
  33.     if (pool == 0) {
  34.         printf("memory\n");
  35.         return 0;
  36.     }
  37.     run = n;
  38.     printf("run = %d\n", run);

  39.     printf("%d writer, nr = %d\n", n, nr);
  40.     time_t t1 = time(0);

  41.     while (run > 0) {
  42. #ifdef use_get
  43.         struct lognode* nodep = lognode_get();
  44.         struct lognode* cur = nodep;
  45.         while (cur != NULL) {
  46.             nodep = nodep->next;
  47.             cur = nodep;
  48.             --nr;
  49.         }
  50. #endif

  51.         time_t t2 = time(0);
  52.         if (t2 > t1 + 3) {
  53.             printf("%d %d\n", nr, indx);
  54.             t1 = t2;
  55.         }
  56.     }

  57. #ifndef use_get
  58.     struct lognode* nodep = lognode_get();
  59.     struct lognode* cur = nodep;
  60.     while (cur != NULL) {
  61.         nodep = nodep->next;
  62.         cur = nodep;
  63.         --nr;
  64.     }
  65. #endif
  66.     printf("done %d\n", nr);
  67.     return 0;
  68. }
复制代码

论坛徽章:
5
狮子座
日期:2013-08-20 10:12:24午马
日期:2013-11-23 18:04:102015年辞旧岁徽章
日期:2015-03-03 16:54:152015亚冠之德黑兰石油
日期:2015-06-29 18:11:1115-16赛季CBA联赛之新疆
日期:2024-02-21 10:00:53
发表于 2013-08-17 08:44 |显示全部楼层
zyl,想问问你,你这个是用户态的程序吧?如果是单核的CPU,那么你的while循环相当于一个spinlock,会耗尽当前的时间片,这样做会不会反而降低效率?只有在多核的情况下才有实用价值吧?

论坛徽章:
11
未羊
日期:2013-12-16 12:45:4615-16赛季CBA联赛之青岛
日期:2016-04-11 19:17:4715-16赛季CBA联赛之广夏
日期:2016-04-06 16:34:012015亚冠之卡尔希纳萨夫
日期:2015-11-10 10:04:522015亚冠之大阪钢巴
日期:2015-07-30 18:29:402015亚冠之城南
日期:2015-06-15 17:56:392015亚冠之卡尔希纳萨夫
日期:2015-05-15 15:19:272015亚冠之山东鲁能
日期:2015-05-14 12:38:13金牛座
日期:2014-12-04 15:34:06子鼠
日期:2014-10-16 13:40:4715-16赛季CBA联赛之八一
日期:2016-07-22 09:41:40
发表于 2013-08-17 17:00 |显示全部楼层
starwing83 发表于 2013-08-17 08:44
zyl,想问问你,你这个是用户态的程序吧?如果是单核的CPU,那么你的while循环相当于一个spinlock,会耗尽当 ...


当然是多核, 单核不让出 CPU 那不是傻了, 再轮也进不去啊

论坛徽章:
0
发表于 2013-08-17 22:13 |显示全部楼层
本帖最后由 zhouzhenghui 于 2013-08-17 23:14 编辑

正如starwing指出的那样,修改后的版本虽然put没有问题,但实际上存在活锁,就是必须等待另外一方做完事情才可以继续,这一般来说是不好的,虽然在kernel类似的spinlock会这么做。比较好的无锁是在其他人没有结束必须先结束的事务操作时,帮助对方结束。

楼主所需代码大约如下:
  1. static struct lognode root = {0};
  2. static struct lognode* head = &root;
  3. static struct lognode* tail = &root;

  4. void lognode_put(struct lognode* node)
  5. {
  6.     struct lognode* last;
  7.     do {
  8.         last = tail;
  9.         struct lognode* before = __sync_val_compare_and_swap(&(last->next), NULL, node);
  10.         if (before != NULL)
  11.            __sync_val_compare_and_swap(&tail, last, before);
  12.     } while (before != NULL);

  13.     __sync_val_compare_and_swap(&tail, last, node);
  14. }

  15. struct lognode* lognode_get()
  16. {
  17.     struct lognode temp = {0};
  18.     tail = &temp;
  19.     wmb();

  20.     struct lognode* node = head->next;
  21.     head->next = NULL;
  22.    
  23.     if (__sync_val_compare_and_swap(&tail, &temp, head) == &temp)
  24.         if (temp.next) lognode_put(temp.next);
  25.     else      
  26.        head->next = temp.next;

  27.     return node;
  28. }
复制代码
此代码中的get只适合于单个消费者的情况,多个消费者留作作业了。

论坛徽章:
11
未羊
日期:2013-12-16 12:45:4615-16赛季CBA联赛之青岛
日期:2016-04-11 19:17:4715-16赛季CBA联赛之广夏
日期:2016-04-06 16:34:012015亚冠之卡尔希纳萨夫
日期:2015-11-10 10:04:522015亚冠之大阪钢巴
日期:2015-07-30 18:29:402015亚冠之城南
日期:2015-06-15 17:56:392015亚冠之卡尔希纳萨夫
日期:2015-05-15 15:19:272015亚冠之山东鲁能
日期:2015-05-14 12:38:13金牛座
日期:2014-12-04 15:34:06子鼠
日期:2014-10-16 13:40:4715-16赛季CBA联赛之八一
日期:2016-07-22 09:41:40
发表于 2013-08-18 00:03 |显示全部楼层
zhouzhenghui 发表于 2013-08-17 22:13
正如starwing指出的那样,修改后的版本虽然put没有问题,但实际上存在活锁,就是必须等待另外一方做完事情才 ...


你确定你的代码测试过吗,
  1. static struct lognode root = {0};
  2. static struct lognode* head = &root;
  3. static struct lognode* tail = &root;
  4. int nr_not_change_tail = 0;
  5. int up = 0, get = 0;


  6. void lognode_put(struct lognode* node)
  7. {
  8.     struct lognode* before;
  9.     struct lognode* last;
  10.     do {
  11.         last = tail;
  12.         before = __sync_val_compare_and_swap(&(last->next), NULL, node);
  13.         if (before != NULL)
  14.            __sync_val_compare_and_swap(&tail, last, before);
  15.     } while (before != NULL);

  16.     __sync_val_compare_and_swap(&tail, last, node);
  17. }

  18. struct lognode* lognode_get()
  19. {
  20.     struct lognode temp = {0};
  21.     tail = &temp;
  22.     wmb();

  23.     struct lognode* node = head->next;
  24.     head->next = NULL;

  25.     if (__sync_val_compare_and_swap(&tail, &temp, head) == &temp)
  26.         if (temp.next) lognode_put(temp.next);
  27.     else
  28.        head->next = temp.next;

  29.     return node;
  30. }

  31. #define nrp 1000000
  32. #define nrt 1

  33. static volatile int run = 0;
  34. static struct lognode* pool = NULL;
  35. static volatile int indx = 0;

  36. void* writer(void* any)
  37. {
  38.     while (run == 0) {
  39.         usleep(100);
  40.     }

  41.     for (int i = 0; i < nrp; ++i) {
  42.         int idx = __sync_fetch_and_add(&indx, 1);
  43.         struct lognode* nodep = &pool[idx];
  44.         nodep->next = NULL;
  45.         lognode_put(nodep);
  46.     }

  47.     __sync_sub_and_fetch(&run, 1);
  48.     return NULL;
  49. }

  50. static int fetched[1024 * 1024 * 32];
  51. static int idx = 0;

  52. int main(int argc, char** argv)
  53. {
  54.     int n = 0;
  55.     for (int i = 0; i < nrt; ++i) {
  56.         pthread_t tid;
  57.         if(0 == pthread_create(&tid, NULL, writer, NULL)) {
  58.             ++n;
  59.             pthread_detach(tid);
  60.         }
  61.     }

  62.     int nr = nrp * n;
  63.     pool = (struct lognode *) malloc(sizeof(struct lognode) * nr);
  64.     if (pool == 0) {
  65.         printf("memory\n");
  66.         return 0;
  67.     }
  68.     run = n;
  69.     printf("run = %d\n", run);

  70.     printf("%d writer, nr = %d\n", n, nr);
  71.     time_t t1 = time(0);

  72.     int fch = 0;
  73.     while (run > 0) {
  74. #if 1
  75.         struct lognode* nodep = lognode_get();
  76.         struct lognode* cur = nodep;
  77.         while (cur != NULL) {
  78.             //printf("%p feched\n", cur);
  79.             nodep = nodep->next;
  80.             cur = nodep;
  81.             ++fch;
  82.             --nr;
  83.         }

  84.         if (fch > 0) {
  85.             fetched[idx++] = fch;
  86.             fch = 0;
  87.         }
  88. #endif
  89.         time_t t2 = time(0);
  90.         if (t2 > t1 + 3) {
  91.             printf("%d %d\n", nr, indx);
  92.             t1 = t2;
  93.         }
  94.     }

  95.     struct lognode* nodep = lognode_get();
  96.     struct lognode* cur = nodep;
  97.     while (cur != NULL) {
  98.         nodep = nodep->next;
  99.         cur = nodep;
  100.         ++fch;
  101.         --nr;
  102.     }

  103.     if (fch > 0) {
  104.         fetched[idx++] = fch;
  105.     }
  106.     printf("idx = %d\n", idx);
  107.     printf("done %d, nr_not_change_tail = %d, up = %d, get %d\n", nr, nr_not_change_tail, up, get);
  108. return 0;
  109.     n = 0;
  110.     fch = 0;
  111.     nr = 0;
  112.     for (int i = 0; i < nrp; ++i) {
  113.         ++fch;
  114.         if (pool[i].next == NULL) {
  115.             if(fch < fetched[n]) {
  116.                 assert(0);
  117.             } else if (fch > fetched[n]) {
  118.                 if (nr < fch - fetched[n]) {
  119.                     nr = fch - fetched[n];
  120.                 }

  121.                 //printf("%d: %d, %d\n", n, fch, fetched[n]);
  122.             }
  123.             ++n;
  124.             fch = 0;
  125.         }
  126.     }
  127.     printf("%d get indeed, %d\n", n, nr);

  128.     cur = &pool[0];
  129.     for (int i = 1; i < nrp; ++i) {
  130.         ++fch;
  131.         if (cur->next != NULL) {
  132.             assert(cur->next == &pool[i]);
  133.             //printf("%d:%d\n", fetched[nr], fch);
  134.         }
  135.         cur = &pool[i];
  136.     }
  137.     printf("it's linked  indeed\n");
  138.     return 0;
  139. }
复制代码
运行结果:
run = 1
1 writer, nr = 1000000
idx = 238635
done 622448, nr_not_change_tail = 0, up = 0, get 0

凡是 done 不为  0 的, 一律存在 bug

论坛徽章:
0
发表于 2013-08-18 13:39 |显示全部楼层
本帖最后由 zhouzhenghui 于 2013-08-18 13:40 编辑

的确没有测试(BTW:楼主的代码也太混乱了吧)

理论上来说,这里必须使用动态的head和tail,如下:

static struct lognode* head;
static struct lognode* tail;

int main()
{
    head = malloc(sizeof(struct lognode));
    head->next = NULL;
    tail = head;
    ...
}

然后,get方法表现为:

struct lognode* lognode_get()
{
    if (!head->next) return head;

    struct lognode *node = head->next;

    head = malloc(sizeof(struct lognode));
    head->next = NULL;

    lognode_put(head);

    return node;
}

如果一定要使用静态的头部,那需要不稳定逻辑避过,并且效率会受到影响:

struct lognode* lognode_get()
{
    if (head == tail) return head;

    struct lognode *node = head->next;
    if (!node) return head;

    head->next = NULL;
    lognode_put(head);

    return node;
}

注意get链表以head结束,即循环终止条件是cur != head.

您需要登录后才可以回帖 登录 | 注册

本版积分规则 发表回复

  

北京盛拓优讯信息技术有限公司. 版权所有 京ICP备16024965号-6 北京市公安局海淀分局网监中心备案编号:11010802020122 niuxiaotong@pcpop.com 17352615567
未成年举报专区
中国互联网协会会员  联系我们:huangweiwei@itpub.net
感谢所有关心和支持过ChinaUnix的朋友们 转载本站内容请注明原作者名及出处

清除 Cookies - ChinaUnix - Archiver - WAP - TOP