免费注册 查看新帖 |

Chinaunix

  平台 论坛 博客 文库
12下一页
最近访问板块 发新帖
查看: 12025 | 回复: 10
打印 上一主题 下一主题

python居然没有好用的memcache客户端! [复制链接]

论坛徽章:
0
跳转到指定楼层
1 [收藏(0)] [报告]
发表于 2009-08-07 19:57 |只看该作者 |倒序浏览
测试了python-memcache python-libmemcached  pylibmc
发现没有一个能够在多线程下并发和memcached通讯。

对pylibmc的测试结论如下:
root@ubuntu:~# python benchmark.py
Testing single thread ...
[3.3634729385375977]  单线程读写5000次花费时间
Testing muti thread ...
[34.163991928100586]  10个线程并发读写5000次花费时间,正好是10倍,看来没有并发访问

代码benchmark.py
通过将第一行修改为import memcache as pylibmc等就可以测试其他客户端
结论是python-memcache和python-libmemcached情况和pylibmc一样!

郁闷了,
难道大家用memcache时都不会用到多线程?


import  pylibmc
import threading

test_servers = ["127.0.0.1:11211"]

def simple_set(mc):
        mc.set("foo", "fooAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB")
        mc.set("bar", "barAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB")


def simple_get(mc):
        mc.get("foo")
        mc.get("bar")

def test_single_thread(n):
        mc = pylibmc.Client(test_servers)
        for x in xrange(n):
                simple_set(mc)
                simple_get(mc)
        mc.disconnect_all()

def __test(mc,n):
        for x in xrange(n):
                simple_set(mc)
                simple_get(mc)

def test_muti_thread(tn,n):
        mcs=[]
        for x in xrange(tn):
                mc = pylibmc.Client(test_servers)
                mcs.append(mc)

        threads=[]
        for x in xrange(tn):
                t=threading.Thread(target=__test,args=(mcs[x],n))
                threads.append(t)

        for x in xrange(tn):
                threads[x].start()
        for x in xrange(tn):
                threads[x].join()
        for x in xrange(tn):
                mcs[x].disconnect_all()



if __name__ == '__main__':
        import timeit
        print "Testing single thread ..."
        t=timeit.Timer('b.test_single_thread(5000);','import benchmark as b' )
        print t.repeat(1,1)

        print "Testing muti thread ..."
        t=timeit.Timer('b.test_muti_thread(10,5000);','import benchmark as b' )
        print t.repeat(1,1)

论坛徽章:
0
2 [报告]
发表于 2009-08-07 19:58 |只看该作者
python cmemcache模块,发现已经太老没有更新了,和现在的memcahced协议已经不兼容了。

论坛徽章:
0
3 [报告]
发表于 2009-08-07 22:19 |只看该作者
自己写一个扩展去调用libmemcached算了,效率一定高。

论坛徽章:
0
4 [报告]
发表于 2009-08-08 14:48 |只看该作者
自己封装了最新libmemcached中的libmemcacheutil的libmemcache_pool连接池到python,还是不行
看来是python的GIL导致的问题,线程无法真正并发。

root@ubuntu:/home/ubuntu# python benchmark.py
Testing single thread ...
[6.0042738914489746]
Testing muti thread ...
[60.949532032012939]
Testing thread pool ...
[63.305776834487915]
Testing thread and single conn ...
[58.841603994369507]
root@ubuntu:/home/ubuntu# cat benchmark.py
import  cmemcached as pylibmc
import threading

test_servers = ["127.0.0.1:11211"]

def simple_set(mc):
        mc.set("foo", "fooAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB")
        mc.set("bar", "barAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB")


def simple_get(mc):
        mc.get("foo")
        mc.get("bar")

def test_single_thread(n):
        mc = pylibmc.Client(test_servers)
        for x in xrange(n):
                simple_set(mc)
                simple_get(mc)
        #mc.disconnect_all()

def __test(mc,n):
        for x in xrange(n):
                simple_set(mc)
                simple_get(mc)
