免费注册 查看新帖 |

Chinaunix

  平台 论坛 博客 文库
最近访问板块 发新帖
查看: 1974 | 回复: 0
打印 上一主题 下一主题

基于Atomic实现的流量控制管理器 [复制链接]

论坛徽章:
0
跳转到指定楼层
1 [收藏(0)] [报告]
发表于 2015-07-02 09:31 |只看该作者 |倒序浏览
[Java]代码
  1. package flowcontrol;

  2. import java.util.concurrent.atomic.AtomicInteger;

  3. public class AtomicFlowCounter {

  4.     private final int maxflow;
  5.     private AtomicInteger currentflow;
  6.     private final String name;

  7.     public AtomicFlowCounter(String name) {
  8.         this(-1, name);
  9.     }

  10.     public AtomicFlowCounter(int maxflow, String name) {
  11.         this.maxflow = maxflow;
  12.         currentflow = new AtomicInteger(0);
  13.         this.name = name;
  14.     }

  15.     public int getMaxflow() {
  16.         return maxflow;
  17.     }

  18.     public int getCurrentflow() {
  19.         return currentflow.get();
  20.     }

  21.     public void setCurrentflow(int currentflow) {
  22.         this.currentflow = new AtomicInteger(currentflow);
  23.     }

  24.     public String getName() {
  25.         return name;
  26.     }

  27.     public boolean incCounter() {
  28.         for (;;) {
  29.             int oldvalue = currentflow.get();
  30.             int current = oldvalue + 1;
  31.             if (current > maxflow) {
  32.                 return false;
  33.             }
  34.             if (currentflow.compareAndSet(oldvalue, current)) {
  35.                 System.out.println(Thread.currentThread() + "加流量"+current);
  36.                 return true;
  37.             }
  38.         }

  39. //       if (currentflow.getAndIncrement() < maxflow) {
  40. //       System.out.println(Thread.currentThread() + "加流量");
  41. //       return true;
  42. //       } else {
  43. //       currentflow.decrementAndGet();
  44. //       }
  45. //       return false;
  46.     }

  47.     public boolean decCounter() {
  48.         for (;;) {
  49.             int oldvalue = currentflow.get();
  50.             int current = oldvalue - 1;
  51.             if (current < 0) {
  52.                 return false;
  53.             }
  54.             if (currentflow.compareAndSet(oldvalue, current)) {
  55.                 System.out.println(Thread.currentThread() + "减流量"+current);
  56.                 return true;
  57.             }
  58.         }
  59.         // if (currentflow.getAndDecrement() > 0) {
  60.         // System.out.println(Thread.currentThread() + "减流量");
  61.         // return true;
  62.         // } else {
  63.         // currentflow.incrementAndGet();
  64.         // }
  65.         // return false;
  66.     }
  67. }
