windoze
发表于 2014-12-05 17:22
回复 29# yulihua49
cmake /path/to/fibio
make
make test
wlmqgzm
发表于 2015-04-29 22:26
开源项目Fiberized.IO确实是个好东东啊, 感谢楼主的开发,正在看代码,学习楼主的开发思路。
分布式的神器之一,可以大大加快分布式的开发。
wlmqgzm
发表于 2015-04-29 22:37
刚开始看代码,concurrent_queue.hpp, 使用了比较多的锁,是否可以考虑使用无锁的实现方式?Boost.Lockfree方式好像性能更好一些。
boost的例子:Examples
Queue
The boost::lockfree::queue class implements a multi-writer/multi-reader queue. The following example shows how integer values are produced and consumed by 4 threads each:
#include <boost/thread/thread.hpp>
#include <boost/lockfree/queue.hpp>
#include <iostream>
#include <boost/atomic.hpp>
boost::atomic_int producer_count(0);
boost::atomic_int consumer_count(0);
boost::lockfree::queue<int> queue(128);
const int iterations = 10000000;
const int producer_thread_count = 4;
const int consumer_thread_count = 4;
void producer(void)
{
for (int i = 0; i != iterations; ++i) {
int value = ++producer_count;
while (!queue.push(value))
;
}
}
boost::atomic<bool> done (false);
void consumer(void)
{
int value;
while (!done) {
while (queue.pop(value))
++consumer_count;
}
while (queue.pop(value))
++consumer_count;
}
int main(int argc, char* argv[])
{
using namespace std;
cout << "boost::lockfree::queue is ";
if (!queue.is_lock_free())
cout << "not ";
cout << "lockfree" << endl;
boost::thread_group producer_threads, consumer_threads;
for (int i = 0; i != producer_thread_count; ++i)
producer_threads.create_thread(producer);
for (int i = 0; i != consumer_thread_count; ++i)
consumer_threads.create_thread(consumer);
producer_threads.join_all();
done = true;
consumer_threads.join_all();
cout << "produced " << producer_count << " objects." << endl;
cout << "consumed " << consumer_count << " objects." << endl;
}
The program output is:
produced 40000000 objects.
consumed 40000000 objects.
Stack
The boost::lockfree::stack class implements a multi-writer/multi-reader stack. The following example shows how integer values are produced and consumed by 4 threads each:
#include <boost/thread/thread.hpp>
#include <boost/lockfree/stack.hpp>
#include <iostream>
#include <boost/atomic.hpp>
boost::atomic_int producer_count(0);
boost::atomic_int consumer_count(0);
boost::lockfree::stack<int> stack(128);
const int iterations = 1000000;
const int producer_thread_count = 4;
const int consumer_thread_count = 4;
void producer(void)
{
for (int i = 0; i != iterations; ++i) {
int value = ++producer_count;
while (!stack.push(value))
;
}
}
boost::atomic<bool> done (false);
void consumer(void)
{
int value;
while (!done) {
while (stack.pop(value))
++consumer_count;
}
while (stack.pop(value))
++consumer_count;
}
int main(int argc, char* argv[])
{
using namespace std;
cout << "boost::lockfree::stack is ";
if (!stack.is_lock_free())
cout << "not ";
cout << "lockfree" << endl;
boost::thread_group producer_threads, consumer_threads;
for (int i = 0; i != producer_thread_count; ++i)
producer_threads.create_thread(producer);
for (int i = 0; i != consumer_thread_count; ++i)
consumer_threads.create_thread(consumer);
producer_threads.join_all();
done = true;
consumer_threads.join_all();
cout << "produced " << producer_count << " objects." << endl;
cout << "consumed " << consumer_count << " objects." << endl;
}
The program output is:
produced 4000000 objects.
consumed 4000000 objects.
Waitfree Single-Producer/Single-Consumer Queue
The boost::lockfree::spsc_queue class implements a wait-free single-producer/single-consumer queue. The following example shows how integer values are produced and consumed by 2 separate threads:
#include <boost/thread/thread.hpp>
#include <boost/lockfree/spsc_queue.hpp>
#include <iostream>
#include <boost/atomic.hpp>
int producer_count = 0;
boost::atomic_int consumer_count (0);
boost::lockfree::spsc_queue<int, boost::lockfree::capacity<1024> > spsc_queue;
const int iterations = 10000000;
void producer(void)
{
for (int i = 0; i != iterations; ++i) {
int value = ++producer_count;
while (!spsc_queue.push(value))
;
}
}
boost::atomic<bool> done (false);
void consumer(void)
{
int value;
while (!done) {
while (spsc_queue.pop(value))
++consumer_count;
}
while (spsc_queue.pop(value))
++consumer_count;
}
int main(int argc, char* argv[])
{
using namespace std;
cout << "boost::lockfree::queue is ";
if (!spsc_queue.is_lock_free())
cout << "not ";
cout << "lockfree" << endl;
boost::thread producer_thread(producer);
boost::thread consumer_thread(consumer);
producer_thread.join();
done = true;
consumer_thread.join();
cout << "produced " << producer_count << " objects." << endl;
cout << "consumed " << consumer_count << " objects." << endl;
}
The program output is:
produced 10000000 objects.
consumed 10000000 objects.
wlmqgzm
发表于 2015-04-29 23:42
作者:徐辰, 支持, 好样的
lxyscls
发表于 2015-04-30 09:59
什么都会的大神{:yct40:}
windoze
发表于 2015-04-30 10:13
本帖最后由 windoze 于 2015-04-30 10:13 编辑
回复 32# wlmqgzm
其实你说的这个我想过,后来没用lockfree的一个主要原因是fiber不能独占thread,必需和其它fiber协作,lockfree数据结构最大的问题就是如果想要block就只能spin,空耗CPU,而fiber需要的是切换到其它fiber,条件满足之前不要唤/醒。
我在fibio里实现的是一个最基本的不动脑子的queue,而这些lockfree数据结构只用到了atomic,所以你可以直接混用,不会打架,只要你确保它们真的是lockfree就行,否则可能会block一个worker thread,降低并发性。
PS. 又忘了“唤/醒”是敏感词……
wlmqgzm
发表于 2015-04-30 15:54
还是觉得 "无锁队列+队列无数据可读写时yield()",性能更好,更适合fibers,并发队列是fibers协同工作很重要的基础构件,希望能修改,谢谢。
希望尽快提供MYSQL client 的封装,现在的client实在弄不成,太耗CPU,
实现核心的关键,请参考
Async MySQL Queries with C-API
http jan.kneschke.de/2008/9/9/async-mysql-queries-with-c-api
windoze
发表于 2015-04-30 15:58
本帖最后由 windoze 于 2015-04-30 15:59 编辑
回复 36# wlmqgzm
如果你看过mutex和cv的实现,应该不会问这个问题了 {:qq23:}
fibio里的mutex实现就是spinlock+yield,没有用OS提供的锁,concurrent_queue使用fibio的mutex和cv,在OS层面就是一个无锁队列。
MySQL的封装一直都想搞来着,可是最近实在没时间……
windoze
发表于 2015-05-27 22:55
5月27日更新,终于支持Windows+VC2015……
cokeboL
发表于 2015-05-28 08:55
楼主偶像棒棒哒