免费注册 查看新帖 |

Chinaunix

  平台 论坛 博客 文库
最近访问板块 发新帖
查看: 2761 | 回复: 1

java并发编程--线程池初步 [复制链接]

论坛徽章:
0
发表于 2011-11-06 17:11 |显示全部楼层
java并发编程--线程池初步






[coolxing按: 转载请注明作者和出处, 如有谬误, 欢迎在评论中指正.]



服务器应用程序经常需要处理执行时间很短而数目巨大的请求, 如果为每一个请求创建一个新的线程, 会导致一些问题的出现, 如:

1. 性能瓶颈. 线程的创建和销毁需要执行大量的后台操作, 如果单个请求的执行时间很短, 有可能花在创建和销毁线程上的时间大于真正执行请求的时间.

2. 可能会导致资源不足. 大量的并发请求意味着需要创建大量的线程, 过多的线程存在会吞噬大量的系统资源, 而且CPU需要在这些线程间不断切换, 这可能引发"切换过度"的问题.

为了适应上述场合, java在JDK1.5中引入了线程池的概念. 线程池中存放着一定数量的已创建好的线程, 当一个请求到来时, 只需从线程池中取出一个线程来执行请求, 请求完成后再将线程归还给线程池. 同时, 我们可以为线程池指定最大的线程数量, 当池中所有线程都处于活动状态下, 新的任务会排队等候, 直到之前的某个任务处理完成后, 新的任务才能得到处理.  



创建线程池. java.util.concurrent.Executors类提供了多个静态方法用于创建线程池.

|--public static ExecutorService newFixedThreadPool(int nThreads): 创建一个可重用的固定线程数的线程池. 如果池中所有的nThreads个线程都处于活动状态时提交任务(任务通常是Runnable或Callable对象), 任务将在队列中等待, 直到池中出现可用线程.

|--public static ExecutorService newCachedThreadPool(): 调用此方法创建的线程池可根据需要自动调整池中线程的数量. 执行任务时将重用存在先前创建的线程(如果池中存在可用线程的话). 如果池中没有可用线程, 将创建一个新的线程, 并将其添加到池中. 池中的线程超过60秒未被使用就会被销毁, 因此长时间保持空闲的CachedThreadPool不会消耗额外的资源.

|--public static ExecutorService newSingleThreadExecutor(): 创建一个单线程的Executor. 这个Executor保证按照任务提交的顺序依次执行任务.

|--public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize): 创建一个可重用的固定线程数的线程池. ScheduledExecutorService是ExecutorService的子接口, 调用ScheduledExecutorService的相关方法, 可以延迟或定期执行任务.

以上静态方法均使用默认的ThreadFactory(即Executors.defaultThreadFactory()方法的返回值)创建线程, 如果想要指定ThreadFactory, 可调用他们的重载方法.通过指定ThreadFactory, 可以定制新建线程的名称, 线程组, 优先级, 守护线程状态等.

如果Executors提供的创建线程池的方法无法满足要求, 可以使用ThreadPoolExecutor类创建线程池.



提交任务. 所有的线程池都是ExecutorService及其子类的对象, 因此, 可以调用ExecutorService的相关方法提交任务.

|--void execute(Runnable command): 使用池中已存在的线程或新建一个线程执行command.



