mex 发表于 2015-07-02 09:31

基于Atomic实现的流量控制管理器

代码package flowcontrol;

import java.util.concurrent.atomic.AtomicInteger;

public class AtomicFlowCounter {

    private final int maxflow;
    private AtomicInteger currentflow;
    private final String name;

    public AtomicFlowCounter(String name) {
      this(-1, name);
    }

    public AtomicFlowCounter(int maxflow, String name) {
      this.maxflow = maxflow;
      currentflow = new AtomicInteger(0);
      this.name = name;
    }

    public int getMaxflow() {
      return maxflow;
    }

    public int getCurrentflow() {
      return currentflow.get();
    }

    public void setCurrentflow(int currentflow) {
      this.currentflow = new AtomicInteger(currentflow);
    }

    public String getName() {
      return name;
    }

    public boolean incCounter() {
      for (;;) {
            int oldvalue = currentflow.get();
            int current = oldvalue + 1;
            if (current > maxflow) {
                return false;
            }
            if (currentflow.compareAndSet(oldvalue, current)) {
                System.out.println(Thread.currentThread() + "加流量"+current);
                return true;
            }
      }

//       if (currentflow.getAndIncrement() < maxflow) {
//       System.out.println(Thread.currentThread() + "加流量");
//       return true;
//       } else {
//       currentflow.decrementAndGet();
//       }
//       return false;
    }

    public boolean decCounter() {
      for (;;) {
            int oldvalue = currentflow.get();
            int current = oldvalue - 1;
            if (current < 0) {
                return false;
            }
            if (currentflow.compareAndSet(oldvalue, current)) {
                System.out.println(Thread.currentThread() + "减流量"+current);
                return true;
            }
      }
      // if (currentflow.getAndDecrement() > 0) {
      // System.out.println(Thread.currentThread() + "减流量");
      // return true;
      // } else {
      // currentflow.incrementAndGet();
      // }
      // return false;
    }
}代码package flowcontrol;

import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class AtomicFlowControlManager {

    private static final ConcurrentMap<String, AtomicFlowCounter> flowMap = new ConcurrentHashMap<String, AtomicFlowCounter>();
    private static final ReentrantLock lock = new ReentrantLock();
    private static final Condition condition = lock.newCondition();
    private static final AtomicInteger count = new AtomicInteger(0);

    public static boolean isExceed(String name) {
      AtomicFlowCounter counter = (AtomicFlowCounter) flowMap.get(name);
      if (counter != null) {
            return !counter.incCounter();
      }
      return true;
    }

    public static boolean releaseCounter(String name) {
      AtomicFlowCounter counter = (AtomicFlowCounter) flowMap.get(name);
      if (counter != null) {
            return counter.decCounter();
      }
      return false;
    }

    public static int getMaxFlow(String name) {
      AtomicFlowCounter counter = (AtomicFlowCounter) flowMap.get(name);
      if (counter != null) {
            return counter.getMaxflow();
      }
      return 0;
    }

    public static void setMaxflow(String name, int maxflow) {
      AtomicFlowCounter counter = null;
      lock.lock();
      try {
            counter = (AtomicFlowCounter) flowMap.get(name);
            if (counter == null) {
                counter = new AtomicFlowCounter(maxflow, name);
                flowMap.put(name, counter);
            }
      } finally {
            lock.unlock();
      }
    }

    public static void main(String[] args) {
      AtomicFlowControlManager.setMaxflow("b2b", 50);
      ExecutorService cacheThreadPool = Executors.newFixedThreadPool(100, new ThreadFactory() {

            private ThreadGroup threadGroup = System.getSecurityManager() == null ? Thread.currentThread()
                  .getThreadGroup() : System.getSecurityManager().getThreadGroup();
            private AtomicLong seq = new AtomicLong(0);
            private String THREADNAME = "AtomicFlowControlManager-Thread-";

            @Override
            public Thread newThread(Runnable r) {

                Thread threadAdapter = new Thread(threadGroup, r, THREADNAME + seq.getAndIncrement(), 0);
                return threadAdapter;
            }

      });
      long t1 = System.currentTimeMillis();
      final CountDownLatch cdl = new CountDownLatch(1000);
      for (int i = 0; i < 1000; i++) {
            cacheThreadPool.execute(new Runnable() {

                @Override
                public void run() {
                  try {
                        // 1000笔业务1s内不定时到来
                        TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));
                  } catch (InterruptedException e1) {

                        e1.printStackTrace();
                  }
                  boolean isExceed = AtomicFlowControlManager.isExceed("b2b");

                  // 等待0.1s
                  if (isExceed) {
                        System.out.println(Thread.currentThread() + "超出流量等待最多0.1s");
                        lock.lock();
                        try {
                            condition.await(100, TimeUnit.MILLISECONDS);
                        } catch (InterruptedException e) {

                            e.printStackTrace();
                        } finally {
                            lock.unlock();
                        }
                        isExceed = AtomicFlowControlManager.isExceed("b2b");
                        if (!isExceed) {
                            // 睡眠300ms模拟服务时间
                            try {
                              Thread.sleep(300);
                            } catch (InterruptedException e) {
                              e.printStackTrace();
                            }
                            System.out.println(Thread.currentThread() + "执行服务成功");
                            AtomicFlowControlManager.releaseCounter("b2b");
                            lock.lock();
                            try {
                              condition.signal();
                            } finally {
                              lock.unlock();
                            }
                            count.incrementAndGet();
                        }
                  } else {
                        // 睡眠200ms模拟服务时间
                        long t1 = System.currentTimeMillis();
                        try {
                            Thread.sleep(200);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        long t2 = System.currentTimeMillis();
                        System.out.println(Thread.currentThread() + "执行服务成功耗时[" + (t2 - t1) + "]");
                        AtomicFlowControlManager.releaseCounter("b2b");
                        lock.lock();
                        try {
                            condition.signal();
                        } finally {
                            lock.unlock();
                        }
                        count.incrementAndGet();
                  }
                  cdl.countDown();
                }
            });
      }
      try {
            cdl.await();
      } catch (InterruptedException e) {

            e.printStackTrace();
      }
      cacheThreadPool.shutdown();
      long t2 = System.currentTimeMillis();
      System.out.println("耗时: " + (t2 - t1) + " 业务成功笔数: " + count.get() + "" + (count.get() * 1000 / (t2 - t1))
                + " 笔/秒" + "业务处理成功率: " + (count.get() * 100 / 1000.0D) + "%");

    }
}
页: [1]
查看完整版本: 基于Atomic实现的流量控制管理器