zylthinking 发表于 2013-08-21 10:33

多线程读写无锁链表: 之前有没有相同的实现? (更新)

本帖最后由 zylthinking 于 2017-04-18 12:22 编辑

再更新一次? 貌似更新在这里也没什么意义, 包含一个 proc_context 的 bugfix, 另外在 linux 下增加了两个同步函数

#ifdef __linux__
#include <syscall.h>
#include <unistd.h>

static inline void lkf_node_put_wake(struct lkf_list* list, struct lkf_node* node)
{
    node->next = NULL;
    struct lkf_node** ptr = __sync_lock_test_and_set(&(list->tail), &(node->next));
    if (ptr == &list->root.next) {
      while (-1 == syscall(__NR_futex, ptr, FUTEX_WAKE, 1, NULL, NULL, 0));
    }
    *ptr = node;
}

static inline struct lkf_node* lkf_node_get_wait(struct lkf_list* list)
{
    struct lkf_node* ptr = __sync_lock_test_and_set(&(list->root.next), NULL);
    while (ptr == NULL) {
      syscall(__NR_futex, &list->root.next, FUTEX_WAIT, NULL, NULL, NULL, 0);
      ptr = __sync_lock_test_and_set(&(list->root.next), NULL);
    };

    struct lkf_node** last = __sync_lock_test_and_set(&(list->tail), &(list->root.next));
    *last = ptr;
    return *last;
}
#endif

#if 1
typedef struct proc_context {
    struct lkf_list list;
    int stat;
} proc_context;

static int proc_enter(struct proc_context* ctx, struct lkf_node* node)
{
    lkf_node_put(&ctx->list, node);
    __sync_synchronize();
    int n = __sync_bool_compare_and_swap(&ctx->stat, 0, 1);
    if (n == 0) {
      return -1;
    }
    return 0;
}

static int proc_leave(struct proc_context* ctx)
{
    ctx->stat = 0;
    __sync_synchronize();
    if (ctx->list.tail == &ctx->list.root.next) {
      return 0;
    }

    int n = __sync_bool_compare_and_swap(&ctx->stat, 0, 1);
    if (n == 0) {
      return 0;
    }
    return -1;
}
#endif




更新: 增加了一个基于 lkf 的 proc_context, 使用场景: 当多个线程拿到同一个实体的数据并进行处理时, 先检查是否有其他线程已经在处理该实体数据, 若是, 将数据移交给该线程, 自身退出, 否则, 亲自处理。
还是希望看得懂的哥们仔细找找是否存在漏洞, 这玩意真是太难想了, 翻来覆去, 总是没有足够自信说没有 bugstruct proc_context {
    struct lkf_list list;
    int stat;
};

static int proc_enter(struct proc_context* ctx, struct lkf_node* node)
{
    lkf_node_put(&ctx->list, node);
    int n = __sync_bool_compare_and_swap(&ctx->stat, 0, 1);
    if (n == 0) {
      return -1;
    }
    return 0;
}

static int proc_leave(struct proc_context* ctx)
{
    ctx->stat = 0;
    if (ctx->list.tail == &ctx->list.root.next) {
      return 0;
    }

    int n = __sync_bool_compare_and_swap(&ctx->stat, 0, 1);
    if (n == 0) {
      return 0;
    }
    return -1;
}
写了一个多线程读多线程写的无锁链表, 完全是自己的思路, 但不知道之前有没有其他人已经提出来过了, 和 MS Queue 比, 应该好点吧, 自以为。
github 地址: https://github.com/zylthinking/lkf/blob/master/lkf.h, 上面有个测试程序, 已经使用 500 读线程 500 写线程 共 1亿次操作测试过了很多次, 没有出现错误。
相关帖子:http://bbs.chinaunix.net/thread-4094984-1-2.html#ifndef lkf_h
#define lkf_h

struct lkf_node {
    struct lkf_node* next;
};

struct lkf_list {
    struct lkf_node root;
    struct lkf_node** tail;
};


#define LKF_INIT(name) {.root = {NULL}, .tail = &(name.root.next)}
#define LKF_LIST(name) \
struct lkf_list name = LKF_INIT(name)

#define INIT_LKF(name) \
do { \
    name->root.next = NULL; \
    name->tail = &(name->root.next); \
} while (0)

static inline void lkf_node_put(struct lkf_list* list, struct lkf_node* node)
{
    struct lkf_node** ptr = __sync_lock_test_and_set(&(list->tail), &(node->next));
    *ptr = node;
}

static inline struct lkf_node* lkf_node_get_one(struct lkf_list* list)
{
    struct lkf_node* head = __sync_lock_test_and_set(&(list->root.next), NULL);
    if (head == NULL) {
      return NULL;
    }

    struct lkf_node* next = head->next;
    if (next != NULL) {
      list->root.next = next;
      head->next = head;
      return head;
    }

    int b = __sync_bool_compare_and_swap(&(list->tail), &(head->next), &(list->root.next));
    if (b) {
      head->next = head;
      return head;
    }

    list->root.next = head;
    return NULL;
}

static inline struct lkf_node* lkf_node_get(struct lkf_list* list)
{
    struct lkf_node* ptr = __sync_lock_test_and_set(&(list->root.next), NULL);
    if (ptr == NULL) {
      return NULL;
    }

    struct lkf_node** last = __sync_lock_test_and_set(&(list->tail), &(list->root.next));
    *last = ptr;
    return *last;
}

