- 论坛徽章:
- 0
|
本帖最后由 ydfgic 于 2010-11-16 14:24 编辑
哥不知道哪根筋抽了,突然想把这样的代码改成无锁的。
do_with_lock()
{
lock();
/*
aha, I'm safe()
*/
unlock();
}
直接的起源是多线程里打log,对着printf还要加锁感到很刺眼。如果几十线程一起上,都得在这里给wait一下,实在是感觉不划算。
(其实我后来对着测试数据想想,如果程序没有要求到那么变态,这里效率低点也没什么)
正好要写一个log模块,不过当时没怎么想,觉得蛮好写的,就是原子写,然后处理缓冲区满了的时候的情况。
第一版很快写好了,用的是条件变量通知,唤醒等待flush 缓存的线程,继续写。很快发现不是那么简单,时间差可以导致很多微妙的情况,
比如:要等待条件变量的线程,还没来得及休眠,条件变量的通知就过来了,这就导致了这些线程要等下一次的flush唤醒。
后面想了想实在没法解决这个时间差,只有放弃条件变量通知,还是用等待原子变量。
........
经过多次的debug(很多次),终于写好了,再看看,有点复杂,不过也掌握了一些pattern和手法。
测试了一下,输入条件 30线程, 打印100000, buffer长度 409600
无锁版本:
time ./lockless_buffer 30 100000 409600
real 0m2.000s
user 0m12.363s
sys 0m3.029s
直接用mutex版本:
time ./lockless_buffer_lock 30 100000 409600
real 0m6.158s
user 0m5.129s
sys 0m27.524s
time ./lockless_buffer_lock 30 100000 409600
real 0m4.678s
user 0m4.981s
sys 0m27.193s
time ./lockless_buffer_lock 30 100000 409600
real 0m4.103s
user 0m5.111s
sys 0m24.762s
长时间运行:无锁版系统负载达到7,而有锁版的系统负载达到15。
相差还是比较大的,不过值不值得去优化就看情况了。
更新加更正:
printf是不用加锁的,我测试验证了。
但一般说来,printf的效率不高,不是带大缓存,会频繁的调write,我测试结果如下,但是稍差于mutex版本
time ./lockless_buffer 30 100000 409600
real 0m5.372s
user 0m3.792s
sys 0m37.504s
结论:
1.修改的无锁版本很复杂,效率确实有提高,但是复杂度不是一般的高。
2.如果是简单的打log,用printf足够了。
3.如果你要控制log文件大小或者按时间截取log,那就用mutex锁吧
这个例子是个抛砖引玉,不光是打log这个应用场景,将锁变成原子编程,尽量的避免锁冲突,对于对效率优先的一些应用意义很大:比如一个繁忙的任务队列
我再试试把我写的一个任务队列的锁去掉
lockless_buffer.h
- #ifndef _LOCKLESS_BUFFER_H
- #define _LOCKLESS_BUFFER_H
- typedef void * (* clear_buf_handle)(void *);
- typedef struct LL_buffer_t LL_buffer_t;
- #ifdef __cplusplus
- extern "C" {
- #endif
- int LL_buf_create(LL_buffer_t ** pbuf, size_t buf_len, clear_buf_handle clear_callback, void * callback_arg, int fd);
- int LL_buf_write(LL_buffer_t * pbuf, char * puts, size_t put_len);
- int LL_buf_flush(LL_buffer_t * pbuf); //call in callback_func
- int LL_buf_destory(LL_buffer_t * pbuf);
- #ifdef __cplusplus
- }
- #endif
- #endif
复制代码 lockless_buffer.c-
- #include<stdlib.h>
- #include <string.h>
- #include<pthread.h>
- #include<unistd.h>
- #include <sys/types.h>
- #include <sys/stat.h>
- #include <sched.h> //sched_yield
- #include"atomic.h"
- #include"lockless_buffer.h"
- #include<err.h>
- //gcc -g -Wall lockless_buffer.c -o lockless_buffer -lpthread
- //gcc -g -Wall lockless_buffer.c -o lockless_buffer_lock -lpthread -DUSE_LOCK
- struct LL_buffer_t{
- size_t buf_sz;
- int fd;
- volatile int flag_run;
- atomic_t offset_at;
- atomic_t working_at;
- atomic_t done_offset_at;
- atomic_t flag_at;
-
- clear_buf_handle clear_callback;
- void * cb_arg;
- char buf[0];
- };
- int LL_buf_create(LL_buffer_t ** ppbuf, size_t buf_len, clear_buf_handle clear_callback, void * cb_arg, int fd)
- {
- struct stat st;
- if(fd <= 0 || fstat(fd, &st) != 0 || !(st.st_mode & (S_IWUSR | S_IWGRP | S_IWOTH)))
- return -1;
- *ppbuf = (LL_buffer_t *)calloc(1, sizeof(LL_buffer_t) + buf_len - 1);
- if(! *ppbuf)
- return -1;
- (*ppbuf)->buf_sz = buf_len;
- (*ppbuf)->fd = fd;
- (*ppbuf)->clear_callback = clear_callback;
- (*ppbuf)->cb_arg = cb_arg != NULL ? cb_arg : *ppbuf;
- atomic_set(&(*ppbuf)->offset_at, 0);
- atomic_set(&(*ppbuf)->working_at, 0);
- atomic_set(&(*ppbuf)->done_offset_at, 0);
- atomic_set(&(*ppbuf)->flag_at, 0);
- (*ppbuf)->flag_run = 1;
- return 0;
- }
- int LL_buf_write(LL_buffer_t * pbuf, char * puts, size_t put_len)
- {
- int offset;
- if(unlikely(put_len > pbuf->buf_sz))
- return -1;
- #define TEST_OVER_LEN() ((atomic_read(&pbuf->offset_at) + put_len) > pbuf->buf_sz)
- while(pbuf->flag_run)
- {
- atomic_dec(&pbuf->working_at); //设置 临界区域1 等待 标志
- if(likely(!TEST_OVER_LEN() && //条件a
- atomic_read(&pbuf->flag_at) == 0 && //条件b , 临界区域2 等待 标志
- pbuf->buf_sz >= (offset = atomic_add_return(put_len, &pbuf->offset_at)))) //条件c, 不符合则表示发生改变临界条件的冲突
- { // 可以添加后在buf范围内
- //XXX 必须完成所有写工作才能,dump,要不会打乱写的操作,比如一个在后面的写完成了,而前面的写没完成,造成了空洞
- //(因为,dump是按照done_offset写的)
- //while(atomic_read(&pbuf->flag_at) > 0); //死锁
- //XXX 临界区域1
- memcpy(&pbuf->buf[offset - put_len], puts, put_len);
- atomic_add(put_len, &pbuf->done_offset_at);
- atomic_inc(&pbuf->working_at); //通知 临界区域1 完成
- return 0;
- }
- else
- {
- atomic_inc(&pbuf->working_at); //解除不符合条件的等待1 的标志
- if(!TEST_OVER_LEN())
- {
- //条件b,不符合等待2 条件
- while(atomic_read(&pbuf->flag_at) != 0)
- sched_yield();
- }
- else
- {
- //不符合条件a 的 和 临界区2 条件不符合
- if(0 == atomic_xchg(&pbuf->flag_at,1)) //设置临界区2 条件b 为假
- { //double check
- if(TEST_OVER_LEN())
- {
- //XXX 临界区域2
- //等待之前的copy操作完成 , 阻止了这样的情况:比如这里有个长字符串超界后,但是又可以写一个短字符串
- while(atomic_read(&pbuf->working_at) < 0); //等待 临界区域1 完成
- if(pbuf->flag_run == 0) //must
- break;
- //只有一个线程进入
- if(pbuf->clear_callback)
- pbuf->clear_callback(pbuf->cb_arg);
- else
- LL_buf_flush(pbuf);
- //set to begin
- atomic_set(&pbuf->done_offset_at, 0);
- atomic_set(&pbuf->offset_at, 0); //条件a OK
- }
- atomic_set(&pbuf->flag_at, 0); //通知 临界区域2 完成
- }
- else
- {
- while(TEST_OVER_LEN()) //等待条件a
- sched_yield(); //must
- }
- }
- }
- }
- return 0;
- #undef TEST_OVER_LEN
- }
- int LL_buf_flush(LL_buffer_t * pbuf)
- {
- return write(pbuf->fd, pbuf->buf, atomic_read(& pbuf->done_offset_at)) >= 0 ? 0 : -1;
- }
- int LL_buf_destory(LL_buffer_t * pbuf)
- {
- pbuf->flag_run = 0;
- while(atomic_read(&pbuf->working_at) != 0); //for safe
- if(atomic_read(& pbuf->done_offset_at) > 0)
- {
- write(pbuf->fd, pbuf->buf, atomic_read(& pbuf->done_offset_at));
- }
- free(pbuf);
- return 0;
- }
- #ifndef TEST
- #include<err.h>
- #include<fcntl.h>
- #include<stdio.h>
- void * flush_file(void * pvar)
- {
- LL_buf_flush((LL_buffer_t *)pvar);
- return 0;
- }
- #define THREAD_NR_MAX 100
- const char * ptext = "The quick brown fox jumps over the lazy dog";
- struct LL_buffer_t *lbuf ;
- int write_nr = 1000;
- int buf_len = 4096 * 4;
- pthread_t tid[THREAD_NR_MAX];
- int gfd;
- pthread_barrier_t barrier;
- void * thread_func(void * arg)
- {
- int id = *(int *)arg;
- // pthread_detach(pthread_self());
- //fprintf(stderr,"id:%d\t",id);
- pthread_barrier_wait(&barrier);
- char buf[256];
- char text[128];
- int i =0;
- strcpy(text,ptext);
- text[strlen(ptext) - id + 1] = 0;
- for(;i < write_nr; i++)
- {
- sprintf(buf,"thread %d, time %d:%s\n", id, i, text);
- LL_buf_write(lbuf, buf, strlen(buf));
- }
- //fprintf(stderr," id:%d,cmplete %d\n",id, i);
- return 0;
- }
- //use lock for test
- char *buf_for_lock;
- volatile int buf_offset;
- pthread_mutex_t mutex_for_lock = PTHREAD_MUTEX_INITIALIZER;
- void * thread_func_for_lock(void * arg)
- {
- // pthread_detach(pthread_self());
- pthread_barrier_wait(&barrier);
- int id = *(int *)arg;
- char buf[256];
- char text[128];
- int i =0, wlen;
- strcpy(text,ptext);
- text[strlen(ptext) - id + 1] = 0;
- for(;i < write_nr; i++)
- {
- sprintf(buf,"thread %d, time %d:%s\n", id, i, text);
- wlen = strlen(buf);
- pthread_mutex_lock(&mutex_for_lock);
- if((wlen + buf_offset) > buf_len)
- {
- //write in file
- write(gfd, buf_for_lock, buf_offset);
- buf_offset = 0;
- }
- memcpy(buf_for_lock + buf_offset, buf, wlen);
- buf_offset += wlen;
- pthread_mutex_unlock(&mutex_for_lock);
- }
- return 0;
- }
- int main(int c, char * s[])
- {
- int thr_id[THREAD_NR_MAX];
- int i, thr_nr;
- if(c != 4)
- {
- errx(-1, "usage: %s [thread_num] [write_line_num] [buf_length]", s[0]);
- }
- thr_nr = atoi(s[1]);
- write_nr = atoi(s[2]);
- buf_len = atoi(s[3]);
- buf_len = (buf_len + 4096) & ~(4096 -1);
- pthread_barrier_init(&barrier,NULL,thr_nr+1);
-
- if( 0 > (gfd = open("./test_llbuf.output", O_WRONLY | O_CREAT | O_TRUNC , S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH)))
- err(-1,"open");
-
- #ifndef USE_LOCK
- if(0 > LL_buf_create(&lbuf, buf_len, flush_file, lbuf, gfd))
- err(-1,"create LLBuffer");
- #else
- buf_for_lock = (char*)malloc(buf_len);
- #endif
- for(i= 0; i< thr_nr; i++)
- {
- thr_id[i] = i;
- #ifndef USE_LOCK
- pthread_create(tid+i, NULL, thread_func, &thr_id[i]);
- #else
- pthread_create(tid+i, NULL, thread_func_for_lock, &thr_id[i]);
- #endif
- }
- pthread_barrier_wait(&barrier);
- for(i= 0; i< thr_nr; i++)
- {
- pthread_join(tid[i],NULL);
- // warnx("join thr:%d", i);
- }
- #ifndef USE_LOCK
- LL_buf_destory(lbuf);
- #else
- write(gfd, buf_for_lock, buf_offset);
- free(buf_for_lock);
- #endif
- return 0;
- }
- #endif
复制代码
lockless_buffer.tar.gz
(2.91 KB, 下载次数: 92)
奇怪atomic.h怎么是乱码,补上
atomic.h.zip
(2.55 KB, 下载次数: 93)
|
|