免费注册 查看新帖 |

Chinaunix

  平台 论坛 博客 文库
最近访问板块 发新帖
查看: 5784 | 回复: 4
打印 上一主题 下一主题

boost asio库单线程异步发送乱序 贴代码..... [复制链接]

论坛徽章:
0
跳转到指定楼层
1 [收藏(0)] [报告]
发表于 2010-09-01 15:05 |只看该作者 |倒序浏览
  1. #include <iostream>
  2. #include <istream>
  3. #include <ostream>
  4. #include <fstream>
  5. #include <string>
  6. #include <boost/asio.hpp>
  7. #include <boost/bind.hpp>
  8. #include <deque>
  9. #include <queue>
  10. #include <boost/asio.hpp>
  11. #include <boost/bind.hpp>
  12. #include <boost/noncopyable.hpp>
  13. #include <boost/thread.hpp>
  14. #include <boost/lexical_cast.hpp>
  15. #include <boost/shared_ptr.hpp>

  16. using boost::asio::ip::tcp;
  17. /**
  18. * * @brief 发送数据解构体
  19. * */
  20. struct SSendDataBuff
  21. {
  22.     SSendDataBuff(const char *s, size_t n) : pData(NULL), nSize(0)
  23.     {
  24.         pData = new char[n];
  25.         memset(pData, 0, n);
  26.         memcpy(pData, s, n);
  27.         nSize = n;
  28.     }

  29.     ~SSendDataBuff()
  30.     {
  31.         if(pData)
  32.         {
  33.             delete [] pData;
  34.             pData = NULL;
  35.         }
  36.         nSize = 0;
  37.     }

  38.     char *pData;      //数据
  39.     size_t nSize;     //数据长度
  40. };
  41. class client
  42. {
  43.     public:
  44.         client(boost::asio::io_service& io_service)
  45.             :socket_(io_service), resolver(io_service)//, m_io_work(io_service)
  46.     {
  47.         m_bHasSend = true;
  48.         tcp::resolver::query query("192.168.0.226", "9988");
  49.         tcp::resolver::iterator iterator = resolver.resolve(query);

  50.         tcp::endpoint endpoint = *iterator;
  51.         socket_.async_connect(endpoint,
  52.                 boost::bind(&client::__handle_connect, this,
  53.                     boost::asio::placeholders::error));
  54.     }
  55. void __handle_connect(const boost::system::error_code& err)
  56.         {
  57.             if(err)         //连接失败
  58.             {
  59.                 printf("Connect is faild ....\n");
  60.             }
  61.             {
  62.                 printf("Connect is Successful ....\n");
  63.                 FILE *pt = fopen("./libs", "rb");
  64.                 if(NULL == pt)
  65.                 {
  66.                     assert(0);
  67.                 }

  68.                 char buff[1024];
  69.                 int nLen = 0;
  70.                 memset(buff, 0, sizeof(buff));
  71.                 while((nLen = fread(buff, sizeof(char), sizeof(buff), pt)) != 0)
  72.                 {
  73.                     sends(buff, nLen);
  74.                     memset(buff, 0, sizeof(buff));
  75.                 }
  76.                 printf("===============\n");
  77.                 fclose(pt);
  78.             }
  79.         }
  80. void sends(const char *pData, size_t nSize)
  81.         {
  82.             boost::mutex::scoped_lock AsynSendlock(m_SendQueMutex);
  83.             if(m_bHasSend)      //可以直接发送不需要放到缓冲区
  84.             {
  85.                 __asynSendData(pData, nSize);
  86.                 m_bHasSend = false;
  87.             }
  88.             else
  89.             {
  90.                 SSendDataBuff *pSendDataBuffer = new SSendDataBuff(pData, nSize);
  91.                 m_AsynSendQue.push(pSendDataBuffer);
  92.             }
  93.         }
  94. void __asynSendData(const char *pData, size_t nSize)
  95.         {
  96.         #if 0
  97.             static FILE *pt = fopen("./TEST_WRITE", "wb");
  98.             if(NULL == pt)
  99.             {
  100.                 assert(0);
  101.             }

  102.             size_t nLen = fwrite(pData, sizeof(char), nSize, pt);
  103.             if(nLen != nSize)
  104.             {
  105.                 assert(0);
  106.             }
  107.             fflush(pt);
  108.             #endif

  109.             async_write(socket_, boost::asio::buffer(pData, nSize),
  110.                     boost::bind(&client::__handle_tcp_write, this,
  111.                         boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
  112.         }
  113. void __handle_tcp_write(const boost::system::error_code& e, size_t nSize)
  114.         {
  115.             printf("__handle_tcp_write ... nSize = %d\n", nSize);
  116.             if(e)         //写错误
  117.             {
  118.                 assert(0);
  119.                 return;
  120.             }

  121.             boost::mutex::scoped_lock AsynSendlock(m_SendQueMutex);
  122.             if(!m_AsynSendQue.empty())     //如果发送队列不为空发送数据
  123.             {
  124.                 SSendDataBuff *p = m_AsynSendQue.front();
  125.                 __asynSendData(p->pData, p->nSize);
  126.                 delete p;
  127.                 p = NULL;

  128.                 //清除
  129.                 m_AsynSendQue.pop();
  130.             }
  131.             else
  132.             {
  133.                 m_bHasSend = true;
  134.             }
  135.         }
  136. private:
  137.         tcp::socket socket_;
  138.         tcp::resolver resolver;
  139.         //boost::asio::io_service::work m_io_work;

  140.         volatile bool m_bHasSend;                   //标识数据是否直接发送还是放到缓冲区
  141.         std::queue<SSendDataBuff *> m_AsynSendQue;       //保存异步发送数据队列
  142.         boost::mutex m_SendQueMutex;                //异步发送数据队列锁
  143. };

  144. int main(int argc, char* argv[])
  145. {
  146.     system("rm -rf TEST_WRITE");
  147.     try
  148.     {
  149.         boost::asio::io_service io_service;
  150.         client c(io_service);
  151.         io_service.run();
  152.         printf("HELLO ........\n");
  153.     }
  154.     catch (std::exception& e)
  155.     {
  156.         std::cout << "Exception: " << e.what() << "\n";
  157.     }

  158.     return 0;
  159. }
复制代码
传一个文件libs大概50多M , 传完后比较了下md5值, 有时候一样有时候不一样, 但大小都一样.....

服务器代码如下, 非常简单的一个原始套接口
  1. #include <assert.h>
  2. #include <stdio.h>
  3. #include <fcntl.h>
  4. #include <string.h>
  5. #include <stdlib.h>
  6. #include <pthread.h>
  7. #include <iostream>
  8. #include <sstream>
  9. #include <sys/types.h>
  10. #include <sys/socket.h>
  11. #include <netinet/in.h>
  12. #include <sys/types.h>
  13. #include <sys/socket.h>
  14. #include <arpa/inet.h>

  15. int Bind(const char *ip, unsigned int nPort)
  16. {
  17.     int nSockfd = ::socket(PF_INET, SOCK_STREAM, 0);
  18.     if(-1 == nSockfd)
  19.     {
  20.         assert(0);
  21.     }
  22. //解构体初始化
  23.     struct sockaddr_in servaddr;
  24.     bzero(&servaddr, sizeof(servaddr));
  25.     servaddr.sin_family = PF_INET;
  26.     if(NULL == ip)
  27.     {
  28.         servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
  29.     }
  30.     else
  31.     {
  32.         int nRet = inet_pton(PF_INET, ip, &servaddr.sin_addr);
  33.         if(-1 == nRet)
  34.         {
  35.             assert(0);
  36.         }
  37.     }
  38.     servaddr.sin_port = htons(nPort);

  39.     int nRet = ::bind(nSockfd, (struct sockaddr*)&servaddr, sizeof(servaddr));
  40.     if(-1 == nRet)
  41.     {
  42.         assert(0);
  43.     }

  44.     nRet = ::listen(nSockfd, 256);
  45.     if(-1 == nRet)
  46.     {
  47.         assert(0);
  48.     }
  49. return nSockfd;
  50. }

  51. int main()
  52. {
  53.     system("rm -rf TEST_WRITE");
  54.     int nSockfd = Bind("192.168.0.226", 9988);
  55.     int nClientSockfd = 0;
  56.     while(1)
  57.     {
  58.         nClientSockfd = ::accept(nSockfd, NULL, NULL);
  59.         if(-1 == nClientSockfd)
  60.         {
  61.             if(errno == EINTR)
  62.             {
  63.                 continue;
  64.             }
  65.             return -1;
  66.         }
  67.         else
  68.         {
  69.             printf("new Client .........\n");
  70.             break;
  71.         }
  72.     }
  73. while(1)
  74.     {
  75.         char buff[65535];
  76.         int nLen = recv(nClientSockfd, buff, sizeof(buff), 0);
  77.         if(-1 == nLen)
  78.         {
  79.             assert(0);
  80.         }

  81.         static FILE *pt = fopen("./TEST_WRITE", "wb");
  82.         if(NULL == pt)
  83.         {
  84.             assert(0);
  85.         }

  86.         size_t n = fwrite(buff, sizeof(char), nLen, pt);
  87.         if(n != nLen)
  88.         {
  89.             assert(0);
  90.         }
  91.         fflush(pt);
  92.     }

  93.     return 0;
  94. }
复制代码
请各位朋友帮个忙, 多谢了.......

论坛徽章:
0
2 [报告]
发表于 2010-09-02 17:24 |只看该作者
难道大家都没有用boost吗....大家帮帮忙 ...
多谢....

论坛徽章:
0
3 [报告]
发表于 2010-09-02 17:29 |只看该作者
本帖最后由 ideawu 于 2010-09-02 17:38 编辑

不要在sends里把消息加入队列, 直接调用异步发送就行了。

论坛徽章:
0
4 [报告]
发表于 2010-09-02 22:04 |只看该作者
楼上的兄弟, 直接异步发送乱序的可能性更大.....

难道大家都不用boost的吗....
非常着急多谢大家....

论坛徽章:
0
5 [报告]
发表于 2011-05-12 22:04 |只看该作者
SSendDataBuff *p = m_AsynSendQue.front();

                __asynSendData(p->pData, p->nSize);

                delete p;  //发送是异步的,这里不能直接删除对象,造成发送的数据出错
                p = NULL;
您需要登录后才可以回帖 登录 | 注册

本版积分规则 发表回复

  

北京盛拓优讯信息技术有限公司. 版权所有 京ICP备16024965号-6 北京市公安局海淀分局网监中心备案编号:11010802020122 niuxiaotong@pcpop.com 17352615567
未成年举报专区
中国互联网协会会员  联系我们:huangweiwei@itpub.net
感谢所有关心和支持过ChinaUnix的朋友们 转载本站内容请注明原作者名及出处

清除 Cookies - ChinaUnix - Archiver - WAP - TOP