免费注册 查看新帖 |

Chinaunix

  平台 论坛 博客 文库
1234下一页
最近访问板块 发新帖
查看: 27649 | 回复: 33
打印 上一主题 下一主题

[C++] ASIO协程彻底转变你的思维 [复制链接]

论坛徽章:
3
2015年辞旧岁徽章
日期:2015-03-03 16:54:152015年迎新春徽章
日期:2015-03-04 09:58:11数据库技术版块每日发帖之星
日期:2015-08-30 06:20:00
跳转到指定楼层
1 [收藏(0)] [报告]
发表于 2013-04-13 19:57 |只看该作者 |倒序浏览
本帖最后由 蔡万钊 于 2013-04-16 18:05 编辑

avbot 发布了许久了, 最近突然有个用户跑来说,希望能增加个调用 “外部脚本” 的功能,方便扩展。

我一向对设计一个 plugin 机制极力的避免,不喜欢动态载入的模块扩展程序本身的功能。何况 avbot 是 c++开发的,调用脚本并不是容易的事情。(好吧,真实的原因是我被 mingw (VC 不支持 utf8源码,我已经抛弃了) 折腾怕了,不想再搞个 python 。windows实在是恐怖的平台,写点程序麻烦的要死,编译麻烦的要死。可是 avbot 又必须跨平台,结果是我一天写好的东西要在 windows (虚拟机) 里折腾好几天,累死人 )

于是我决定提供一个  JSON 接口,内置一个简单的 HTTP Server, 用脚本(python应该 HTTP JSON 模块有的是,对吧)连接到 avbot ,然后 avbot 将发生的每条消息以 json 的形式返回给 外部脚本。

另外,默认使用 HTTP 的connection: keep-alive 模式,所以保持一个长连接即可。

那么,avbot 需要支持不确定数目的消息接收方了。

对于链接到 avbot 的客户端而言, avbot 并不保留之前的所有消息,而是从连接上的那一刻开始,后续的消息才能通知到。
一个很明显的思路就是,将链接上的客户端做成一个链表/列队, avbot 收到消息后,遍历这个列队执行消息发送。

这个思路很简单,可是如果要求 : 必须单线程异步呢?

avbot 是一个纯粹的单线程程序,绝对不允许多线程化。所有的逻辑必须使用异步处理。

那么,这个问题就复杂化了, “avbot 收到消息后,遍历这个列队执行消息发送” 这个做法,不可避免的带来了阻塞。好吧,异步遍历吧。

要是异步遍历还没遍历完,又来一个消息呢? 考虑这个问题,你会发疯的。因为异步,太多的细节需要考虑了。真的。

好吧,又有个好主意了,为每个客户端建立一个列队,每次遍历就是把要发送的消息挂入列队即可。这样也不需要异步遍历了,同步就可以。解决了异步遍历的时候又来一个消息导致的痛苦的调度。

然后细分,考虑每个客户端,就是等待 “发送列队” 不为空!等等,一直这么等待也不行,如果客户断开了链接呢? 所以要 “同时等待发送列队不为空&&客户正常在线,并且已经发送了 HTTP 请求头部”

好绕口,不过也只能如此了。

avbot 因为默认使用了 keep-alive , 所以发送是一个死循环,知道客户端主动断开链接或者网络发生错误。如果 客户端死了,那么,发送列队兴许会出现 爆队 的情况。所以要限制发送列队的大小。不是满了就不发送,而是满了后就把早的消息踢掉,也就是让 客户端发生“暂时性卡死”后,还能继续处理最后的几条信息。

诶,复杂的逻辑终于理清了,代码呢?!

啊累?

靠,这么复杂的 逻辑,得写一长段代码,调试几百年了吧?


错,我只花了几个小时,不到 100 行的代码就轻松实现了全部要求。


!!!!!!!!!!!!!!!!!!! WHAT !!!!!!!!!!!!!!!!!!!

这种功能不可能不用个千把行代码的吧?!

如果使用以前的老办法,确实如此。

可是,自从发现了 ASIO 后,我被 ASIO 爸爸发明的协程深深的震惊了!

利用 ASIO 爸爸提出的协程思想,我只用了不到 100行代码就全部完成了以上复杂的逻辑,而且,全部都是异步的哦~ 。