Java代码
  1. 1.public class ExecutorsDemo {   
  2. 2.      
  3. 3.    public static void main(String[] args) throws Exception {   
  4. 4.        System.out.println("----------------FixedThreadPool---------------------");   
  5. 5.        ExecutorService fixedPool = getFixedThreadPool();   
  6. 6.        executeThread(fixedPool);   
  7. 7.           
  8. 8.        // 为了避免混淆, 需要等待executeThread(fixedPool)执行完成   
  9. 9.        Thread.sleep(3000);   
  10. 10.           
  11. 11.        System.out.println("----------------CashedThreadPool---------------------");   
  12. 12.        ExecutorService cashedPool = getCashedThreadPool();   
  13. 13.        executeThread(cashedPool);   
  14. 14.           
  15. 15.        // 为了避免混淆, 需要等待executeThread(cashedPool)执行完成   
  16. 16.        Thread.sleep(3000);   
  17. 17.           
  18. 18.        System.out.println("----------------SingleThreadExecutor---------------------");   
  19. 19.        ExecutorService singleExecutor = getSingleThreadExecutor();   
  20. 20.        executeThread(singleExecutor);   
  21. 21.           
  22. 22.    }   
  23. 23.      
  24. 24.    // 只存在一个线程的线程池   
  25. 25.    private static ExecutorService getSingleThreadExecutor() {   
  26. 26.        return Executors.newSingleThreadExecutor();   
  27. 27.    }   
  28. 28.  
  29. 29.    // 创建一个根据需要自动调整大小的线程池   
  30. 30.    private static ExecutorService getCashedThreadPool() {   
  31. 31.        return Executors.newCachedThreadPool();   
  32. 32.    }   
  33. 33.  
  34. 34.    // 创建一个可重用的固定线程数的线程池   
  35. 35.    private static ExecutorService getFixedThreadPool() {   
  36. 36.        return Executors.newFixedThreadPool(2);   
  37. 37.    }   
  38. 38.      
  39. 39.    private static void executeThread(ExecutorService pool) {   
  40. 40.        Thread t1 = new MyThread();   
  41. 41.        Thread t2 = new MyThread();   
  42. 42.        Thread t3 = new MyThread();   
  43. 43.        Thread t4 = new MyThread();   
  44. 44.        // 将Tread放入线程池中执行   
  45. 45.        // MyThread类继承自Thread, 而Thread类实现了Runnable接口   
  46. 46.        pool.execute(t1);   
  47. 47.        pool.execute(t2);   
  48. 48.        pool.execute(t3);   
  49. 49.        pool.execute(t4);   
  50. 50.        // 关闭线程池   
  51. 51.        pool.shutdown();   
  52. 52.    }   
  53. 53.      
  54. 54.    private static final class MyThread extends Thread {   
  55. 55.        @Override  
  56. 56.        public void run() {   
  57. 57.            super.run();   
  58. 58.            System.out.println(Thread.currentThread().getName() + ": is running!");   
  59. 59.        }   
  60. 60.    }   
  61. 61.}  
  62. public class ExecutorsDemo {
  63.        
  64.         public static void main(String[] args) throws Exception {
  65.                 System.out.println("----------------FixedThreadPool---------------------");
  66.                 ExecutorService fixedPool = getFixedThreadPool();
  67.                 executeThread(fixedPool);
  68.                
  69.                 // 为了避免混淆, 需要等待executeThread(fixedPool)执行完成
  70.                 Thread.sleep(3000);
  71.                
  72.                 System.out.println("----------------CashedThreadPool---------------------");
  73.                 ExecutorService cashedPool = getCashedThreadPool();
  74.                 executeThread(cashedPool);
  75.                
  76.                 // 为了避免混淆, 需要等待executeThread(cashedPool)执行完成
  77.                 Thread.sleep(3000);
  78.                
  79.                 System.out.println("----------------SingleThreadExecutor---------------------");
  80.                 ExecutorService singleExecutor = getSingleThreadExecutor();
  81.                 executeThread(singleExecutor);
  82.                
  83.         }
  84.        
  85.         // 只存在一个线程的线程池
  86.         private static ExecutorService getSingleThreadExecutor() {
  87.                 return Executors.newSingleThreadExecutor();
  88.         }

  89.         // 创建一个根据需要自动调整大小的线程池
  90.         private static ExecutorService getCashedThreadPool() {
  91.                 return Executors.newCachedThreadPool();
  92.         }

  93.         // 创建一个可重用的固定线程数的线程池
  94.         private static ExecutorService getFixedThreadPool() {
  95.                 return Executors.newFixedThreadPool(2);
  96.         }
  97.        
  98.         private static void executeThread(ExecutorService pool) {
  99.                 Thread t1 = new MyThread();
  100.                 Thread t2 = new MyThread();
  101.                 Thread t3 = new MyThread();
  102.                 Thread t4 = new MyThread();
  103.                 // 将Tread放入线程池中执行
  104.                 // MyThread类继承自Thread, 而Thread类实现了Runnable接口
  105.                 pool.execute(t1);
  106.                 pool.execute(t2);
  107.                 pool.execute(t3);
  108.                 pool.execute(t4);
  109.                 // 关闭线程池
  110.                 pool.shutdown();
  111.         }
  112.        
  113.         private static final class MyThread extends Thread {
  114.                 @Override
  115.                 public void run() {
  116.                         super.run();
  117.                         System.out.println(Thread.currentThread().getName() + ": is running!");
  118.                 }
  119.         }
  120. }
复制代码
程序的输出为:

  1. ----------------FixedThreadPool---------------------

  2. pool-1-thread-1: is running!

  3. pool-1-thread-2: is running!

  4. pool-1-thread-2: is running!

  5. pool-1-thread-1: is running!

  6. ----------------CashedThreadPool---------------------

  7. pool-2-thread-1: is running!

  8. pool-2-thread-2: is running!

  9. pool-2-thread-4: is running!

  10. pool-2-thread-3: is running!

  11. ----------------SingleThreadExecutor---------------------

  12. pool-3-thread-1: is running!

  13. pool-3-thread-1: is running!

  14. pool-3-thread-1: is running!

  15. pool-3-thread-1: is running!
复制代码
|--Future<T> submit(Callable<T> task): 使用池中已存在的线程或新建一个线程执行task, 与execute()方法不同的是, 该方法会返回线程的执行结果. submit方法接受一个Callable<T>对象, Callable<T>接口是一个泛型接口, 实现Callable<T>接口需要重写其中的call()方法, call()方法将返回一个T对象. submit方法的返回值是Future<T>对象, 调用该对象的get()可以获得call()方法的返回值.



