免费注册 查看新帖 |

Chinaunix

  平台 论坛 博客 文库
123下一页
最近访问板块 发新帖
查看: 6462 | 回复: 26
打印 上一主题 下一主题

一个正在开发中的异步网络库 [复制链接]

论坛徽章:
0
跳转到指定楼层
1 [收藏(0)] [报告]
发表于 2009-02-10 23:07 |只看该作者 |倒序浏览
接触Linux有一年了吧,总觉得的Posix线程,锁,条件变量,不方便,
创建线程的参数是不是应该多变一些呢?为啥不让我用类的成员函数?
(我水平一般,或者说还很菜,要不然怎么会没工作?)
还没有线程池,还有还有,反正网络编程很麻烦的。  于是,
我简单的封装一下,做一个函数库吧,记得设计模式说,起个
好名字很重要,如果名字起得不好,就会误导别人。我为了不
误导大家,叫一个没有任何意义的名字,Traxex。

这里面都实现了如下:
Functor:
把函数封装成类的东西。
比如,some_obj->do_something( /*arg_type*/ arg);
这一句话是一个函数调用,就可以封装成一个Functor,
之后不会说了。。。

Thread:     
POSIX thread 至少我是用不好,非得是一个void* (*func)( void*),
就很不舒服。还有那个创建就执行也不太好,等一等让我初使化点别
的东西不行么?急什么。现在这个Thread类用起来很简单,参数随意。

类的构造函数就是    : 函数名+参数列表
或是                : 类名+成员函数名+参数列表

void add( int x, int y);
Thread add_s( &add, 3, 6);

class T{
public:
   void add( int x, int y,int z);
};
T t;
Thread add_ss( t, &T::add, 3, 6,8);

Thread里还有锁,自动锁(出作用域自动解锁那个东西),读写锁,条件变量,
还有一个使用上也很简单的线程池。 可以像那个Thread的参数那样,
像线程池里添加新任务。


恩,还有一个线程安全队列,多线程对队列操作时,如果队列为空,pop操作会
阻塞,一直等到队数里有东西。

IO&Socket
简单封装一下tcp server,tcp client,不谈了啊,很简单,再多说被笑话了。
一个异步IO服务器,可以在多线程情况下,把一个套接字,一个要等待的事件,
还有一个回调(不知用啥名了,好名都叫狗起了,什么欢欢,乐乐,花花的,唉。。
反正就是可以调用的东西就行,函数,成员函数,只要参数写全了,就行了)。
这时,可以起任意多个线程去调用服务器的run方法,当事件发生时,异步IO服务器
会自动分发任务,让某个线程去执行的。( 也就是这些调用服务器的run方法的线程,
是像一个线程池一样工作的)。

函数库还在开发测试中,随时会更新,而且我也不能有点改动就上论坛来更新,
所以暂时不发代码了,有人喜欢就给我发邮件吧。
欢迎各位指点帮忙。 或是提供一个
其它类库,或是其它语言的模式也好,
谢谢了。。。

下面是一些简单例子。

[ 本帖最后由 die 于 2009-2-10 23:10 编辑 ]

论坛徽章:
0
2 [报告]
发表于 2009-02-10 23:09 |只看该作者
//
// @author <[email]zhuxueling@pica.com[/email]>
// @data   2009-02-10





//________Test Class_____________
//这个是摆设
class Test{
public:
    void say( const char* sb){
        while(true){
            std::cout << sb << " is foolish.\n";
            sleep(1);
        }
    }

    void say( const char* sb, const char* words){
        while(true){
            std::cout << sb << " says: " << words << std::endl;
            sleep(1);
        }
    }
};


//_________Thread__________________
int main(){
    Test atest;
//创建一个线程变得简单了,类名,成员函数名,参数列表就可以了。
//当然,也可以直接用普通函数名,加参数列表。 而且支持重载的。。
    Thread s( atest, &Test::say,"traxex");
    s.start();
    s.join();
}

//_________Cond__________________
//两个人在不停的添加水果,一个人等有水果就吃。。。
std::queue<string> fruits;
Cond               cond;
Mutex              mutex;