好,废话不多,先贴代码。然后解释。
  1. // avbot_rpc_server 由 acceptor_server 这个辅助类调用
  2. // 为其构造函数传入一个 m_socket, 是 shared_ptr 的.
  3. class avbot_rpc_server
  4. {
  5. public:
  6.         typedef boost::signals2::signal<
  7.                 void( std::string protocol, std::string room, std::string who,
  8.                                 std::string message, sender_flags )
  9.         > on_message_signal_type;
  10.         static on_message_signal_type on_message;

  11.         typedef boost::asio::ip::tcp Protocol;
  12.         typedef boost::asio::basic_stream_socket<Protocol> socket_type;
  13.         typedef void result_type;

  14.         avbot_rpc_server(boost::shared_ptr<socket_type> _socket)
  15.           : m_socket(_socket)
  16.           , m_request(new boost::asio::streambuf)
  17.           , m_responses(new boost::circular_buffer_space_optimized<boost::shared_ptr<boost::asio::streambuf> >(20) )
  18.         {
  19.                 m_socket->get_io_service().post(
  20.                         boost::asio::detail::bind_handler(*this, boost::coro::coroutine(), boost::system::error_code(), 0)
  21.                 );
  22.         }

  23.         // 数据操作跑这里,嘻嘻.
  24.         void operator()(boost::coro::coroutine coro, boost::system::error_code ec, std::size_t bytestransfered)
  25.         {
  26.                 boost::shared_ptr<boost::asio::streambuf>        sendbuf;

  27.                 if (ec){
  28.                         m_socket->close(ec);
  29.                         // 看来不是 HTTP 客户端,诶,滚蛋啊!
  30.                         // 沉默,直接关闭链接. 取消信号注册.
  31.                          if (m_connect && m_connect->connected())
  32.                                 m_connect->disconnect();
  33.                         return;
  34.                 }

  35.                 CORO_REENTER(&coro)
  36.                 {
  37.                 do{
  38.                         // 发起 HTTP 处理操作.
  39.                         _yield boost::asio::async_read_until(*m_socket, *m_request, "\r\n\r\n", boost::bind(*this, coro, _1, _2));
  40.                         m_request->consume(bytestransfered);

  41.                         // 解析 HTTP

  42.                         // 等待消息.
  43.                         if (m_responses->empty())
  44.                         {
  45.                                 if (!m_connect){
  46.                                         // 将自己注册到 avbot 的 signal 去
  47.                                         // 等 有消息的时候,on_message 被调用,也就是下面的 operator() 被调用.
  48.                                         _yield m_connect = boost::make_shared<boost::signals2::connection>
  49.                                                 (on_message.connect(boost::bind(*this, coro, _1, _2, _3, _4, _5)));
  50.                                         // 就这么退出了,但是消息来的时候,om_message 被调用,然后下面的那个
  51.                                         // operator() 就被调用了,那个 operator() 接着就会重新回调本 operator()
  52.                                         // 结果就是随着 coroutine 的作用,代码进入这一行,然后退出  if 判定
  53.                                         // 然后进入发送过程.
  54.                                 }else{
  55.                                         // 如果已经注册,直接返回。时候如果消息来了,on_message 被调用,也就
  56.                                         // 是下面的 operator() 被调用. 结果就是随着 coroutine 的作用,代码
  57.                                         // 进入上面那行,然后退出  if 判定。然后进入发送过程.
  58.                                         return;
  59.                                 }
  60.                                 // signals2 回调的时候会进入到这一行.
  61.                         }
  62.                         // 进入发送过程
  63.                         sendbuf = m_responses->front();
  64.                         _yield boost::asio::async_write(*m_socket, *sendbuf, boost::bind(*this, coro, _1, _2) );
  65.                         m_responses->pop_front();
  66.                         // 写好了,重新开始我们的处理吧!
  67.                 }while(1);
  68.                 }
  69.         }

  70.         // signal 的回调到了这里, 这里我们要区分对方是不是用了 keep-alive 呢.
  71.         void operator()(boost::coro::coroutine coro, std::string protocol, std::string room, std::string who, std::string message, sender_flags)
  72.         {
  73.                 pt::ptree jsonmessage;
  74.                 boost::shared_ptr<boost::asio::streambuf> buf(new boost::asio::streambuf);
  75.                 std::ostream        stream(buf.get());
  76.                 std::stringstream        teststream;
  77.                 jsonmessage.put("protocol", protocol);
  78.                 jsonmessage.put("root", room);
  79.                 jsonmessage.put("who", who);
  80.                 jsonmessage.put("msg", message);

  81.                 js::write_json(teststream,  jsonmessage);

  82.                 // 直接写入 json 格式的消息吧!
  83.                 stream <<  "HTTP/1.1 200 OK\r\n" <<  "Content-type: application/json\r\n";
  84.                 stream <<  "connection: keep-alive\r\n" <<  "Content-length: ";
  85.                 stream << teststream.str().length() <<  "\r\n\r\n";

  86.                 js::write_json(stream, jsonmessage);

  87.                 // 检查 发送缓冲区.
  88.                 if (m_responses->empty()){
  89.                         // 打通仁督脉.
  90.                         m_socket->get_io_service().post(boost::asio::detail::bind_handler(*this, coro, boost::system::error_code(), 0));
  91.                 }
  92.                 // 写入 m_responses
  93.                 m_responses->push_back(buf);
  94.         }
  95. private:
  96.         boost::shared_ptr<socket_type> m_socket;
  97.         boost::shared_ptr<boost::signals2::connection> m_connect;

  98.         boost::shared_ptr<boost::asio::streambuf>        m_request;
  99.         boost::shared_ptr<boost::circular_buffer_space_optimized<boost::shared_ptr<boost::asio::streambuf> > >        m_responses;
  100. };
  101. }