Java代码
  1. 1.public class FutureDemo {   
  2. 2.    public static void main(String[] args) throws Exception {   
  3. 3.        ExecutorService pool = Executors.newFixedThreadPool(2);   
  4. 4.           
  5. 5.        Future<Integer> intFuture = pool.submit(new IntegerCallable());   
  6. 6.                // get()方法将阻塞主线程, 直到IntegerCallable线程的call()运行结束并返回结果时为止.   
  7. 7.        Integer returnInt = intFuture.get();   
  8. 8.        System.out.println("返回值为" + returnInt);   
  9. 9.           
  10. 10.        Future<Boolean> boolFuture = pool.submit(new BooleanCallable());   
  11. 11.        Boolean returnBool = boolFuture.get();   
  12. 12.        System.out.println("返回值为" + returnBool);   
  13. 13.           
  14. 14.        pool.shutdown();   
  15. 15.    }   
  16. 16.      
  17. 17.    private final static class IntegerCallable implements Callable<Integer> {   
  18. 18.        // call()方法的返回值类型由泛型决定     
  19. 19.        @Override  
  20. 20.        public Integer call() throws Exception {   
  21. 21.            return 2;   
  22. 22.        }   
  23. 23.    }   
  24. 24.      
  25. 25.    private final static class BooleanCallable implements Callable<Boolean> {   
  26. 26.        @Override  
  27. 27.        public Boolean call() throws Exception {   
  28. 28.            return true;   
  29. 29.        }   
  30. 30.    }   
  31. 31.}  
  32. public class FutureDemo {
  33.         public static void main(String[] args) throws Exception {
  34.                 ExecutorService pool = Executors.newFixedThreadPool(2);
  35.                
  36.                 Future<Integer> intFuture = pool.submit(new IntegerCallable());
  37.                 // get()方法将阻塞主线程, 直到IntegerCallable线程的call()运行结束并返回结果时为止.
  38.                 Integer returnInt = intFuture.get();
  39.                 System.out.println("返回值为" + returnInt);
  40.                
  41.                 Future<Boolean> boolFuture = pool.submit(new BooleanCallable());
  42.                 Boolean returnBool = boolFuture.get();
  43.                 System.out.println("返回值为" + returnBool);
  44.                
  45.                 pool.shutdown();
  46.         }
  47.        
  48.         private final static class IntegerCallable implements Callable<Integer> {
  49.                 // call()方法的返回值类型由泛型决定  
  50.                 @Override
  51.                 public Integer call() throws Exception {
  52.                         return 2;
  53.                 }
  54.         }
  55.        
  56.         private final static class BooleanCallable implements Callable<Boolean> {
  57.                 @Override
  58.                 public Boolean call() throws Exception {
  59.                         return true;
  60.                 }
  61.         }
  62. }  
复制代码
程序的输出结果为:

返回值为2

返回值为true

|--List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks): 批量执行多个任务.



Future类. 如果需要获取线程的执行结果, 那么就会使用到Future. Future对象是一个指向异步执行结果的引用, 由于线程的异步特性, Future对象在其创建之初可能并不可用, 比如线程的call()方法尚未完成时. 可以调用Future对象的isDone()方法判断线程结果是否已经可用, 在线程结果返回之前调用Future对象的get()方法, 将导致阻塞.



关闭线程池. 使用完线程池后需要关闭它, 否则程序可能一直处于运行状态. ExecutorService提供了2个方法用于关闭线程池:

|--void shutdown(): 关闭线程池, 不再接受新任务. 如果存在正在执行的任务, 则等待任务执行完成.

|--List<Runnable> shutdownNow(): 关闭线程池, 不再接受新任务. 尽力尝试停止正在执行的任务, 并返回正在等待的任务列表.

|--boolean isShutdown(): 判断线程池是否已经关闭.

论坛徽章:
0
发表于 2011-11-19 11:12 |显示全部楼层
IT168文库精选文档推荐
您需要登录后才可以回帖 登录 | 注册

本版积分规则 发表回复

DTCC2020中国数据库技术大会

【架构革新 高效可控】2020年12月21日-23日第十一届中国数据库技术大会将在北京隆重召开。

大会设置2大主会场,20+技术专场,将邀请超百位行业专家,重点围绕数据架构、AI与大数据、传统企业数据库实践和国产开源数据库等内容展开分享和探讨,为广大数据领域从业人士提供一场年度盛会和交流平台。

http://dtcc.it168.com


大会官网>>
  

北京盛拓优讯信息技术有限公司. 版权所有 16024965号-6 北京市公安局海淀分局网监中心备案编号:11010802020122
中国互联网协会会员  联系我们:huangweiwei@it168.com
感谢所有关心和支持过ChinaUnix的朋友们 转载本站内容请注明原作者名及出处

清除 Cookies - ChinaUnix - Archiver - WAP - TOP