限制Java线程池运行线程以及等待线程数量的策略

441 查看

限制Java线程池运行线程以及等待线程数量的策略

对于java.util.concurrent.Executors所提供的FixedThreadPool,可以保证可以在内存中有固定数量的线程数运行。但是由于FixedThreadPool绑定的是LinkedBlockingQueue。队列的上限没有限制(默认上限为Integer.MAX_VALUE),不断的提交新的线程,会造成任务在内存中长时间的堆积。

我们有可能面临如下的场景,主线程不断地提交任务线程,希望有固定数量的在线程中运行,也不想造成线程在内存中大量的等待堆积。由此需要我们自己定义一个线程池策略。ThreadPoolExecutor为我们线程池的设置提供了很大的灵活性。

首先看FixedThreadPool的实现:

    public static ExecutorService More ...newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>(),
                                          threadFactory);
        }

可以看到,FixedThreadPool绑定的是LinkedBlockingQueue<Runnable>。我们需要做的第一个改造就是绑定有大小上线的BlockingQueue,在我的实现中绑定ArrayBlockingQueue<Runnable>并设置了size。

第二个是采用CallerRunsPolicy。ThreadPoolExecutor可以定义不同的任务拒绝策略。CallerRunsPolicy指的是当线程池拒绝该任务的时候,线程在本地线程直接execute。这样就限制了本地线程的循环提交流程。

    BlockingQueue<Runnable> workingQueue = new ArrayBlockingQueue<Runnable>(10);
    RejectedExecutionHandler rejectedExecutionHandler =
        new ThreadPoolExecutor.CallerRunsPolicy();
    ExecutorService threadPool = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS,
        workingQueue, rejectedExecutionHandler);

    for (int i = 0; i < 100; i++) {
      
      threadPool.submit(new Callable<Boolean>() {

        @Override
        public Boolean call() throws Exception {
          System.out.println("thread " + String.valueOf(threadNo) + " is called");
          Thread.sleep(10000);
          System.out.println("thread " + String.valueOf(threadNo) + " is awake");
          throw new Exception();
        }

      });
    }

代码中定义了大小为10的线程池,for循环提交了20个线程的时候,10个执行线程,10个线程放入了workingQueue。当提交到第21个线程的时候,会触发RejectedExecutionHandler。在这里我们配置了CallerRunsPolicy策略。所以会在主线程直接执行该线程。也就是说,在本程序中最多会有11个线程在执行,10个线程在等待。由此限制了线程池的等待线程数与执行线程数