zylthinking 发表于 2017-04-13 10:00

本帖最后由 zylthinking 于 2017-04-13 10:01 编辑

回复 50# wlmqgzm

问题是在不分组映射的情况下, 我的在 多写一读 情况下是 wait_free 的, 甚至在一定意义上来说, 我的是多读多写情况下的 wait free;这都是不需要分组的.并不是我只说是 lock free 的, 就一定意味着仅仅是 lock free 的

而你说的却是 单读单写 情况下的 wait_free, 然后分成多个组来减少碰撞;用这个能说明什么呢






wlmqgzm 发表于 2017-04-13 21:04

本帖最后由 wlmqgzm 于 2017-04-13 21:15 编辑

回复 51# zylthinking

主要是觉得你这个库的思路是挺好的,但是不够方便, 也不是C++的,不够前沿,性能上也不好说,毕竟wait_free才是最快的,
可能我自己平时用的库更习惯吧, 给个范例,使用起来只需要定义好回调函数,不需要考虑线程等等复杂的东西, 仅仅push数据就好,其余全部是全自动执行的,适应任意对象的高性能数据处理
template<typename T>
classWait_free_mpsc_ios_queue
{
public:
    //   下面 vt_sp_ios 是预先定义好的Boost库多线程任务队列容器,ushort_ios_num_begin和 ushort_ios_num_end 指明了容器中多个生产者ios的范围, size_t_size_in是预分配的队列大小
    Wait_free_mpsc_ios_queue( const std::vector<std::shared_ptr<boost::asio::io_service> > &vt_sp_ios, const unsigned short ushort_ios_num_begin, const unsigned short ushort_ios_num_end, const size_t size_t_size_in );
   
    ~Wait_free_mpsc_ios_queue( void );

    // 定义下列一行,只需要push数据, 然后消费者会自动处理数据,
    void   set_ios_and_function_pop( const std::shared_ptr<boost::asio::io_service>&sp_ios_pop_in, const std::function<void(T&)>&function_pop_in );
    //定义下列一行, 当全部数据处理完毕后,消费者会自动调用function_after_pop_all_in
    void   set_function_after_pop_all( const std::function<void(void)>&function_after_pop_all_in );

    //当定义下列一项后,生产者还会自动处理消费者返回的结果
    void   set_function_push_callback( const std::function<void(T&)>&function_push_callback_in );

    bool   push_callback_empty( void ) const;//检查 全部 回调任务是否为空
    bool   push_callback_empty( unsigned short ushort_ios_num ) const;//检查 单个ios 回调任务是否为空

    boolpush( T& t_in, unsigned short ushort_ios_num );//生产者执行
    voidpush_util_success( T& t_in, unsigned short ushort_ios_num );

    bool   full( unsigned short ushort_ios_num ) const;
    bool   empty( void ) const;
    bool   empty( unsigned short ushort_ios_num ) const;
    size_t size( void ) const;   //队列中等待处理的数据数量
    size_t capacity( void ) const;
    bool   is_lock_free( void ) const;

    void   set_bool_fast_quit( bool bool_fast_quit );   //设置是否快速退出, 默认是必须等待队列空才析构
    unsigned long longget_push_count( void ) const;   //统计总共push进队列的数据总数

private:

zylthinking 发表于 2017-04-17 10:10

回复 52# wlmqgzm

C++的,不够前沿,性能上也不好说,毕竟wait_free才是最快的

这两句, 前一句就不说什么了; 我不前沿就不前沿呗

第二句你什么意思? 合着我一直说你推崇的那个只能在一个读一个写的情况下可以 wait free; 而我的自己认为多个写 一个读的情况下也是 wait free 压根就没看, 就闭眼睛重复念经了;

wlmqgzm 发表于 2017-04-18 00:52

本帖最后由 wlmqgzm 于 2017-04-18 01:46 编辑

要么楼主测试一下性能吧,跟boost lockfree_spsc_queue可以对比测试一下, 那个是wait_free的, 我自己测试过, 在主频3.2G的双核CPU下, 传送int整数,测试每秒push进队列2亿条记录, pop出2亿条记录.
我们公司内部的wait_free_spsc_queue大约是:   传送int整数,测试每秒push进队列3亿条记录, pop出3亿条记录.
既然楼主坚决认为自己是在实现wait_free, 建议也push/pop测试一下, 看有多强.

wait_free/lock_free 现在基本都已经形成套路了,各大公司都有成熟的模型,一般wait_free有要求, 就是push和pop互不干扰(必须要间隔超过64Byte, 否则会有false sharing问题, 一般有足够的空间和足够的数据量的话, 不会有此问题)
其实, 我大概看了代码, 初步判断楼主的实现不是wait_free的,
wait_free 的实现有严格的技术要求, 有很多类似 std::memory_order之类的很多东西, 还有很多的内存屏障,
在多数情况下, push代码所有访问的数据都是独立的, 不会被pop影响,
类似的pop也一样,
本来lock_free就够难实现的了, 更何况wait_free就更难了, 更难的是 多生产者 + 消费者抢 一个队列, 还不能够有等待(wait), 我明明看到好多CAS spin在等待的(wait)
我目前还没有看到类似楼主这样的wait_free结构, 学识浅薄, 确实无法理解这种wait_free, ......

请务必测试一下性能, 看比标准版的wait_free快, 还是慢???    :lol

lxyscls 发表于 2017-04-18 07:07

回复 54# wlmqgzm

这,xeon E5 2.20G,接近2亿,同一个Core上的两个Hyper-thread,不同Core也差不多,跨Numa的核就掉到一半了。抄这个论文的:《Single-Producer/ Single-Consumer Queueson Shared Cache Multi-Core Systems》

#include <cstddef>
#include <atomic>

template <typename T, int capacity> class spsc_queue {
public:
    spsc_queue(const T &val) : head(0), tail(0), invalid_val(val) {
      for (int i = 0; i < capacity; i++) {
            buf = invalid_val;
      }
    }
    spsc_queue(const spsc_queue &) = delete;
    spsc_queue &operator=(const spsc_queue &) = delete;

    bool enqueue(const T &data) {
      if (buf.load(std::memory_order_acquire) == invalid_val) {
            buf.store(data, std::memory_order_release);
            tail = (tail + 1) % capacity;
            return true;
      }
      return false;
    }

    bool dequeue(T &data) {
      if (buf.load(std::memory_order_acquire) != invalid_val) {
            data = buf.load(std::memory_order_relaxed);
            buf.store(invalid_val, std::memory_order_release);
            head = (head + 1) % capacity;
            return true;
      }
      return false;
    }

private:
    static constexpr size_t CACHELINE = 64;

    alignas(CACHELINE) unsigned int head;
    alignas(CACHELINE) unsigned int tail;

    char padding;

    std::atomic<T> buf;
    T invalid_val;
};



zylthinking 发表于 2017-04-18 11:49

本帖最后由 zylthinking 于 2017-04-18 12:04 编辑

wlmqgzm 发表于 2017-04-18 00:52
要么楼主测试一下性能吧,跟boost lockfree_spsc_queue可以对比测试一下, 那个是wait_free的, 我自己测试 ...
上边是你推崇的 boost , 下边是我的 lkf; 测试的是 1读1写的; 你自己看结果, 单线程下是我的占优势;
多线程下, 发现 boost 这玩意居然不是线程安全的, 我说的线程安全意思是多个读多个写, 它文档中的线程安全很可能是一个读一个写彼此安全 。
多线程下看时间是它占点优势, 但不大; 但考虑到它结果都是错的, 你自己明白是什么意思boost 是 apt install 下载的, 优化选项可能没有优化到最好,姑且存疑


单线程 boost:
zylthinking@linux:~$ g++ c.cpp now.c -lpthread; ./a.out
run = 0
1 writer, 1 reader, nr = 100000
write 17, read 17, nr = 0
zylthinking@linux:~$ g++ c.cpp now.c -lpthread; ./a.out
run = 0
1 writer, 1 reader, nr = 100000
write 13, read 13, nr = 0
zylthinking@linux:~$ g++ c.cpp now.c -lpthread; ./a.out
run = 0
1 writer, 1 reader, nr = 100000
write 14, read 14, nr = 0
zylthinking@linux:~$ g++ c.cpp now.c -lpthread; ./a.out
run = 0
1 writer, 1 reader, nr = 100000
write 12, read 12, nr = 0

单线程 lkf:
zylthinking@linux:~$ g++ c.cpp now.c -lpthread; ./a.out
run = 0
1 writer, 1 reader, nr = 100000
write 11, read 11, nr = 0
zylthinking@linux:~$ g++ c.cpp now.c -lpthread; ./a.out
run = 0
1 writer, 1 reader, nr = 100000
write 12, read 12, nr = 0
zylthinking@linux:~$ g++ c.cpp now.c -lpthread; ./a.out
run = 0
1 writer, 1 reader, nr = 100000
write 11, read 11, nr = 0
zylthinking@linux:~$ g++ c.cpp now.c -lpthread; ./a.out
run = 0
1 writer, 1 reader, nr = 100000
write 8, read 8, nr = 0



boost多线程 50 读, 2个写, 只看速度, 不管数据正确性

zylthinking@linux:~$ g++ c.cpp now.c -lpthread; ./a.out
run = 0
50 writer, 2 reader, nr = 5000000
write 243, read 311
zylthinking@linux:~$ g++ c.cpp now.c -lpthread; ./a.out
run = 0
50 writer, 2 reader, nr = 5000000
write 248, read 176
zylthinking@linux:~$ g++ c.cpp now.c -lpthread; ./a.out
run = 0
50 writer, 2 reader, nr = 5000000
write 250, read 305
zylthinking@linux:~$ g++ c.cpp now.c -lpthread; ./a.out
run = 0
50 writer, 2 reader, nr = 5000000
write 252, read 298
zylthinking@linux:~$ g++ c.cpp now.c -lpthread; ./a.out
run = 0
50 writer, 2 reader, nr = 5000000
write 246, read 279

boost多线程 500 读, 2个写, 只看速度, 不管数据正确性

zylthinking@linux:~$ g++ c.cpp now.c -lpthread; ./a.out
run = 0
500 writer, 2 reader, nr = 50000000
write 2404, read 2475
zylthinking@linux:~$ g++ c.cpp now.c -lpthread; ./a.out
run = 0
500 writer, 2 reader, nr = 50000000
write 2477, read 2554
zylthinking@linux:~$ g++ c.cpp now.c -lpthread; ./a.out
run = 0
500 writer, 2 reader, nr = 50000000
write 2520, read 2328
zylthinking@linux:~$ g++ c.cpp now.c -lpthread; ./a.out
run = 0
500 writer, 2 reader, nr = 50000000
write 2474, read 2508
zylthinking@linux:~$ g++ c.cpp now.c -lpthread; ./a.out
run = 0
500 writer, 2 reader, nr = 50000000
write 2508, read 2528



lkf多线程 50 读, 2个写, 只看速度, 最终数据是正确的


zylthinking@linux:~$ g++ c.cpp now.c -lpthread; ./a.out
run = 0
50 writer, 2 reader, nr = 5000000
write 305, read 343
zylthinking@linux:~$ g++ c.cpp now.c -lpthread; ./a.out
run = 0
50 writer, 2 reader, nr = 5000000
write 355, read 356
zylthinking@linux:~$ g++ c.cpp now.c -lpthread; ./a.out
run = 0
50 writer, 2 reader, nr = 5000000
write 292, read 388
zylthinking@linux:~$ g++ c.cpp now.c -lpthread; ./a.out
run = 0
50 writer, 2 reader, nr = 5000000
write 323, read 364
zylthinking@linux:~$ g++ c.cpp now.c -lpthread; ./a.out
run = 0
50 writer, 2 reader, nr = 5000000
write 316, read 373


lkf多线程 500 读, 2个写, 只看速度, 最终数据是正确的

zylthinking@linux:~$ g++ c.cpp now.c -lpthread; ./a.out
run = 0
500 writer, 2 reader, nr = 50000000
write 2672, read 3622
zylthinking@linux:~$ g++ c.cpp now.c -lpthread; ./a.out
run = 0
500 writer, 2 reader, nr = 50000000
write 2589, read 3986
zylthinking@linux:~$ g++ c.cpp now.c -lpthread; ./a.out
run = 0
500 writer, 2 reader, nr = 50000000
write 2664, read 3614
zylthinking@linux:~$ g++ c.cpp now.c -lpthread; ./a.out
run = 0
500 writer, 2 reader, nr = 50000000
write 2709, read 3611
zylthinking@linux:~$ g++ c.cpp now.c -lpthread; ./a.out
run = 0
500 writer, 2 reader, nr = 50000000
write 2638, read 3717





zylthinking 发表于 2017-04-18 11:51

zylthinking 发表于 2017-04-18 11:49
上边是你推崇的 boost , 下边是我的 lkf; 测试的是 1读1写的; 你自己看结果, 单线程下是我的占优势;...
测试代码:


#include <stdlib.h>
#include <unistd.h>
#include <stdio.h>
#include <pthread.h>
#include <boost/lockfree/spsc_queue.hpp>

#include "lkf.h"
static LKF_LIST(list);

#define nrp 100000
#define nrt 1
uint32_t now();

#define uselkf 0
int nnnnn = 0;
struct lkf_node** node_array = NULL;

static int run = 0;
static struct lkf_node* pool = NULL;
static int indx = 0;
static int nr = 0;
static uint32_t t1;

boost::lockfree::spsc_queue<lkf_node*, boost::lockfree::capacity<50000000> ,
                           boost::lockfree::fixed_sized<true> > spsc_queue;
void* writer(void* any)
{
    int* intp = (int *) any;
    while (run == 0) {
      usleep(100);
    }
#if uselkf
    for (int i = 0; i < nrp; ++i) {
      int idx = __sync_fetch_and_add(&indx, 1);
      struct lkf_node* nodep = &pool;
      lkf_node_put(&list, nodep);
    }
#else
    for (int i = 0; i < nrp; ++i) {
      int idx = __sync_fetch_and_add(&indx, 1);
      struct lkf_node* nodep = &pool;
      spsc_queue.push(nodep);
    }
#endif

    __sync_sub_and_fetch(&run, 1);
    intp = (int) (now() - t1);
    return NULL;
}

void* reader(void* any)
{
    int* intp = (int *) any;
    while (run == 0) {
      usleep(100);
    }
#if uselkf
    while (nr > 0) {
      struct lkf_node* nodep = lkf_node_get(&list);
      if (nodep == NULL) {
            continue;
      }

      __sync_sub_and_fetch(&nr, 1);
      struct lkf_node* cur = NULL;
      do {
            cur = lkf_node_next(nodep);
            if (cur == nodep) {
                break;
            }

            if (cur != NULL) {
                __sync_sub_and_fetch(&nr, 1);
            }
      } while (1);
    }
#else
    while (nr > 0) {
      int n = (int) spsc_queue.pop(node_array, nnnnn);
      __sync_sub_and_fetch(&nr, n);
    }
#endif
    intp = (int) (now() - t1);
    return NULL;
}

int main(int argc, char** argv)
{
    int n = 0;
    int used1 = {0};
    for (int i = 0; i < nrt; ++i) {
      pthread_t tid;
      if(0 == pthread_create(&tid, NULL, writer, &used1)) {
            ++n;
            pthread_detach(tid);
      }
    }

    nr = nrp * n;
#if !uselkf
    nnnnn = nr;
    node_array = (struct lkf_node**) malloc(sizeof(struct lkf_node*) * nr);
    if (node_array == 0) {
      printf("memory\n");
      return 0;
    }
#endif

    pool = (struct lkf_node *) malloc(sizeof(struct lkf_node) * nr);
    if (pool == 0) {
      printf("memory\n");
      return 0;
    }

    int readers = 0;
    int used2 = {0};
    for (int i = 0; i < 1; ++i) {
      pthread_t tid;
      if(0 == pthread_create(&tid, NULL, reader, &used2)) {
            ++readers;
            pthread_detach(tid);
      }
    }

    printf("run = %d\n", run);
    printf("%d writer, %d reader, nr = %d\n", n, readers, nr);

    t1 = now();
    run = n;

    sleep(10);
    int max1 = 0, max2 = 0;
    for (int i = 0; i < n; ++i) {
      if (used1 > max1) {
            max1 = used1;
      }
    }

    for (int i = 0; i < readers; ++i) {
      if (used2 > max2) {
            max2 = used2;
      }
    }

    printf("write %d, read %d, nr = %d\n", max1, max2, nr);
    return 0;
}

zylthinking 发表于 2017-04-18 11:58

本帖最后由 zylthinking 于 2017-04-18 12:25 编辑

wlmqgzm 发表于 2017-04-18 00:52
要么楼主测试一下性能吧,跟boost lockfree_spsc_queue可以对比测试一下, 那个是wait_free的, 我自己测试 ...编辑掉, 不吵架

folklore 发表于 2017-04-18 12:25

第一次听到无锁读***写***队例, 就觉得很神奇。怎么可能无锁呢??????
想啊想, 还是觉得怎么可能无锁呢???

看了楼主的代码, 还是觉得, 怎么可能远锁呢????
:lol:lol:lol:lol

zylthinking 发表于 2017-04-18 12:30

folklore 发表于 2017-04-18 12:25
第一次听到无锁读***写***队例, 就觉得很神奇。怎么可能无锁呢??????
想啊想, 还是觉得怎么可能无 ...



没有真正无锁的代码; 这前提下, 更多的人在意的是函数的名称中是否有 lock 或 wait 的字眼
页: 1 2 3 4 5 [6] 7 8 9 10
查看完整版本: 多线程读写无锁链表: 之前有没有相同的实现? (更新)