static inline struct lkf_node* lkf_node_next(struct lkf_node* node)
{
    struct lkf_node* ptr = node->next;
    if (ptr == NULL || ptr->next == NULL) {
      return NULL;
    }

    if (ptr == node) {
      return ptr;
    }

    node->next = ptr->next;
    ptr->next = NULL;
    return ptr;
}

#endif

cokeboL 发表于 2013-08-21 13:45

顶一下               

cxytz01 发表于 2013-08-21 15:06

本帖最后由 cxytz01 于 2013-08-21 15:26 编辑

CAS是吧?...
打击楼主一下,楼主不是第一个。
不过能耐心写出来也很厉害。

zylthinking 发表于 2013-08-21 16:51

cxytz01 发表于 2013-08-21 15:06 static/image/common/back.gif
CAS是吧?...
打击楼主一下,楼主不是第一个。
不过能耐心写出来也很厉害。

肯定不是第一个写无锁链表的, 但使用这个思路的是不是第一个呢? 我找了很多资料。 貌似没有相似的思路

zhouzhenghui 发表于 2013-08-21 19:35

粗略一看,楼主写的不是真正意义无锁队列,从put就可以看出,任何一个操作暂停(线程挂了),链表都会破坏,get也比较可笑,竟然明明队列中有数据也可能读不出来,一致性得不到保证,也就是说只满足特定应用场景。next操作很可能存在冲突。
BTW,链表的接口先搞搞好。

zylthinking 发表于 2013-08-21 19:41

本帖最后由 zylthinking 于 2013-08-21 19:51 编辑

zhouzhenghui 发表于 2013-08-21 19:35 static/image/common/back.gif
粗略一看,楼主写的不是真正意义无锁队列,从put就可以看出,任何一个操作暂停(线程挂了),链表都会破坏, ...

你先把你写的那个 bug 找出来吧, 好歹我这个跑了好多遍没出错, 你那个一跑就挂, 没一次对的好不好。
而且, 似乎就是 ms queue 的翻版? ms queue 怎么做你就照本宣科?

zylthinking 发表于 2013-08-21 19:50

zhouzhenghui 发表于 2013-08-21 19:35 static/image/common/back.gif
粗略一看,楼主写的不是真正意义无锁队列,从put就可以看出,任何一个操作暂停(线程挂了),链表都会破坏, ...

,竟然明明队列中有数据也可能读不出来,一致性得不到保证,也就是说只满足特定应用场景。next操作很可能存在冲突

这个你得解释解释, 什么叫有数据读不出来, 看你怎么定义数据准备好了, 我的定义就是 1. 如果 A->B->NULL 则认为 A 准备好了, B2) 如果在取得同时发现没有人马上使用 B->next, 则认为 B准备好了, 否则, B认为没准备好, 返回NULL 下次再取; 一致性是什么意思, 难道不是读出 A 后要么读不出来, 要读就读出 B来么, 难道 A, B 之间可能返回空就叫做一致性得不到保证??? 你得哪个难道一定返回数据???

next 操作冲突看你怎么用, 本来 next 操作的含义就是为一个单线程准备的, 典型用法:

ptr = get();
while (ptr != ptr->next()) {
    if (ptr == NULL) 下条数据尚未准备好, 爱干嘛干嘛
    else ptr 已经指向下条数据
}

所有线程冲突的地方都是 get 在负责好不好。



zhouzhenghui 发表于 2013-08-21 19:54

之前的代码只是针对特定问题给出的,有没有错或者错在哪里我的确不知道,但那并不重要。更别说我也给出真正能保证正确的做法和建议。
至于所谓相似,基本的技巧和理念都是相通的,你没有理解什么叫无锁队列,给出来只是在线程都必须运行,并且循环反复重试场景下的才工作的实现,跟真正意义上的无锁相差甚远。我是惜才才提醒几句,建议去掉浮躁才能真正的做好技术。

zylthinking 发表于 2013-08-21 20:27

zhouzhenghui 发表于 2013-08-21 19:54 static/image/common/back.gif
之前的代码只是针对特定问题给出的,有没有错或者错在哪里我的确不知道,但那并不重要。更别说我也给出真正 ...

所有线程都必须运行, 意思不过是说不能莫名其妙被取消掉, 而且 必须不能在
    struct lkf_node** ptr = __sync_lock_test_and_set(&(list->tail), &(node->next));
    *ptr = node;
这两行中间被莫名其妙取消掉才行, 否则没有任何问题。 我的实现是有这问题, 但我不认为这是个很大问题, 我不认为这两句之间会有什么可以说得通的原因导致这个线程会被取消, 信号之类的不再考虑之内, 用户 pthread_kill 不在考虑之内

那么什么叫无锁队列,谁在规定什么叫无锁队列?

另外, 惜才啊, 留作业啊什么的拜托别说的那么顺口, 我没有随便拜师的毛病, 之前你说的时候就觉得怎么这么装, 代码一测, 还跑不通, 到现在你都没解决, 看样子也不想找到原因了, 就这样 还建议去掉浮躁才能真正的做好技术???

zylthinking 发表于 2013-08-21 20:29

zhouzhenghui 发表于 2013-08-21 19:54 static/image/common/back.gif
至于所谓相似,基本的技巧和理念都是相通的,你没有理解什么叫无锁队列.

我真不想故意曲解, 但真的想恶意引申一下, 你是在说, 真正的无锁队列就是基本技巧和理念和你的相通的才能叫是不是?
页: [1] 2 3 4 5 6 7 8 9 10
查看完整版本: 多线程读写无锁链表: 之前有没有相同的实现? (更新)