免费注册 查看新帖 |

Chinaunix

  平台 论坛 博客 文库
最近访问板块 发新帖
楼主: lost_templar

[C++] 谁来来帮我看看这个简单的线程池(c++11) [复制链接]

论坛徽章:
43
15-16赛季CBA联赛之四川
日期:2018-10-13 23:26:5015-16赛季CBA联赛之新疆
日期:2016-04-25 10:55:452016科比退役纪念章
日期:2016-04-23 00:51:2315-16赛季CBA联赛之山东
日期:2016-04-17 12:00:2815-16赛季CBA联赛之福建
日期:2016-04-12 15:21:2915-16赛季CBA联赛之辽宁
日期:2016-03-24 21:38:2715-16赛季CBA联赛之福建
日期:2016-03-18 12:13:4015-16赛季CBA联赛之佛山
日期:2016-02-05 00:55:2015-16赛季CBA联赛之佛山
日期:2016-02-04 21:11:36程序设计版块每日发帖之星
日期:2016-07-02 06:20:0015-16赛季CBA联赛之天津
日期:2016-11-02 00:33:1215-16赛季CBA联赛之浙江
日期:2017-01-13 01:31:49
发表于 2014-04-30 21:47 |显示全部楼层
回复 28# lost_templar

1、如果队列里面还有内容,pop不会返回false,一直到队列已经被close而且里面为空时pop才会返回false,这样可以保证不会有遗留任务
2、原因很简单,mutex和condition_variable都不能copy construct/assignment,所以concurrent_queue当然也不能copy construct/assignment。

论坛徽章:
3
2015年辞旧岁徽章
日期:2015-03-03 16:54:152015年迎新春徽章
日期:2015-03-04 09:58:11数据库技术版块每日发帖之星
日期:2015-08-30 06:20:00
发表于 2014-05-03 00:01 |显示全部楼层
本帖最后由 蔡万钊 于 2014-05-03 00:04 编辑

这代码第一眼就不想看下去了. 

烂代码就别修改了,直接扔掉. 烂代码再怎么修改都是在 随机编程.

线程池? 这 TM 的多蛋疼. http://avboost.com/t/boost-asio/492

论坛徽章:
3
15-16赛季CBA联赛之山东
日期:2016-10-30 08:47:3015-16赛季CBA联赛之佛山
日期:2016-12-17 00:06:31CU十四周年纪念徽章
日期:2017-12-03 01:04:02
发表于 2014-05-03 11:13 |显示全部楼层

看到这种 C++ 烂代码就想吐, 线程池个毛啊! 还用 C++11 里面的东西, 这特么是在玩 C++ 还是在玩线程? 玩儿蛋去吧! 有把 fork 和 pthread, 进程信号量, 线程互斥锁, 读写锁还有条件锁搞清楚了先? 艹蛋.


论坛徽章:
1
2015年辞旧岁徽章
日期:2015-03-03 16:54:15
发表于 2014-05-04 08:23 |显示全部楼层
回复 31# 蔡万钊
其实没有指望你能读懂,只会叫骂的菜鸟~~


   

论坛徽章:
1
2015年辞旧岁徽章
日期:2015-03-03 16:54:15
发表于 2014-05-04 08:24 |显示全部楼层
captivated 发表于 2014-05-03 11:13
看到这种 C++ 烂代码就想吐, 线程池个毛啊! 还用 C++11 里面的东西, 这特么是在玩 C++ 还是在玩线程? 玩儿 ...


把你的蛋扯下来玩我没有意见

论坛徽章:
3
2015年辞旧岁徽章
日期:2015-03-03 16:54:152015年迎新春徽章
日期:2015-03-04 09:58:11数据库技术版块每日发帖之星
日期:2015-08-30 06:20:00
发表于 2014-05-05 12:34 |显示全部楼层
... ... 囧, tm 被耍了. 5555555 还以为他是来虚心求救的

论坛徽章:
1
2015年辞旧岁徽章
日期:2015-03-03 16:54:15
发表于 2014-05-08 20:06 |显示全部楼层
回复 35# 蔡万钊
你用这种态度回复,自然不是期待别人给你好脸色的了。
既然你的愿望已经满足了,还在这里唧唧歪歪个屁啊


   

论坛徽章:
1
2015年辞旧岁徽章
日期:2015-03-03 16:54:15
发表于 2014-05-08 23:01 |显示全部楼层
既然楼主@了我的某个马甲,就回一下吧 ---

首先
~simple_thread_pool()

~thread_join_guard()
先执行,这是C++语言决定的。
楼主没注意到这点。