void add( string name){
    while( true){
        {  
           ScopeLock lock( mutex);
           fruits.push( name);
        }
        std::cout << "add " << name << std::endl;
        cond.signal();
        Sleep( 200);
    }
}
void eat( ){
    while( true){
        cond.wait();
        ScopeLock lock( mutex);
        fruits.pop();
        std::cout << "eat "<< fruits.front() << std::endl;
    }
}
int main(){
    Thread add_apple ( &add, "apple ");
    Thread add_orange( &add, "orange");
    Thread eat_fruit ( &eat);

    add_apple.start(); add_orange.start(); eat_fruit.start();
    add_apple.join();  add_orange.join();  eat_fruit.join();
    return 0;
}
//__________Thread Pool___________
//启动一个有3个线程的线程池,来执行这几个函数。
int main(){
    ThreadPool tp( 3);
    Test atest;
    tp.push( atest, &Test::say, "traxex", "fuckyou");
    tp.push( atest, &Test::say, "traxex", "Oh, Sorry");
    tp.push( atest, &Test::say, "zhuzhu", "sounds good");
    tp.push( atest, &Test::say, "gtkmm" , "HELP!");
    tp.push( atest, &Test::say, "gtkmm" , "Ahh!!");
    return 0;
}

//________TCP Server___________
//简单的服务器
int main(){
    TcpServer server("127.0.0.1", 8888);
    server.init();
    TcpWorker worker = server.accept();
    while( worker.closed()){
        const char* req = worker.recv();
        worker.send( "haha, I'm the joker.");
    }
}

//________TCP Client___________
//简单的客户端
int main(){
    TcpClient client("127.0.0.1", 8888);
    client.connect();

    std::string msg;
    while( cin>>msg && !client.closed() ){
        client.send( msg);
        std::cout << client.recv();
    }
    return 0;
}

//________IO Service_________
//异步IO服务器,动态注册事件,以回调函数的方式得到异步通知。
class MyServer{
public:
    MyServer( IOService &ioservice, short port)
        :server( port,true),
         ios( ioservice){
        if( server.init()){
            server.async_wait( ios, EV_ACCEPT_LOOP, &MyServer::fun_accept, this);
        }
    }
public:
    static void fun_accept( int fd, uint events, void* self);
    static void fun_read  ( int fd, uint events, void* self);

    TcpServer                        server;
    IOService                        &ios;
    Mutex                        mutex;
    std::map<int,TcpWorker>        clients;
};

void MyServer::fun_accept( int fd, uint events, void* ptr){

    MyServer *self = (MyServer*)ptr;
    ScopeLock lock( self->mutex);
    TcpWorker new_worker =self->server.accept();
    self->clients[new_worker.fd] = new_worker;
    std::cout << "accept " << new_worker.fd << "\n";
    if( new_worker.fd == -1){
        std::cout << strerror( errno) << std::endl;
    }
    new_worker.async_wait( self->ios, TcpWorker::EV_READ, &MyServer::fun_read, self);
}

void MyServer::fun_read  ( int fd, uint events, void* ptr){

    MyServer *self = (MyServer*)ptr;
    if( events & EPOLLIN){
        std::cout << "\Recv: " << self->clients[fd].recv();
        self->clients[fd].async_wait( self->ios, TcpWorker::EV_READ, &MyServer::fun_read, self);

    }else{
        clients[fd].close();
        ScopeLock lock( self->mutex);
        self->clients.erase(fd);
    }
}
int main(){
    IOService service;
    MyServer  server( service, 8888);
    service.run();
}

[[i] 本帖最后由 die 于 2009-2-11 00:49 编辑 [/i]]

论坛徽章:
0
3 [报告]
发表于 2009-02-10 23:18 |只看该作者
工作量不小啊

论坛徽章:
0
4 [报告]
发表于 2009-02-10 23:27 |只看该作者
能够把实例方法作为线程的启动函数,这个功能比较实用,而且接口很简洁,good work!:wink:

论坛徽章:
0
5 [报告]
发表于 2009-02-10 23:34 |只看该作者
TO die:

橘子苹果的解法好像有个小问题
void add( string name){
    while( true){
        ScopeLock lock( mutex);
        fruits.push( name);
        std::cout << "add " << name << std::endl;
        cond.signal();
        Sleep( 200);
    }
}

void eat( ){
    while( true){
        cond.wait();
        ScopeLock lock( mutex);
        fruits.pop();
        std::cout << "eat "<< fruits.front() << std::endl;
    }
}