复制代码
首先这个 avbot_rpc_server 由一个 acceptor_service 辅助类调用。 acceptor_service 是一个模板,大家可以去 acceptor_server.hpp 膜拜。

acceptor_service 以 Protocol 和一个 处理类 为模板。在 main.cpp里,我以 asio::ip::tcp 作为 Protocl 的参数 avbot_rpc_server为 ProtocolProcesser的参数 调用acceptor_service。acceptor_service 进入一个死循环(协程的)不停的 accept , 然后将 accept 到的 socket 交给 ProtocolProcesser,也就是 avbot_rpc_server 。

avbot_rpc_server 处理一下客户的请求头,然后把自己注册到 on_message 信号处理。

然后,然后就没然后了。

on_message 在 avbot 接收到消息的时候发出。结果就是 avbot_rpc_server 的 第二个 operator() 被调用。然后就继续发送了。
当然,并不是每一个 on_message 都会导致 avbot_rpc_server 的 第二个 operator() 被调用的,必须是列队为空的时候。不为空的时候就不需要调用。发送循环会继续循环的,避免竞争出现





论坛徽章:
4
水瓶座
日期:2013-09-06 12:27:30摩羯座
日期:2013-09-28 14:07:46处女座
日期:2013-10-24 14:25:01酉鸡
日期:2014-04-07 11:54:15
2 [报告]
发表于 2013-04-13 20:27 |只看该作者
协程个JB, busy loop去检查而已,状态机服务必备。

论坛徽章:
3
2015年辞旧岁徽章
日期:2015-03-03 16:54:152015年迎新春徽章
日期:2015-03-04 09:58:11数据库技术版块每日发帖之星
日期:2015-08-30 06:20:00
3 [报告]
发表于 2013-04-13 20:35 |只看该作者
回复 2# linux_c_py_php


    你哪个眼睛看到是轮询的?别看到 do {} while(1); 就觉得是轮询。

论坛徽章:
0
4 [报告]
发表于 2013-04-13 20:46 |只看该作者
c++协程果然不错, 赞一个.

论坛徽章:
15
射手座
日期:2014-11-29 19:22:4915-16赛季CBA联赛之青岛
日期:2017-11-17 13:20:09黑曼巴
日期:2017-07-13 19:13:4715-16赛季CBA联赛之四川
日期:2017-02-07 21:08:572015年亚冠纪念徽章
日期:2015-11-06 12:31:58每日论坛发贴之星
日期:2015-08-04 06:20:00程序设计版块每日发帖之星
日期:2015-08-04 06:20:00程序设计版块每日发帖之星
日期:2015-07-12 22:20:002015亚冠之浦和红钻
日期:2015-07-08 10:10:132015亚冠之大阪钢巴
日期:2015-06-29 11:21:122015亚冠之广州恒大
日期:2015-05-22 21:55:412015年亚洲杯之伊朗
日期:2015-04-10 16:28:25
5 [报告]
发表于 2013-04-15 11:52 |只看该作者
蔡万钊 发表于 2013-04-13 19:57
avbot 是一个纯粹的单线程程序,绝对不允许多线程化。所有的逻辑必须使用异步处理。

...

别的没意见。你如何发挥多核性能?多进程的?

论坛徽章:
1
技术图书徽章
日期:2013-09-10 08:57:55
6 [报告]
发表于 2013-04-15 17:44 |只看该作者
协程可以避免上下文切换, 是个好东西.

不过LZ的程序作为教程的话有点复杂, 我没有看懂.  协程相关的语句解释少了一点.

