免费注册 查看新帖 |

Chinaunix

  平台 论坛 博客 文库
最近访问板块 发新帖
楼主: hy036630
打印 上一主题 下一主题

多线程OR多进程-访存密集型应用 [复制链接]

论坛徽章:
0
11 [报告]
发表于 2011-12-21 15:09 |只看该作者
hy036630 发表于 2011-12-21 14:58
不好意思 可能我上面没有说清楚
我用的是下面这个环形队列  无锁的实现一个进程读,一个进程写,没有牵涉 ...

不妨设置一个计数器,在“push时满”和"pop时空"的时候++下,统计下一分钟发生多少次,再决定下一步怎办

论坛徽章:
0
12 [报告]
发表于 2011-12-21 15:16 |只看该作者
这倒是个好主意
我试试 谢谢了{:3_188:}

论坛徽章:
11
未羊
日期:2013-12-16 12:45:4615-16赛季CBA联赛之青岛
日期:2016-04-11 19:17:4715-16赛季CBA联赛之广夏
日期:2016-04-06 16:34:012015亚冠之卡尔希纳萨夫
日期:2015-11-10 10:04:522015亚冠之大阪钢巴
日期:2015-07-30 18:29:402015亚冠之城南
日期:2015-06-15 17:56:392015亚冠之卡尔希纳萨夫
日期:2015-05-15 15:19:272015亚冠之山东鲁能
日期:2015-05-14 12:38:13金牛座
日期:2014-12-04 15:34:06子鼠
日期:2014-10-16 13:40:4715-16赛季CBA联赛之八一
日期:2016-07-22 09:41:40
13 [报告]
发表于 2011-12-21 15:39 |只看该作者
本帖最后由 zylthinking 于 2011-12-21 15:42 编辑
hy036630 发表于 2011-12-21 14:58
不好意思 可能我上面没有说清楚
我用的是下面这个环形队列  无锁的实现一个进程读,一个进程写,没有牵涉 ...


看代码, 流水线中几个线程都会处理同一份数据?
是不是这样
  1. thread1(char* p){
  2.     static int n = 0;
  3.    n += p[1];
  4. }

  5. thread2(char* p){
  6.     static int n = 0;
  7.    n += p[2];
  8. }

  9. 。。。。。

  10. thread_i(char* p){
  11.     static int n = 0;
  12.    n += p[i];
  13. }
复制代码
然而 p 是相同的?

论坛徽章:
0
14 [报告]
发表于 2011-12-21 17:12 |只看该作者
本帖最后由 sonicling 于 2011-12-21 17:13 编辑

用semaphore把队列指针包起来,一来可以保护指针,二来可以及时唤醒等待线程。如果忙等的话,生产者线程更新了队列,等到消费者线程那边发现的时候,黄花菜都凉了。

论坛徽章:
11
未羊
日期:2013-12-16 12:45:4615-16赛季CBA联赛之青岛
日期:2016-04-11 19:17:4715-16赛季CBA联赛之广夏
日期:2016-04-06 16:34:012015亚冠之卡尔希纳萨夫
日期:2015-11-10 10:04:522015亚冠之大阪钢巴
日期:2015-07-30 18:29:402015亚冠之城南
日期:2015-06-15 17:56:392015亚冠之卡尔希纳萨夫
日期:2015-05-15 15:19:272015亚冠之山东鲁能
日期:2015-05-14 12:38:13金牛座
日期:2014-12-04 15:34:06子鼠
日期:2014-10-16 13:40:4715-16赛季CBA联赛之八一
日期:2016-07-22 09:41:40
15 [报告]
发表于 2011-12-21 17:17 |只看该作者
sonicling 发表于 2011-12-21 17:12
用semaphore把队列指针包起来,一来可以保护指针,二来可以及时唤醒等待线程。如果忙等的话,生产者线程更新 ...

它很显然是多核, 倒不是这个原因, 很有可能是多处理器缓存同步

论坛徽章:
0
16 [报告]
发表于 2011-12-21 17:23 |只看该作者
也可以不用semaphore,但是要及时唤醒等待线程!否则等待线程会等到下一次自然调度才得到通知。