~simple_thread_pool() 中设定 all_done_flag = true; 后,
就会导致  work_loader() 不等所有事情做完,提前退出了。

论坛徽章:
3
2015年辞旧岁徽章
日期:2015-03-03 16:54:152015年迎新春徽章
日期:2015-03-04 09:58:11数据库技术版块每日发帖之星
日期:2015-08-30 06:20:00
发表于 2014-05-11 16:51 |显示全部楼层
回复 37# 群雄逐鹿中原


   
为啥这个头像没有被和谐啊!

论坛徽章:
9
程序设计版块每日发帖之星
日期:2015-10-18 06:20:00程序设计版块每日发帖之星
日期:2015-11-01 06:20:00程序设计版块每日发帖之星
日期:2015-11-02 06:20:00每日论坛发贴之星
日期:2015-11-02 06:20:00程序设计版块每日发帖之星
日期:2015-11-03 06:20:00程序设计版块每日发帖之星
日期:2015-11-04 06:20:00程序设计版块每日发帖之星
日期:2015-11-06 06:20:00数据库技术版块每周发帖之星
日期:2015-12-02 15:02:47数据库技术版块每日发帖之星
日期:2015-12-08 06:20:00
发表于 2015-10-06 15:56 |显示全部楼层
推荐一个:
基于C++11的线程池
时间 2014-05-06 16:09:10 CSDN博客
原文  blog.csdn.net/love_newzai/article/details/25135533
主题 线程池 C++

    1.封装的线程对象

class task : public std::tr1::enable_shared_from_this<task>
{
public:
  task():exit_(false){}
  task( const task & ) = delete;
  ~task(){}
  task & operator =( const task & = delete;

  void start();
  void stop()
  {
    exit_ = true;
    sync_.notify_one();
  }
  void set_job( const std::function<void()> & job, const std::string & file, int line)
  {//提交任务
    {
      std::unique_lock<std::mutex> lock(mutex_);
      job_ = job;
      file_ = file;
      line_ = line;
    }
    sync_.notify_one();//通知主线程有任务要执行....
  }
  void print_job(){
    LOG(INFO)<<"sumbit from:"<<file_<<":"<<line_;
  }
private:

  bool exit_;
  std::mutex mutex_;
  std::condition_variable sync_;
  std::function< void()> job_;          //线程执行的任务,线程任意时刻,最多只能执行一个任务。
  std::thread::id       id_;
  std::string                     file_;
  int                   line_;


};

void task::start()
{
  auto job_proxy = [this] (){


    id_ = std::this_thread::get_id();


    while( !exit_ )
    {


      std::unique_lock<std::mutex> lock(mutex_);
      
      if( job_ )
      {//有任务了,需要执行任务了
        try
        {
          job_(); //执行任务的代码
        }catch( std::exception & e)
        {
         
        }catch(...)
        {
         
        }
        job_ = std::function<void()>(); //释放任务绑定的资源,主要为闭包捕获的资源,特别是shared_ptr对象.
        tasks->job_completed( shared_from_this() ); //任务执行完成,通知线程池
      }else{
             //没有任务的时候,等待其他线程提交任务。。
        sync_.wait(lock);

      }
    }
  };


  std::thread t(job_proxy); //创建并启动与task管理的线程
  t.detach();               //分离模式,thread对象销毁了,但是其创建的线程还活着。。。
}