def test_pool_thread(tn,n):
        mc = pylibmc.Client(test_servers)
        mc.entry_pool_mode(10,10)

        threads=[]
        for x in xrange(tn):
                t=threading.Thread(target=__test,args=(mc,n))
                threads.append(t)
        for x in xrange(tn):
                threads[x].start()
        for x in xrange(tn):
                threads[x].join()


def test_muti_thread(tn,n):
        mcs=[]
        for x in xrange(tn):
                mc = pylibmc.Client(test_servers)
                mcs.append(mc)

        threads=[]
        for x in xrange(tn):
                t=threading.Thread(target=__test,args=(mcs[x],n))
                threads.append(t)

        for x in xrange(tn):
                threads[x].start()
        for x in xrange(tn):
                threads[x].join()

def test_single_conn_thread(tn,n):
        mc = pylibmc.Client(test_servers)
        threads=[]
        for x in xrange(tn):
                t=threading.Thread(target=__test,args=(mc,n))
                threads.append(t)
        for x in xrange(tn):
                threads[x].start()
        for x in xrange(tn):
                threads[x].join()


if __name__ == '__main__':
        import timeit
        print "Testing single thread ..."
        t=timeit.Timer('b.test_single_thread(5000);','import benchmark as b' )
        print t.repeat(1,1)

        print "Testing muti thread ..."
        t=timeit.Timer('b.test_muti_thread(10,5000);','import benchmark as b' )
        print t.repeat(1,1)

        print "Testing thread pool ..."
        t=timeit.Timer('b.test_pool_thread(10,5000);','import benchmark as b' )
        print t.repeat(1,1)

        print "Testing thread and single conn ..."
        t=timeit.Timer('b.test_single_conn_thread(10,5000);','import benchmark as b' )
        print t.repeat(1,1)




对python-libmemcached 0.13.1进行了修改,封装了libmemcache的pool功能,修改后diff结果如下:
ubuntu@ubuntu:~/a$ diff python-libmemcached-0.13.1/cmemcached.pyx  ../python-libmemcached-0.13.1/cmemcached.pyx
144a145,152
> cdef extern from "memcached_pool.h":
>       struct memcached_pool_st:
>               pass
>       memcached_pool_st *memcached_pool_create(memcached_st* ptr, uint32_t initial,uint32_t max)
>       memcached_st* memcached_pool_destroy(memcached_pool_st* ptr)
>       memcached_st* memcached_pool_pop(memcached_pool_st* ptr, int block, memcached_return* rc)
>       memcached_return memcached_pool_push(memcached_pool_st* ptr, memcached_st* mmc)
>
198a207
>       cdef memcached_pool_st * pool
216a226
>               self.pool = NULL
237a248,257
>       def entry_pool_mode(self,s,max):
>               if not self.pool :
>                       return -1
>               if s <= 1 :
>                       return -1
>               if  max <=s :
>                       max = s
>               self.pool=memcached_pool_create(self.mc, s,max)
>               return 1
>
251a272,273
>               if self.pool:
>                       memcached_pool_destroy(self.pool)
258a281
>               cdef memcached_st * mc
274c297,302
<               retval = memcached_set(self.mc, c_key, key_len, c_val, bytes, time, flags)
---
>               if self.pool :
>                       mc=memcached_pool_pop(self.pool, 1, &retval)
>                       retval = memcached_set(mc, c_key, key_len, c_val, bytes, time, flags)
>                       memcached_pool_push(self.pool, mc)
>               else:
>                       retval = memcached_set(self.mc, c_key, key_len, c_val, bytes, time, flags)
339a368
>               cdef memcached_st * mc
346c375,381
<               c_val = memcached_get(self.mc, c_key, key_len, &bytes, &flags, &rc)
---
>
>               if self.pool :
>                       mc=memcached_pool_pop(self.pool, 1, &rc)
>                       c_val = memcached_get(mc, c_key, key_len, &bytes, &flags, &rc)
>                       memcached_pool_push(self.pool, mc)
>               else:
>                       c_val = memcached_get(self.mc, c_key, key_len, &bytes, &flags, &rc)
ubuntu@ubuntu:~/a$ diff python-libmemcached-0.13.1/setup.py  ../python-libmemcached-0.13.1/setup.py
14c14
<             libraries=['memcached'],
---
>             libraries=['memcached','memcachedutil'],