论坛徽章:
0
17 [报告]
发表于 2011-12-21 17:31 |只看该作者
CPU间缓存同步会影响效率,但是不至于数量级的差别。我以前也做过无锁循环队列,不过是内核级的,用atomic_t来表示位置,每次更新队列,强制唤醒消费线程,吞吐量上60w/s无压力。

论坛徽章:
0
18 [报告]
发表于 2011-12-22 08:08 |只看该作者
的确是这样的,我觉得也不应该是忙等的问题,应为机器是16线程的机器,按照道理至少有8个独立线程同时跑,
缓存我开始以为是这样的原因

不过设置了CPU亲和了以后也没有效果。

论坛徽章:
0
19 [报告]
发表于 2011-12-22 08:11 |只看该作者
本帖最后由 hy036630 于 2011-12-22 08:12 编辑

我所谓的忙等是这个意思
从环形队列取
while(NULL!=(event = queue.pop()));
doSomething();

插入到环形队列里
while(queue.push(event));
doSomething();


在多核的机器上面,应该不存在比较大的问题吧。

论坛徽章:
0
20 [报告]
发表于 2011-12-22 08:28 |只看该作者
sonicling 发表于 2011-12-21 17:31
CPU间缓存同步会影响效率,但是不至于数量级的差别。我以前也做过无锁循环队列,不过是内核级的,用atomic_ ...