论坛徽章:
9
CU大牛徽章
日期:2013-04-17 11:06:23CU大牛徽章
日期:2013-04-17 11:08:52CU大牛徽章
日期:2013-04-17 11:09:10CU大牛徽章
日期:2013-04-17 11:09:40CU大牛徽章
日期:2013-04-17 11:09:57CU大牛徽章
日期:2013-04-17 11:10:17CU大牛徽章
日期:2013-05-20 10:43:41CU大牛徽章
日期:2013-05-20 10:44:06CU大牛徽章
日期:2013-05-20 10:44:16
7 [报告]
发表于 2013-04-16 09:27 |只看该作者
看看

论坛徽章:
44
15-16赛季CBA联赛之浙江
日期:2021-10-11 02:03:59程序设计版块每日发帖之星
日期:2016-07-02 06:20:0015-16赛季CBA联赛之新疆
日期:2016-04-25 10:55:452016科比退役纪念章
日期:2016-04-23 00:51:2315-16赛季CBA联赛之山东
日期:2016-04-17 12:00:2815-16赛季CBA联赛之福建
日期:2016-04-12 15:21:2915-16赛季CBA联赛之辽宁
日期:2016-03-24 21:38:2715-16赛季CBA联赛之福建
日期:2016-03-18 12:13:4015-16赛季CBA联赛之佛山
日期:2016-02-05 00:55:2015-16赛季CBA联赛之佛山
日期:2016-02-04 21:11:3615-16赛季CBA联赛之天津
日期:2016-11-02 00:33:1215-16赛季CBA联赛之浙江
日期:2017-01-13 01:31:49
8 [报告]
发表于 2013-04-16 09:56 |只看该作者
蔡万钊 发表于 2013-04-13 19:57
avbot 是一个纯粹的单线程程序,绝对不允许多线程化。


为什么呢?

论坛徽章:
0
9 [报告]
发表于 2013-04-16 11:17 |只看该作者
协程,coroutine。
我google一下,感觉coro应该是一个带状态的functor,每次调用同一个coro时根据上次的状态从不同的地方开始执行。
因为这种机制,coro就有了“暂停”和“恢复”的机制。
而在异步编程中,常见的编程场景就是在异步操作开始前显式的保存异步回调所需要的状态,把下一步需要执行的操作包装成一个函数或者类似的东西作为异步回调。
在协程中,因为有了这种暂停和恢复机制,可以直接把暂停的协程作为异步回调。
异步操作结束,回调这个暂停的协程的时候,自然就恢复了协程的执行。
由于这种机制,一个协程可以包含异步操作开始前和结束后的所有操作,因此可以把异步操作写得与同步的阻塞操作几乎一模一样,从而极大地简化异步编程所需的代码。

是这么回事么?

论坛徽章:
15
射手座
日期:2014-11-29 19:22:4915-16赛季CBA联赛之青岛
日期:2017-11-17 13:20:09黑曼巴
日期:2017-07-13 19:13:4715-16赛季CBA联赛之四川
日期:2017-02-07 21:08:572015年亚冠纪念徽章
日期:2015-11-06 12:31:58每日论坛发贴之星
日期:2015-08-04 06:20:00程序设计版块每日发帖之星
日期:2015-08-04 06:20:00程序设计版块每日发帖之星
日期:2015-07-12 22:20:002015亚冠之浦和红钻
日期:2015-07-08 10:10:132015亚冠之大阪钢巴
日期:2015-06-29 11:21:122015亚冠之广州恒大
日期:2015-05-22 21:55:412015年亚洲杯之伊朗
日期:2015-04-10 16:28:25
10 [报告]
发表于 2013-04-16 12:39 |只看该作者
本帖最后由 yulihua49 于 2013-04-16 12:41 编辑
qinggeng 发表于 2013-04-16 11:17
协程,coroutine。
我google一下,感觉coro应该是一个带状态的functor,每次调用同一个coro时根据上次的状 ...

这样有什么好处呢?我用多线程+context表示状态,每次线程调用这个context时从相应的状态点运行,也可以进行高吞吐量的asio。
程序也到不了100行,还好懂。
您需要登录后才可以回帖 登录 | 注册

本版积分规则 发表回复

  

北京盛拓优讯信息技术有限公司. 版权所有 京ICP备16024965号-6 北京市公安局海淀分局网监中心备案编号:11010802020122 niuxiaotong@pcpop.com 17352615567
未成年举报专区
中国互联网协会会员  联系我们:huangweiwei@itpub.net
感谢所有关心和支持过ChinaUnix的朋友们 转载本站内容请注明原作者名及出处

清除 Cookies - ChinaUnix - Archiver - WAP - TOP