JUC之线程池的分析

前提:

阿里巴巴不建议使用JDK自带的线程池!

在实际开发中,我们是怎么使用的?(重点)

实际开发中,线程资源必须通过线程池提供,不允许在应用中自行显式创建线程

使用线程池的好处是减少在创建和销毁线程上所花的时间以及系统资源的开销,解决资源不足的问题。 如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题

实际开发中,线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式

FixedThreadPool 和 SingleThreadPool,允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。 CachedThreadPool 和 ScheduledThreadPool,允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM

自带的线程池的使用:

前提介绍:

解读源码可知:

所有自带的线程池类都是对ThreadPoolExecutor类进行再封装

ThreadPoolExecutor类:

 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

7大参数:

  • corePoolSize :核心线程数
  • maximumPoolSize:最大线程数
  • keepAliveTime:除核心线程外其余线程等待释放的时间
  • unit:单位(分,秒)
  • workQueue:阻塞队列(当核心线程数被占用完,后面进入的线程会先进入阻塞队列,阻塞队列也满了之后,再进入线程,就会启动最大线程)
  • threadFactory:线程工程,一般选默认即可
  • handler:拒绝策略(最大线程数,阻塞队列全满了之后,就会执行拒绝策略)

4大拒绝策略:

  • AbortPolicy

该策略是线程池的默认策略。使用该策略时,如果线程池队列满了丢掉这个任务并且抛出RejectedExecutionException异常。

源码如下:

 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            //不做任何处理,直接抛出异常
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
  • DiscardPolicy

    这个策略和AbortPolicy的slient版本,如果线程池队列满了,会直接丢掉这个任务并且不会有任何异常。

    源码如下:

       public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            	//就是一个空的方法
            }
    
  • DiscardOldestPolicy 这个策略从字面上也很好理解,丢弃最老的。也就是说如果队列满了,会将最早进入队列的任务删掉腾出空间,再尝试加入队列。 因为队列是队尾进,队头出,所以队头元素是最老的,因此每次都是移除对头元素后再尝试入队。 源码如下:

            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (!e.isShutdown()) {
                	//移除队头元素
                    e.getQueue().poll();
                    //再尝试入队
                    e.execute(r);
                }
            }
    
  • CallerRunsPolicy

    使用此策略,如果添加到线程池失败,那么主线程会自己去执行该任务,不会等待线程池中的线程去执行。就像是个急脾气的人,我等不到别人来做这件事就干脆自己干。

    源码如下:

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (!e.isShutdown()) {
                    //直接执行run方法
                    r.run();
                }
            }
    

源码分析:

1.Executors.newSingleThreadExecutor()

单例线程池,线程池里只有一个实例

测试代码:

package com.li;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

//线程池
public class ExecutorPoolText {
    //
    public static void main(String[] args) {
        ExecutorService service0 = Executors.newSingleThreadExecutor();  //单个线程的线程池
      //  ExecutorService service1 = Executors.newFixedThreadPool(5); //固定大小的线程池
        //ExecutorService service2 = Executors.newCachedThreadPool(); //可伸缩的线程池


        for (int i = 0; i < 10; i++) {
            service0.execute(()->{
                System.out.println(Thread.currentThread().getName()+"=>执行");
            });
        }




    }
    
    
}

结果:

只有一个线程在跑!

查看源码:

  public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

在这可以知道,核心线程数和最大线程数就是1,所以只有一个线程在跑。

但是一个线程为什么可以把所有的线程都跑了呢,不应该执行拒绝策略吗?

**关键点:**new LinkedBlockingQueue()

通过查看其源码

   public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

阻塞队列的大小是21亿,由此可见,大多数情况下是不会执行拒绝策略的(因为哪有21亿个线程可以开啊),后面加入的线程会在阻塞队列里循环等待被执行,但这也造成了线程池不安全 OOM!

2.Executors.newFixedThreadPool()

固定线程数

测试代码:

package com.li;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

//线程池
public class ExecutorPoolText {
    //
    public static void main(String[] args) {
//        ExecutorService service0 = Executors.newSingleThreadExecutor();  //单个线程的线程池
        ExecutorService service1 = Executors.newFixedThreadPool(5); //固定大小的线程池
        //ExecutorService service2 = Executors.newCachedThreadPool(); //可伸缩的线程池


        for (int i = 0; i < 10; i++) {
            service1.execute(()->{
                System.out.println(Thread.currentThread().getName()+"=>执行");
            });
        }




    }
    
    
}

测试结果:

确实是不同的线程在执行

源码分析:

  public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

核心线程数和最大线程数都是固定的,都是输入的参数大小,且也不会执行拒绝策略!

原因和上面的一样 new LinkedBlockingQueue()

3.Executors.newCachedThreadPool()

可伸缩线程池

测试代码:

package com.li;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

//线程池
public class ExecutorPoolText {
    //
    public static void main(String[] args) {
//        ExecutorService service0 = Executors.newSingleThreadExecutor();  //单个线程的线程池
//        ExecutorService service1 = Executors.newFixedThreadPool(5); //固定大小的线程池
        ExecutorService service2 = Executors.newCachedThreadPool(); //可伸缩的线程池


        for (int i = 0; i < 10; i++) {
            service2.execute(()->{
                System.out.println(Thread.currentThread().getName()+"=>执行");
            });
        }




    }
    
    
}

结果:

线程池的大小(线程数)不是固定的,而是跟着进入的线程数变化而变化(当然不一定同步!)

源码分析:

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

无核心线程数,最大的线程数为21亿!,所以其实也不会执行拒绝策略,因为最大线程数太大了!

这里和上面的不同用了一个new SynchronousQueue() 同步队列,该队列的意思就是只能入一个出一个!,所以线程过多时会开启最大线程数,容易造成OOM

总结:

为什么自带的不安全?

综上的源码分析 可知,自带的都没有拒绝策略,并且线程数或者循环队列的大小不断扩大,会容易导致OOM!

就如一开始所说的

FixedThreadPool 和 SingleThreadPool,允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。 CachedThreadPool 和 ScheduledThreadPool,允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM

所以要用自带的!

自定义线程池

规定线程数,并且指定拒绝策略!

package com.li;

import java.util.concurrent.*;

//线程池
public class ExecutorPoolText {
    //
    public static void main(String[] args) {
//        ExecutorService service0 = Executors.newSingleThreadExecutor();  //单个线程的线程池
//        ExecutorService service1 = Executors.newFixedThreadPool(5); //固定大小的线程池
//        ExecutorService service2 = Executors.newCachedThreadPool(); //可伸缩的线程池

       ThreadPoolExecutor  myPool = new ThreadPoolExecutor(2,
                5,
                3,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());

        for (int i = 0; i < 10; i++) {
            myPool.execute(()->{
                System.out.println(Thread.currentThread().getName()+"=>执行");
            });
        }




    }
    
    
}

自定义线程池,核心线程数为2,最大线程数为5,阻塞队列为3,采用默认的拒绝策略,超过了直接抛出异常!

一开始先执行两个核心线程,后来进入的在阻塞队列排队,超出队列长度再开启最大线程数,若是还是超过了(最大线程数,阻塞队列都满了),就进行拒绝策略,这里就是直接抛出异常!

结果:

结果与预设的相同!!!

这样就保证了线程池的安全性,也会避免OOM异常,正常工作中都是采用这种方式创建线程池的