    2.线程池管理对象

class task_pool
{
public:
  task_pool(unsigned int pool_size = 12:max_size_(pool_size),stop_all_(true)
  {

  }
  ~task_pool()
  {
  }
  void job_completed( const std::tr1::shared_ptr<task> & t)//回收task对象
  {

    std::lock_guard<std::mutex> lock(mutex_);
    bool need_to_notify = idle_tasks_.empty() && (!wait_for_running_jobs_.empty());
    busying_tasks_.erase(t);
    idle_tasks_.push_back(t);
    LOG(INFO)<<"after job_completed, current idle tasks size:"<< idle_tasks_.size()
      <<" busying tasks size:"<<busying_tasks_.size()
      <<" wait for running jobs size:"<<wait_for_running_jobs_.size();
    if( !busying_tasks_.empty() ){
      (*busying_tasks_.begin())->print_job();
    }
    if( need_to_notify )//任务太多了,之前空闲线程使用完了,有任务在等待执行,需要通知
    {
      sync_.notify_one();
    }
  };
  //提交任务
  void submit_job( const std::function<void()> & job, const std::string file, int line)
  {
    if( stop_all_ )
    {
      return;
    }
    std::lock_guard<std::mutex> lock(mutex_);
    bool need_notify = wait_for_running_jobs_.empty();
    wait_for_running_jobs_.push(std::make_tuple(job,file,line));

    if( need_notify )//等待执行的任务为空时,需要通知,其他情况不需要通知.
    {
      sync_.notify_one();
    }
   
  }
  void execute_job()
  {


    while(true)
    {
      std::unique_lock<std::mutex> lock(mutex_);
      while(!stop_all_ && wait_for_running_jobs_.empty() )
      {
        //等待其他线程提交任务
        sync_.wait(lock);
      }

      if( stop_all_ )
      {
        return;
      }
      while(!stop_all_ && idle_tasks_.empty())
      {
        //有任务要执行,但是没有空闲线程,等待其他任务执行完成。
        sync_.wait(lock);
      }
      if( stop_all_ )
      {
        return;
      }

      //有任务,也有空闲线程了
      auto t = get_task();
      auto job =wait_for_running_jobs_.front();
      wait_for_running_jobs_.pop();
      //分发任务到task 线程.
      t->set_job(std::get<0>(job), std::get<1>(job), std::get<2>(job));
    }
  }
  void stop_all()
  {

    std::lock_guard<std::mutex> lock(mutex_);
    stop_all_ = true;
    for( auto t : idle_tasks_ )
    {
      t->stop();
    }
    idle_tasks_.clear();
    for( auto t : busying_tasks_ )
    {
      t->stop();
    }
    while(!wait_for_running_jobs_.empty()){
      wait_for_running_jobs_.pop();
    }

    sync_.notify_one();
  }

  void start()
  {// 初始化启动线程池主线程
    try
    {
      std::thread t( [this]{ execute_job();});
      t.detach();

      stop_all_ = false;
      allocate_tasks();

    }catch( std::exception & e )
    {
      LOG(FATAL) << "start tasks pool ... error"<<e.what();
    }
  }
protected:
  std::tr1::shared_ptr<task> get_task()
  {
    //获取task对象
    if( ! idle_tasks_.empty() )
    {
      auto t = *idle_tasks_.begin();
      idle_tasks_.pop_front();  //从空闲队列移除
      busying_tasks_.insert(t); //加入忙队列
      
      return t;
    }

    return std::tr1::shared_ptr<task>();

  }

  void allocate_tasks() //初始化线程池
  {
    for( int i = 0 ; i < max_size_; i ++ )
    {
      std::tr1::shared_ptr<task> t( new task());
      try{
        t->start();
        idle_tasks_.push_back(t);
      }catch( std::exception & e)
      {        //超过进程最大线程数限制时,会跑出异常。。。               
        break;
      }
    }
  }
private :
  unsigned int                              max_size_;
  std::list  < std::tr1::shared_ptr<task> > idle_tasks_;   //空闲任务队列
  std::set  <  std::tr1::shared_ptr<task> > busying_tasks_;//正在执行任务的队列
  std::queue< std::tuple< std::function<void()> , std::string, int   >  > wait_for_running_jobs_; //等待执行的任务
  std::mutex                                                 mutex_;
  std::condition_variable                 sync_;
  bool stop_all_;
};

    usage

static task_pool *  tasks = nullptr;
static std:nce_flag init_flag;
static std:nce_flag finit_flag;

void run_job(const std::function<void()> & job , const std::string &  file, int line )
{
  if( tasks != nullptr)
    tasks->submit_job(job, file,line);

}
void task_pool_init( unsigned max_task_size)
{
  std::call_once(init_flag,[max_task_size]{
    tasks = new task_pool(max_task_size);
    tasks->start();
  });
}
void task_pool_finit()
{
   std::call_once(finit_flag,[]{ tasks->stop_all();});
}

您需要登录后才可以回帖 登录 | 注册

本版积分规则 发表回复

SACC2019中国系统架构师大会

【数字转型 架构演进】SACC2019中国系统架构师大会,8.5折限时优惠重磅来袭!
2019年10月31日~11月2日第11届中国系统架构师大会(SACC2019)将在北京隆重召开。四大主线并行的演讲模式,1个主会场、20个技术专场、超千人参与的会议规模,100+来自互联网、金融、制造业、电商等领域的嘉宾阵容,将为广大参会者提供一场最具价值的技术交流盛会。

限时8.5折扣期:2019年9月30日前


----------------------------------------

大会官网>>
  

北京盛拓优讯信息技术有限公司. 版权所有 16024965号-6 北京市公安局海淀分局网监中心备案编号:11010802020122
中国互联网协会会员  联系我们:huangweiwei@it168.com
感谢所有关心和支持过ChinaUnix的朋友们 转载本站内容请注明原作者名及出处

清除 Cookies - ChinaUnix - Archiver - WAP - TOP