其实我觉得不是队列慢了导致整体性能慢,
我觉得可能是增加流水线的长度会导致处理数据的速度变慢。
我给大家贴一段处理的代码吧。
  1. string GroupPE::getEventKey(eventPtr msg) {
  2.     vector<string> keys;
  3.     keys.push_back("UchAPN"); // apn
  4.     keys.push_back("Wlac"); // 位置区编号:lacid
  5.     keys.push_back("Cell"); // 小区信息:sac
  6.     keys.push_back("InterfaceType"); // 接口类型:interfacetype
  7.     keys.push_back("Protocoltype"); // 协议类型:protocolid
  8.     keys.push_back("Eventtype"); // 事件类型:eventtype
  9.     string key = Utils::encoding(*msg, keys.begin(), keys.end(), 2000);
  10.     return key;
  11. }

  12.    /*每个线程有一个统计map 用来存放统计数据 每次来一个event就根据getEventKey这个方法得到key,然后查看key是不是在统计map中,如果在统计map中,那么更新这个map的数据,如果没有,那么new一个event,设置一些值,然后放到统计map中*/
  13. void GroupPE::process(eventPtr msg) {//统计
  14.   
  15.     string key = this->getEventKey(msg);
  16.     map<string, eventPtr>::iterator iter4map = this->groupMap.find(key);


  17.     if (iter4map != groupMap.end()) {
  18.         int times = iter4map->second->getValueByName("times", 0);
  19.         ++times;
  20.         iter4map->second->setValueByName("times", times);
  21.         if (msg->getValueByName("cause", -1) == 255) { // response success
  22.             times = iter4map->second->getValueByName("succtimes", 0);
  23.             ++times;
  24.             iter4map->second->setValueByName("succtimes", times);
  25.         }
  26.         string str = msg->getValueByName("Btime", "1970-01-01 00:00:00.000");
  27.         long btime = Utils::millisecs(str.c_str());
  28.         str = msg->getValueByName("RspTime", "1970-01-01 00:00:00.000");
  29.         long rsptime = Utils::millisecs(str.c_str());
  30.         double delays = ((double) rsptime - (double) btime) / 1000;
  31.         double d = iter4map->second->getValueByName("delays", (double) 0);
  32.         iter4map->second->setValueByName("delays", delays + d);

  33.         d = iter4map->second->getValueByName("maxdelays", (double) 0);
  34.         if (d < delays)
  35.             iter4map->second->setValueByName("maxdelays", delays);
  36.         d = iter4map->second->getValueByName("mindelays", (double) 0);
  37.         if (d > delays)
  38.             iter4map->second->setValueByName("mindelays", delays);
  39.         if (btime != rsptime || msg->getValueByName("Tdrtype", (char) 0) != 1) {
  40.             times = iter4map->second->getValueByName("rsptimes", 0);
  41.             ++times;
  42.             iter4map->second->setValueByName("rsptimes", times);
  43.         } else {
  44.         }
  45.     } else { // 进行新统计结果记录
  46.         // 如果单个map数量太大超过阀值,需要进行清理
  47.         //begin use pointer add by lxy 7-19
  48. #ifndef NEW_ALLOC
  49.         CEventMessage *groupMapEvent = this->event_alloc.allocate(1);
  50.         groupMapEvent->metaData = NULL;
  51.         char * body = this->mt_allocator.allocate(150);
  52.         groupMapEvent->setBody(body);
  53.         EVENT_HEAD_MESSAGE * head = this->head_alloc.allocate(1); //new EVENT_HEAD_MESSAGE;
  54. #else
  55.         CEventMessage *groupMapEvent = new CEventMessage();
  56.         char * body = new char[150];
  57.         groupMapEvent->setBody(body);
  58.         EVENT_HEAD_MESSAGE * head = new EVENT_HEAD_MESSAGE(); //new EVENT_HEAD_MESSAGE;
  59. #endif
  60.         groupMapEvent->reset(head);
  61.         groupMapEvent->setHead(head);
  62.         groupMapEvent->getHead()->type = 2005; // 修改事件的类型为“统计事件对象”
  63.         groupMapEvent->getHead()->data_len = 150;
  64.         // 进行业务信息初始化
  65.         groupMapEvent->setValueByName("times", 1);
  66.         //  groupMapEvent->setValueByName("cause", msg->getValueByName("cause", -1));
  67.         if (msg->getValueByName("Cause", -1) == 255)
  68.             groupMapEvent->setValueByName("succtimes", 1);
  69.         else
  70.             groupMapEvent->setValueByName("succtimes", 0);
  71.         string Btime_str = msg->getValueByName("Btime", "1970-01-01 00:00:00.000");
  72.         long btime = Utils::millisecs(Btime_str.c_str());
  73.         groupMapEvent->setValueByName("sttime", msg->getValueByName("Btime", "1970-01-01 00:00:00.000"));
  74.         string RspTime_str = msg->getValueByName("RspTime", "1970-01-01 00:00:00.000");
  75.         long rsptime = Utils::millisecs(RspTime_str.c_str());
  76.         double delays = ((double) rsptime - (double) btime) / 1000;
  77.         groupMapEvent->setValueByName("delays", delays);
  78.         groupMapEvent->setValueByName("maxdelays", delays);
  79.         groupMapEvent->setValueByName("mindelays", delays);
  80.         //  groupMapEvent->setValueByName("tdrtype", msg->getValueByName("tdrtype", 0));
  81.         if (btime != rsptime || msg->getValueByName("Tdrtype", (char) 0) != 1) {
  82.             groupMapEvent->setValueByName("rsptimes", 1);
  83.         } else {
  84.             groupMapEvent->setValueByName("rsptimes", 0);
  85.         }
  86.         //统计维度的设置  add by lxy 7-21
  87.         groupMapEvent->setValueByName("apn", msg->getValueByName("UchAPN", "*"));
  88.         groupMapEvent->setValueByName("lacid", msg->getValueByName("Wlac", 0));
  89.         groupMapEvent->setValueByName("sac", msg->getValueByName("Cell", 0));
  90.         groupMapEvent->setValueByName("interfacetype", msg->getValueByName("InterfaceType", "*"));
  91.         groupMapEvent->setValueByName("protocolid", msg->getValueByName("Protocoltype", "*"));
  92.         groupMapEvent->setValueByName("eventtype", msg->getValueByName("Eventtype", 0));
  93.         this->groupMap.insert(map<string, eventPtr, std::less<string>, __gnu_cxx::__mt_alloc<string> >::value_type(key, groupMapEvent));
  94.     }
  95. }