eat线程进入临界区吃苹果的条件是“fruits不为空”
void eat( ){
    while( true){
        ScopeLock lock( mutex);
        while (fruits.size() != 0)
           cond.wait(&mutex);
        fruits.pop();
        std::cout << "eat "<< fruits.front() << std::endl;
    }
}

[ 本帖最后由 CRLF 于 2009-2-10 23:41 编辑 ]

论坛徽章:
0
6 [报告]
发表于 2009-02-10 23:40 |只看该作者
condition variable总是和mutex一一对应的,通常的使用模式:

  1. mutex.lock();
  2. while (期待的条件 == FALSE)
  3.       condition.wait(&mutex);

  4. assert(期待的条件 == 真);
  5. do somthing;

  6. mutex.unlock();
复制代码

你的Condition类的wait方法缺少了mutex这个参数。下面是posix和boost中wait condition函数的原型,参考下:

  1. void pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);

  2. class condition
  3. {
  4.       void wait(mutex *lock);
  5. }
复制代码

论坛徽章:
0
7 [报告]
发表于 2009-02-10 23:52 |只看该作者
1. 得到了的通知,队列必然不为空。
2. Cond里面有一个Lock,wait时会自动加锁。

ps: 我没有觉得创建Cond,要一个外面分配的锁有什么好处。
   所以我的函数库里,Cond自己创建锁。

ps: 那几个例子都是我测试过的一部分了,没有问题的。
ps: boost用了大量的模板,写一个简单的程序,编译速度也很差。

[ 本帖最后由 die 于 2009-2-10 23:56 编辑 ]

论坛徽章:
5
2015年辞旧岁徽章
日期:2015-03-03 16:54:152015年迎新春徽章
日期:2015-03-04 09:53:172015亚冠之水原三星
日期:2015-06-02 16:34:202015年亚冠纪念徽章
日期:2015-10-19 18:13:37程序设计版块每日发帖之星
日期:2015-11-08 06:20:00
8 [报告]
发表于 2009-02-11 00:01 |只看该作者

论坛徽章:
0
9 [报告]
发表于 2009-02-11 00:14 |只看该作者
原帖由 die 于 2009-2-10 23:52 发表
1. 得到了的通知,队列必然不为空。
2. Cond里面有一个Lock,wait时会自动加锁。

ps: 我没有觉得创建Cond,要一个外面分配的锁有什么好处。
   所以我的函数库里,Cond自己创建锁。

ps: 那几个例子都是我测试过的一部分了,没有问题的。
ps: boost用了大量的模板,写一个简单的程序,编译速度也很差。

>> ps: 那几个例子都是我测试过的一部分了,没有问题的。

下面代码存在的问题是:cond没有被mutex保护,add线程与eat线程有可能同时访问cond的数据结构。因为概率非常小,所以测试中不会暴露出来。cond和fruits都是属于共享资源,必须由mutex保护。
void add(string name){
    while( true){
        ScopeLock lock( mutex);
        fruits.push(name);
        std::cout << "add " << name << std::endl;
        cond.signal();
        Sleep( 200);
    }
}

void eat( ){
    while( true){
        cond.wait();
        ScopeLock lock( mutex);
        fruits.pop();
        std::cout << "eat "<< fruits.front() << std::endl;
    }
}


>>ps: 我没有觉得创建Cond,要一个外面分配的锁有什么好处。
>>所以我的函数库里,Cond自己创建锁。
cond一定是和某个mutex相关联的,所以posix和boost::condition的wait cond操作中会有一个参数是mutex。


  1. void pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
  2. class condition
  3. {
  4.       void wait(mutex *lock);
  5. }
复制代码

论坛徽章:
0
10 [报告]
发表于 2009-02-11 00:23 |只看该作者
ScopeLock lock( mutex);
这个不是一个局部锁么。。。
出作用域自动解锁的。
您需要登录后才可以回帖 登录 | 注册

本版积分规则 发表回复

  

北京盛拓优讯信息技术有限公司. 版权所有 京ICP备16024965号-6 北京市公安局海淀分局网监中心备案编号:11010802020122 niuxiaotong@pcpop.com 17352615567
未成年举报专区
中国互联网协会会员  联系我们:huangweiwei@itpub.net
感谢所有关心和支持过ChinaUnix的朋友们 转载本站内容请注明原作者名及出处

清除 Cookies - ChinaUnix - Archiver - WAP - TOP