复制代码
[Java]代码
  1. package flowcontrol;

  2. import java.util.Random;
  3. import java.util.concurrent.ConcurrentHashMap;
  4. import java.util.concurrent.ConcurrentMap;
  5. import java.util.concurrent.CountDownLatch;
  6. import java.util.concurrent.ExecutorService;
  7. import java.util.concurrent.Executors;
  8. import java.util.concurrent.ThreadFactory;
  9. import java.util.concurrent.TimeUnit;
  10. import java.util.concurrent.atomic.AtomicInteger;
  11. import java.util.concurrent.atomic.AtomicLong;
  12. import java.util.concurrent.locks.Condition;
  13. import java.util.concurrent.locks.ReentrantLock;

  14. public class AtomicFlowControlManager {

  15.     private static final ConcurrentMap<String, AtomicFlowCounter> flowMap = new ConcurrentHashMap<String, AtomicFlowCounter>();
  16.     private static final ReentrantLock lock = new ReentrantLock();
  17.     private static final Condition condition = lock.newCondition();
  18.     private static final AtomicInteger count = new AtomicInteger(0);

  19.     public static boolean isExceed(String name) {
  20.         AtomicFlowCounter counter = (AtomicFlowCounter) flowMap.get(name);
  21.         if (counter != null) {
  22.             return !counter.incCounter();
  23.         }
  24.         return true;
  25.     }

  26.     public static boolean releaseCounter(String name) {
  27.         AtomicFlowCounter counter = (AtomicFlowCounter) flowMap.get(name);
  28.         if (counter != null) {
  29.             return counter.decCounter();
  30.         }
  31.         return false;
  32.     }

  33.     public static int getMaxFlow(String name) {
  34.         AtomicFlowCounter counter = (AtomicFlowCounter) flowMap.get(name);
  35.         if (counter != null) {
  36.             return counter.getMaxflow();
  37.         }
  38.         return 0;
  39.     }

  40.     public static void setMaxflow(String name, int maxflow) {
  41.         AtomicFlowCounter counter = null;
  42.         lock.lock();
  43.         try {
  44.             counter = (AtomicFlowCounter) flowMap.get(name);
  45.             if (counter == null) {
  46.                 counter = new AtomicFlowCounter(maxflow, name);
  47.                 flowMap.put(name, counter);
  48.             }
  49.         } finally {
  50.             lock.unlock();
  51.         }
  52.     }

  53.     public static void main(String[] args) {
  54.         AtomicFlowControlManager.setMaxflow("b2b", 50);
  55.         ExecutorService cacheThreadPool = Executors.newFixedThreadPool(100, new ThreadFactory() {

  56.             private ThreadGroup threadGroup = System.getSecurityManager() == null ? Thread.currentThread()
  57.                     .getThreadGroup() : System.getSecurityManager().getThreadGroup();
  58.             private AtomicLong seq = new AtomicLong(0);
  59.             private String THREADNAME = "AtomicFlowControlManager-Thread-";

  60.             @Override
  61.             public Thread newThread(Runnable r) {

  62.                 Thread threadAdapter = new Thread(threadGroup, r, THREADNAME + seq.getAndIncrement(), 0);
  63.                 return threadAdapter;
  64.             }

  65.         });
  66.         long t1 = System.currentTimeMillis();
  67.         final CountDownLatch cdl = new CountDownLatch(1000);
  68.         for (int i = 0; i < 1000; i++) {
  69.             cacheThreadPool.execute(new Runnable() {

  70.                 @Override
  71.                 public void run() {
  72.                     try {
  73.                         // 1000笔业务1s内不定时到来
  74.                         TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));
  75.                     } catch (InterruptedException e1) {

  76.                         e1.printStackTrace();
  77.                     }
  78.                     boolean isExceed = AtomicFlowControlManager.isExceed("b2b");

  79.                     // 等待0.1s
  80.                     if (isExceed) {
  81.                         System.out.println(Thread.currentThread() + "超出流量等待最多0.1s");
  82.                         lock.lock();
  83.                         try {
  84.                             condition.await(100, TimeUnit.MILLISECONDS);
  85.                         } catch (InterruptedException e) {

  86.                             e.printStackTrace();
  87.                         } finally {
  88.                             lock.unlock();
  89.                         }
  90.                         isExceed = AtomicFlowControlManager.isExceed("b2b");
  91.                         if (!isExceed) {
  92.                             // 睡眠300ms模拟服务时间
  93.                             try {
  94.                                 Thread.sleep(300);
  95.                             } catch (InterruptedException e) {
  96.                                 e.printStackTrace();
  97.                             }
  98.                             System.out.println(Thread.currentThread() + "执行服务成功");
  99.                             AtomicFlowControlManager.releaseCounter("b2b");
  100.                             lock.lock();
  101.                             try {
  102.                                 condition.signal();
  103.                             } finally {
  104.                                 lock.unlock();
  105.                             }
  106.                             count.incrementAndGet();
  107.                         }
  108.                     } else {
  109.                         // 睡眠200ms模拟服务时间
  110.                         long t1 = System.currentTimeMillis();
  111.                         try {
  112.                             Thread.sleep(200);
  113.                         } catch (InterruptedException e) {
  114.                             e.printStackTrace();
  115.                         }
  116.                         long t2 = System.currentTimeMillis();
  117.                         System.out.println(Thread.currentThread() + "执行服务成功耗时[" + (t2 - t1) + "]");
  118.                         AtomicFlowControlManager.releaseCounter("b2b");
  119.                         lock.lock();
  120.                         try {
  121.                             condition.signal();
  122.                         } finally {
  123.                             lock.unlock();
  124.                         }
  125.                         count.incrementAndGet();
  126.                     }
  127.                     cdl.countDown();
  128.                 }
  129.             });
  130.         }
  131.         try {
  132.             cdl.await();
  133.         } catch (InterruptedException e) {

  134.             e.printStackTrace();
  135.         }
  136.         cacheThreadPool.shutdown();
  137.         long t2 = System.currentTimeMillis();
  138.         System.out.println("耗时: " + (t2 - t1) + " 业务成功笔数: " + count.get() + "  " + (count.get() * 1000 / (t2 - t1))
  139.                 + " 笔/秒" + "  业务处理成功率: " + (count.get() * 100 / 1000.0D) + "%");

  140.     }
  141. }
复制代码
您需要登录后才可以回帖 登录 | 注册

本版积分规则 发表回复

  

北京盛拓优讯信息技术有限公司. 版权所有 京ICP备16024965号-6 北京市公安局海淀分局网监中心备案编号:11010802020122 niuxiaotong@pcpop.com 17352615567
未成年举报专区
中国互联网协会会员  联系我们:huangweiwei@itpub.net
感谢所有关心和支持过ChinaUnix的朋友们 转载本站内容请注明原作者名及出处

清除 Cookies - ChinaUnix - Archiver - WAP - TOP