线程
概念
操作系统调度的最小单元是线程,也叫轻量级进程(Light Weight Process),在一个进程里可以创建多个线程, 这些线程都拥有各自的计数器、 堆栈和局部变量等属性, 并且能够访问共享的内存变量。 处理器在这些线程上高速切换, 让使用者感觉到这些线程在同时执行。
线程的创建
- 通过继承Thread类来创建一个线程
- 实现Runnable接口并重写run()方法,new Thread(runnable).start(),线程启动时就会自动调用该对象的run方法
- 实现Callable接口并实现call()方法,使用FutureTask类包装Callable对象,使用FutureTask对象作为Thread对象的targer创建并启动线程;也可以使用线程池启动
Runnable 和 Callable 的区别 1. Runnable规定方法是run方法,Callable规定方法是call方法 2. Runnable任务执行后无返回值,Callable任务执行后可返回值 3. run方法无法抛出异常,call方法可以抛出异常 4. 运行Callable任务可以拿到一个Future对象,Future表示异步计算结果,他提供了检查计算是否完成的方法,以等待计算完成并获取结果。计算完成后用get()方法获取结果,如果线程没有执行完,get()方法会阻塞当前线程执行。如果线程出现异常,get()方法会抛出异常。
- 线程池:Executors类提供了方便的工厂方法来创建不同类型的 executor services。无论Runnable还是Callable都可以被ThreadPoolExecutor或ScheduledThreadPoolExecutor执行
1. public static ExecutorService newCachedThreadPool() 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程,但是在之前构造的线程可用时将重用它们。 2. public static ExecutorService newFixedThreadPool(int nThreads) 创建一个定长线程池,可控制线程最大并发数,以共享的无界队列方式来运行线程,超出的线程会在队列中等待。 3. public static ExecutorService newSingleThreadExecutor() 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,以无界队列方式来运行线程,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。 4. public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) 创建一个周期线程池,支持定时及周期性任务执行。 5. public static ExecutorService newWorkStealingPool() 创建持有足够线程的线程池来支持给定的并行级别,并通过使用多个队列,减少竞争,它需要穿一个并行级别的参数,如果不传,则被设定为默认的CPU数量,这个线程池实际上是ForkJoinPool的扩展,适合使用在很耗时的任务中,能够合理的使用CPU进行并行操作。
线程的管理
- ForkJoinPool 的每个工作线程都维护了一个工作队列(WorkQueue),这是一个双端队列,里面存放的对象是任务(ForkJoinTask)
- 每个工作线程在运行中产生新的任务(通常是因为调用了fork()),会放入工作队列的队尾,并且工作线程在处理自己的工作队列时,使用的是LIFO方式,也就是每次从队尾取任务执行。
- 每个工作线程在处理自己的工作队列时,会尝试窃取一个任务(或是来自刚刚提交到pool的任务,或是来自其他的工作队列),窃取的任务位于其他线程工作队列的队首,也就是使用FIFO方式。
- 在遇到join()时如果join的任务尚未完成,则会先处理其他任务,并等待其完成。
- ExecutorCompletionService 内部维护了一个阻塞队列(BlockingQueue), 只有完成的任务才被加入到队列中。如果队列中的数据为空时, 调用take()就会阻塞直到有完成的任务加入队列,基于FutureTask实现。
线程池原理
ThreadPoolExecutor
1 | public ThreadPoolExecutor(int corePoolSize, |
- corePoolSize 核心线程数量,当有新任务在exectue()方法提交时,会执行以下判断:
1. 如果运行的线程少于 corePoolSize,则创建新线程来处理任务,即使线程池中的其他线程是空闲的; 2. 如果线程池中的线程数量大于等于 corePoolSize 且小于 maximumPoolSize,则只有当workQueue满时才创建新的线程去处理任务; 3. 如果设置的corePoolSize 和 maximumPoolSize相同,则创建的线程池的大小是固定的,这时如果有新任务提交,若workQueue未满,则将请求放入workQueue中,等待有空闲的线程去从workQueue中取任务并处理; 4. 如果运行的线程数量大于等于maximumPoolSize,这时如果workQueue已经满了,则通过handler所指定的策略来处理任务; 5. 所以,任务提交时,判断的顺序为 corePoolSize –> workQueue –> maximumPoolSize
- maximumPoolSize 最大线程数量;
- keepAliveTime 线程池维护线程所允许的空闲时间。当线程池中的线程数量大于corePoolSize的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了keepAliveTime;
- TimeUnit 线程保持活动的时间单位
- BlockingQueue
workQueue 保存等待执行的任务的阻塞队列,当提交一个新的任务到线程池以后, 线程池会根据当前线程池中正在运行着的线程的数量来决定对该任务的处理方式,主要有以下几种处理方式: - 直接切换:这种方式常用的队列是SynchronousQueue
- 使用无界队列:一般使用基于链表的阻塞队列LinkedBlockingQueue。如果使用这种方式,那么线程池中能够创建的最大线程数就是corePoolSize,而maximumPoolSize就不会起作用了。当线程池中所有的核心线程都是RUNNING状态时,这时一个新的任务提交就会放入等待队列中。
- 使用有界队列:一般使用ArrayBlockingQueue。使用该方式可以将线程池的最大线程数量限制为maximumPoolSize,这样能够降低资源的消耗,但同时这种方式也使得线程池对线程的调度变得更困难,因为线程池和队列的容量都是有限的值,所以要想使线程池处理任务的吞吐率达到一个相对合理的范围,又想使线程调度相对简单,并且还要尽可能的降低线程池对资源的消耗,就需要合理的设置这两个数量。
- threadFactory 它是ThreadFactory类型的变量,用来创建新线程。默认使用Executors.defaultThreadFactory() 来创建线程。使用默认的ThreadFactory来创建线程时,会使新创建的线程具有相同的NORM_PRIORITY优先级并且是非守护线程,同时也设置了线程的名称。
- handler 它是RejectedExecutionHandler类型的变量,表示线程池的饱和的拒绝策略。如果阻塞队列满了并且没有空闲的线程,这时如果继续提交任务,就需要采取一种策略处理该任务。线程池提供了4种策略:
- AbortPolicy:直接抛出异常,这是默认策略;
- CallerRunsPolicy:用调用者所在的线程来执行任务;
- DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
- DiscardPolicy:直接丢弃任务;
线程池状态
1 | private static final int COUNT_BITS = Integer.SIZE - 3; |
- COUNT_BITS: 线程的最大位数29位
- CAPACITY:线程的最大容量
- RUNNING:运行状态,线程池被一旦被创建,就处于RUNNING状态,并且线程池中的任务数为0
- SHUTDOWN:线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。
- STOP:线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。
- TIDYING:当所有的任务已终止,任务数量”为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。
- TERMINATED:终止状态,当执行 terminated() 后会更新为这个状态。
核心源码
线程池执行源码
execute
1 | public void execute(Runnable command) { |
runState和workCount变量怎么存储在一个int中?参考:https://blog.csdn.net/weixin_34396902/article/details/94527424
addWorker
1 | private boolean addWorker(Runnable firstTask, boolean core) { |
t.start()这个语句,启动时会调用Worker类中的run方法,Worker本身实现了Runnable接口,所以一个Worker类型的对象也是一个线程。
Worker类
1 | private final class Worker extends AbstractQueuedSynchronizer implements Runnable |
Worker类投机取巧的继承了AbstractQueuedSynchronizer来简化在执行任务时的获取、释放锁,这样防止了中断在运行中的任务,只会唤醒(中断)在等待从workQueue中获取任务的线程.
不直接执行execute(command)提交的command,而要在外面包一层Worker主要是为了使用用AQS锁控制中断,当运行时上锁,就不能中断,TreadPoolExecutor的shutdown()方法中断前都要获取worker锁,只有在等待从workQueue中获取任务getTask()时才能中断。
runWorker 方法
在Worker类中的run方法调用了runWorker方法来执行任务.
1 |
|
getTask 方法
runWorker函数中最重要的是getTask(),不断的从阻塞队列中取任务交给线程执行,并且负责线程回收
1 | private Runnable getTask() { |
FutureTask源码
1 |
|
线程池中的线程初始化
默认情况下,创建线程池之后,线程池中是没有线程的,需要提交任务之后才会创建线程。在实际中如果需要线程池创建之后立即创建线程,可以通过以下两个方法办到:
- prestartCoreThread():初始化一个核心线程;
- prestartAllCoreThreads():初始化所有核心线程
线程池的关闭
ThreadPoolExecutor提供了两个方法,用于线程池的关闭
- shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务
- shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务
线程池大小
- 粗略
- 如果是CPU密集型任务,就需要尽量压榨CPU,参考值可以设为 NCPU+1
- 如果是IO密集型任务,参考值可以设置为2*NCPU
- 精确:((线程等待时间+线程CPU时间)/线程CPU时间 )* CPU数目
- 最佳:压测
任务缓存队列
workQueue,它用来存放等待执行的任务。BlockingQueue 是个接口,你需要使用它的实现之一来使用BlockingQueue,java.util.concurrent包下具有以下 BlockingQueue 接口的实现类:
- ArrayBlockingQueue:ArrayBlockingQueue 是一个有界的阻塞队列,其内部实现是将对象放到一个数组里。有界也就意味着,它不能够存储无限多数量的元素。它有一个同一时间能够存储元素数量的上限。你可以在对其初始化的时候设定这个上限,但之后就无法对这个上限进行修改了(译者注:因为它是基于数组实现的,也就具有数组的特性:一旦初始化,大小就无法修改)。
- LinkedBlockingQueue:LinkedBlockingQueue 内部以一个链式结构(链接节点)对其元素进行存储。如果需要的话,这一链式结构可以选择一个上限。如果没有定义上限,将使用 Integer.MAX_VALUE 作为上限。
- DelayQueue:DelayQueue 对元素进行持有直到一个特定的延迟到期。注入其中的元素必须实现 java.util.concurrent.Delayed 接口。
- PriorityBlockingQueue:PriorityBlockingQueue 是一个无界的并发队列。它使用了和类 java.util.PriorityQueue 一样的排序规则。你无法向这个队列中插入 null 值。所有插入到 PriorityBlockingQueue 的元素必须实现 java.lang.Comparable 接口。因此该队列中元素的排序就取决于你自己的 Comparable 实现。
- SynchronousQueue:SynchronousQueue 是一个特殊的队列,它的内部同时只能够容纳单个元素。如果该队列已有一元素的话,试图向队列中插入一个新元素的线程将会阻塞,直到另一个线程将该元素从队列中抽走。同样,如果该队列为空,试图向队列中抽取一个元素的线程将会阻塞,直到另一个线程向队列中插入了一条新的元素。据此,把这个类称作一个队列显然是夸大其词了。它更多像是一个汇合点。
线程池总结
- 线程池刚创建时,里面没有一个线程。任务队列是作为参数传进来的。不过,就算队列里面有任务,线程池也不会马上执行它们。
- 当调用 execute() 方法添加一个任务时,线程池会做如下判断:
- 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;
- 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列;
- 如果这时候队列满了,而且正在运行的线程数量小于 maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务;
- 如果队列满了,而且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会抛出异常RejectExecutionException。
- 当一个线程完成任务时,它会从队列中取下一个任务来执行。
- 当一个线程无事可做,超过一定的时间(keepAliveTime)时,线程池会判断,如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。