论坛徽章:
0
5 [报告]
发表于 2009-08-09 16:22 |只看该作者

回复 #4 watercloud 的帖子

一直很奇怪这个GIL问题,Java一样是用字节码和虚拟机,一样要通过引用计数实现自动内存管理。

为什么Java一出来就支持真正的线程并发执行,而Python到3.1版还是没解决这个问题。

[ 本帖最后由 gnujava 于 2009-8-9 16:25 编辑 ]

论坛徽章:
0
6 [报告]
发表于 2009-08-11 11:50 |只看该作者
"10个线程并发读写5000次花费时间,正好是10倍,看来没有并发访问"
如果你机器是单核的,那10个线程用时10倍也是正常啊。

“自己封装了最新libmemcached中的libmemcacheutil的libmemcache_pool连接池到python,还是不行
看来是python的GIL导致的问题,线程无法真正并发。”
libmemcache_pool这个只是个连接池,又不是线程池。
python的线程是真的系统线程,试下在所有get和set前后释放和获取GIL,这样扩展模块就能真的并发,下面代码供参考:
cdef extern from "Python.h":
    ctypedef struct PyThreadState
    cdef extern PyThreadState *PyEval_SaveThread()
    cdef extern void PyEval_RestoreThread(PyThreadState*)


        def get(self, key):
                cdef char *c_key
                cdef Py_ssize_t key_len
                cdef uint32_t flags
                cdef size_t bytes
                cdef memcached_return rc
                cdef char * c_val
    cdef PyThreadState *_save

                PyString_AsStringAndSize(key, &c_key, &key_len)

                if key_len > 250:
                        return None

    _save = PyEval_SaveThread()
                c_val = memcached_get(self.mc, c_key, key_len, &bytes, &flags, &rc)
    PyEval_RestoreThread(_save)

“为什么Java一出来就支持真正的线程并发执行,而Python到3.1版还是没解决这个问题。”
我的理解java不是动态语言,实现起来当然也不同,而且python老大目的只在推进python语言的发展,对于这些难题,留给其他人解决吧(如:http://code.google.com/p/unladen-swallow/ 就想去掉GIL)

论坛徽章:
0
7 [报告]
发表于 2009-08-11 12:57 |只看该作者
Py不是万金油,啥都行的。呵呵。

论坛徽章:
0
8 [报告]
发表于 2009-08-11 14:28 |只看该作者
原帖由 seewind 于 2009-8-11 11:50 发表
"10个线程并发读写5000次花费时间,正好是10倍,看来没有并发访问"
如果你机器是单核的,那10个线程用时10倍也是正常啊。

“自己封装了最新libmemcached中的libmemcacheutil的libmemcache_pool连接池到pyth ...


我机器是4核的
即使单核也不正常呀,10个线程如果能并发,那么花费的时间和单线程花费的时间应该是1样的!

libmemcache_pool中我创建10个pool,对应我创建10个线程来使用pool,这样应该是并发了吧?

按照你的指点我改了代码
  387             _save = PyEval_SaveThread()
  388             c_val = memcached_get(mc, c_key, key_len, &bytes, &flags, &rc)
  389             PyEval_RestoreThread(_save)

可能我太愚笨了,这导致了python执行测试程序时崩溃了
Program received signal SIGPIPE, Broken pipe.
[Switching to Thread 0x8d2b0b90 (LWP 7225)]
0xb7fcc410 in __kernel_vsyscall ()
(gdb) bt
#0  0xb7fcc410 in __kernel_vsyscall ()
#1  0xb7fb791b in write () from /lib/tls/i686/cmov/libpthread.so.0
#2  0xb7cf0c25 in io_flush (ptr=0x8501530, error=0x8d2af4e at memcached_io.c:377
#3  0xb7cf0e76 in memcached_io_write (ptr=0x8501530, buffer=0xb7cf9b8d, length=0, with_flush=1 '\001') at memcached_io.c:241
#4  0xb7cf414e in memcached_set (ptr=0x84dfbc0, key=0xb7dc4894 "bar", key_length=3,
    value=0x81aa2a4 "bar", 'A' <repeats 187 times>, "BBBBBBBBBB"..., value_length=527, expiration=0, flags=0)
    at memcached_storage.c:160