复制代码
下面是Event的代码 不好意思 有点长 我截取一点给大家 CEventMessage.h
  1. class CEventMessage {
  2. public:

  3.     /*下面这些函数主要是根据类型找到配置文件然后在body的指定位置设置值*/
  4.     int getValueByName(const string &targetname, const int &defval) const; //
  5.     unsigned int getValueByName(const string &targetname, const unsigned int &defval) const;
  6.     long getValueByName(const string &targetname, const long &defval) const;
  7.     unsigned long getValueByName(const string &targetname, const unsigned long &defval) const;
  8.     char getValueByName(const string &targetname, const char &defval) const;
  9.     unsigned char getValueByName(const string &targetname, const unsigned char &defval) const;
  10.     short getValueByName(const string &targetname, const short &defval) const;
  11.     unsigned short getValueByName(const string &targetname, const unsigned short &defval) const;
  12.     string getValueByName(const string &targetname, const string &defval) const;
  13.     double getValueByName(const string &targetname, const double &defval) const;
  14.     float getValueByName(const string &targetname, const float &defval) const;


  15.     void setValueByName(const string &targetname, const int &val);
  16.     void setValueByName(const string &targetname, const unsigned int &val);
  17.     void setValueByName(const string &targetname, const long &val);
  18.     void setValueByName(const string &targetname, const unsigned long &val);
  19.     void setValueByName(const string &targetname, const short &val);
  20.     void setValueByName(const string &targetname, const unsigned short &val);
  21.     void setValueByName(const string &targetname, const char &val);
  22.     void setValueByName(const string &targetname, const unsigned char &val);
  23.     void setValueByName(const string &targetname, const float &val);
  24.     void setValueByName(const string &targetname, const string &val);
  25.     void setValueByName(const string &targetname, const double &val);

  26. public:

  27.     CEventMessage();
  28.     CEventMessage(const unsigned short& type);
  29.     CEventMessage(const CEventMessage& event);

  30.     CEventMessage& operator=(const CEventMessage& event);

  31.     virtual ~CEventMessage();

  32.     char * getBytes(int &length);

  33.     void setBody(char* body);

  34.     char* getBody() const;

  35.     void setHead(EVENT_HEAD_MESSAGE* head);

  36.     EVENT_HEAD_MESSAGE* getHead() const;

  37.     static void reset(EVENT_HEAD_MESSAGE *event_head_message) {
  38.         event_head_message->pkg_len = 1;
  39.         event_head_message->flag = 0;
  40.         event_head_message->format = 0;
  41.         event_head_message->ver_info = 1;
  42.         event_head_message->peid = 0;
  43.     }

  44.     unsigned int getPrevPEId() const {
  45.         if (head != NULL)
  46.             return head->peid;
  47.         return 0;
  48.     }

  49.     /**
  50.      * ??socket?жs???????
  51.      * @param conn
  52.      * @return
  53.      */

  54. private:

  55.     template<class T>
  56.     const EVENT_TYPE_ENTY * getCheckedEventTypeEntity(const string&name)const;
  57.     template<class T>
  58.     const EVENT_TYPE_ENTY * getCheckedEventTypeEntityAllocBody(const string&name);
  59.    
  60.     void resetFields(const CEventMessage& event);
  61.     string getStringForNum(EVENT_TYPE_ENTY *eventType)const;
  62.     void setStringFormNum(EVENT_TYPE_ENTY *eventType, const string&str_val) const;
  63.     CMetaData * getMetaData()const;    //元数据 就是配置文件 指示body中的数据是如何组成的
  64.     typedef hash_map<string, string> TempValueMap;
  65.     TempValueMap mytempvalmap;//用来存放临时值 用于丰富维度用

  66. public:
  67.     CMetaData * metaData;
  68. private:

  69.     EVENT_HEAD_MESSAGE *head;  //用于网络传输  head
  70.     char *body;                //实际数据
  71. };
复制代码
CEventMessage.cpp太长了就不放上来了 其实很简单  根据metaData找到配置,根据配置在body的指定位置设置值。

您需要登录后才可以回帖 登录 | 注册

本版积分规则 发表回复

  

北京盛拓优讯信息技术有限公司. 版权所有 京ICP备16024965号-6 北京市公安局海淀分局网监中心备案编号:11010802020122 niuxiaotong@pcpop.com 17352615567
未成年举报专区
中国互联网协会会员  联系我们:huangweiwei@itpub.net
感谢所有关心和支持过ChinaUnix的朋友们 转载本站内容请注明原作者名及出处

清除 Cookies - ChinaUnix - Archiver - WAP - TOP