#5  0xb7d0100f in __pyx_f_10cmemcached_6Client_set (__pyx_v_self=0xb7d3a39c, __pyx_args=0x85367ac, __pyx_kwds=0x0)
    at cmemcached.c:714
#6  0x080c9ab3 in PyEval_EvalFrameEx ()
#7  0x080c96e5 in PyEval_EvalFrameEx ()
#8  0x080cb1f7 in PyEval_EvalCodeEx ()
#9  0x081136b6 in ?? ()
#10 0x0805cb97 in PyObject_Call ()
#11 0x080c7e04 in PyEval_EvalFrameEx ()
#12 0x080c96e5 in PyEval_EvalFrameEx ()
#13 0x080c96e5 in PyEval_EvalFrameEx ()
#14 0x080cb1f7 in PyEval_EvalCodeEx ()
#15 0x0811372e in ?? ()
#16 0x0805cb97 in PyObject_Call ()
#17 0x08062bfb in ?? ()
#18 0x0805cb97 in PyObject_Call ()
#19 0x080c2e9c in PyEval_CallObjectWithKeywords ()
#20 0x080f59a8 in ?? ()
#21 0xb7fb04fb in start_thread () from /lib/tls/i686/cmov/libpthread.so.0
#22 0xb7f04e5e in clone () from /lib/tls/i686/cmov/libc.so.6

论坛徽章:
0
9 [报告]
发表于 2009-08-11 15:20 |只看该作者
"即使单核也不正常呀,10个线程如果能并发,那么花费的时间和单线程花费的时间应该是1样的!"
不是吧,单核cpu中,多线程也不是并发执行的,花费时间怎么是一样呢?

“libmemcache_pool中我创建10个pool,对应我创建10个线程来使用pool,这样应该是并发了吧”
libmemcache_pool是连接池,目的是为多线程提供安全、可重复使用的mc连接,和并发概念不一样吧!

根据我提供的代码做的修改运行出错:我不是很清楚那错误信息说什么,你修改的代码有没有用mc连接池?请先确认这个错误是不是其他地方引起的。另请google查下关于线程释放GIL的文章,了解这修改的作用。
我提供的这个代码,应该是正确的,是从twisted框架里面摘出来的,twisted中epoll,iocp扩展模块都是用了这释放GIL的代码。

论坛徽章:
0
10 [报告]
发表于 2009-08-11 21:12 |只看该作者

回复 #9 seewind 的帖子

"即使单核也不正常呀,10个线程如果能并发,那么花费的时间和单线程花费的时间应该是1样的!"
>>不是吧,单核cpu中,多线程也不是并发执行的,花费时间怎么是一样呢?
我换单核进行测试:
test()函数读写memcache 500000次
我fork两个进程并发读写,从开始到两个进程都结束总共花费时间10s,
我fork 10个进程并发读写,从开始到10个进程结束花费时间是12s

但如果我创建两个线程读写,程序总共花费时间是20s,创建10线程读写,花费时间是109s



coredump找到原因了,已经解决了,
加上get和set前释放GIL,100线程并发下,性能提高了40%,但远远达不到我的要求

看来python这个线程机制还是不适用我的情况,看来需要换方案了。

[ 本帖最后由 watercloud 于 2009-8-11 21:19 编辑 ]
您需要登录后才可以回帖 登录 | 注册

本版积分规则 发表回复

  

北京盛拓优讯信息技术有限公司. 版权所有 京ICP备16024965号-6 北京市公安局海淀分局网监中心备案编号:11010802020122 niuxiaotong@pcpop.com 17352615567
未成年举报专区
中国互联网协会会员  联系我们:huangweiwei@itpub.net
感谢所有关心和支持过ChinaUnix的朋友们 转载本站内容请注明原作者名及出处

清除 Cookies - ChinaUnix - Archiver - WAP - TOP