0 引言
前段时间需要把一个C++的项目port到Java中,因此时隔三年后重新熟悉了下Java。由于需要一个通用的线程池,自然而然就想到了Executors。
用了后,感觉很爽... 于是忍不住抠了下源码。因此就有了这篇学习笔记。
言归正传,Java Executor是一个功能丰富,接口设计很好的,基于生产者-消费者模式的通用线程池。这种线程池的设计思想也在很多地方被应用。
在这篇文章中,我并不打算介绍java线程池的使用,生产者-消费者模式,并发编程基本概念等。
通常来说,一个线程池的实现包括四个部分:
- 执行任务的线程
- 用于封装任务的task对象
- 存储任务的数据结构
- 线程池本身
1 Thread
Thread 并不是concurrent包的一部分。Thread包含着name, priority等成员和对应的操作方法。
它是继承自runable的,也就是说线程的入口函数是run。它的继承体系和重要操作函数如下图:
它实现了一系列包括sleep, yield等静态方法。以及获取当前线程的静态方法currentThread()。这些都是native方法。
值得注意的是它的中断机制(虽然它也实现了suspend和resume方法,但是这两个方法已被弃用):
- 通过调用interrupt来触发一个中断
- isInterrupted() 用来查询线程的中断状态
- interrupted() 用来查询并清除线程的中断状态
public void interrupt() {
if (this != Thread.currentThread())
checkAccess();
synchronized (blockerLock) {
Interruptible b = blocker;
if (b != null) {
interrupt0(); // Just to set the interrupt flag
b.interrupt(this);
return;
}
}
interrupt0();
}
在默认的情况下,blocker (Interruptible 成员变量)的值为null, 这时调用interrupt,仅仅是调用interrupt0设置一个标志位。
而如果blocker的值不为null,则会调用其interrupt方法实现真正的中断。
(关于blocker值何时被设置,在后面会看到一个使用场景。)
当线程处于可中断的阻塞状态时,比如说阻塞在sleep, wait, join,select等操作时,调用interrupt方法会让线程从阻塞状态退出,并抛出InterruptedException。
值得注意的一点是:interrupt让我们从阻塞的方法中退出,但线程的中断状态却并不会被设置!
try {
Thread.sleep(10);
}
catch (InterruptedException e) {
System.out.println("IsInterrupted: " + Thread.currentThread().isInterrupted());
}
如上述示例代码,此时你得到的输出是: IsInterrupted : false 。这是一个有点令人意外的地方。
上述代码并不是一个好的示例,因为interrupt被我们“吃”掉了!除非你明确的知道这是你想要的。否则的话请考虑在异常捕获中(catch段中)加上:
Thread.currentThread.interrupt();
2. Task
Java可执行的接口类有两种,Runnable和Callable,它们的区别是Callable可以带返回值,一个需要实现Run()方法,另一个需要实现带返回值的Call() 方法。
在java.util.concurret中还有另外一个接口类Future。
Future表示一个异步任务的结果,就是user code向线程池提交一个任务后,它会返回对应的 Future对象。用以观察任务执行的状态(isCancelled, isDone),取消任务(Cancel)或者等待任务执行(get, timeout get)。
如上图,RunnableFuture是一个中间类,它将Runnable和Future的功能糅合到一起。FutureTask 则是真正的实现。
FutureTask
FutureTask可以从一个Runnable和Callable构造,当通过Runnable构造时,它会调用Excutors.callable接口将其转为Callable对象保存起来。
从上面的类图中可以看出,FutureTask除了简单的状态查询等接口外,还具有两个重要的接口:get()
和 get(long timeout, TimeUnit unit)), cancel(bool mayInterruptIfRunning)
。
它们分别提供两个重要的功能:阻塞(当前线程)等待(一段时间)直到task完成或者异常终止;取消任务。
任务取消
一个任务具有三种状态:尚未运行,正在运行,已经执行完毕。
在调用cancel后,如果任务处于已经执行完毕了,则不需要做任何事情直接返回;
如果任务尚未运行,将其状态设为cancelled;
如果任务正在执行,而且user以cancel(true)的方式取消这个任务。那么FutureTask会通过调用Thread.interrupt来终止当前任务。
public boolean cancel(boolean mayInterruptIfRunning) {
// 任务已经完成或者被中断等其他状态
if (state != NEW)
return false;
if (mayInterruptIfRunning) {
// 正在运行,或者尚未运行
if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING))
return false;
Thread t = runner;
if (t != null)
t.interrupt();
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state
}
// 设置cancel标志位
else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED))
return false;
finishCompletion();
return true;
}
注意到: FutureTask并没有一个RUNNING的状态来标识该任务正在执行。正常的情况下,任务从开始创建直到运行完毕,这段过程的状态都是NEW。
阻塞等待
user code可以调用get() 接口等待任务完成或者调用get(long, TimeUnit)等待一段时间。但get()接口被调用,当前的线程将被挂起,直到条件满足(任务完成或者异常退出)。
在前文中我们了解到,Thread并没有提供挂起和阻塞的方法。在这里,Java利用LockSupport类来实现目的。(我猜测其中用了类似条件变量的方法来实现)。
park
LockSupport也属于concurrent。FutureTask利用它的park (parkNanos)和unpark方法来实现线程的挂起和恢复:
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
unsafe.park(false, 0L);
setBlocker(t, null);
}
public static void unpark(Thread thread) {
if (thread != null)
unsafe.unpark(thread);
}
其中parkNanos跟park方法并无本质区别,只是多了一个timeout参数。FutureTask分别用它们来实现get和timeout的get。
注意到上面的setBlocker方法了吗?没错,它就是给在上文Thread.interrupt方法中出现过的Thread成员变量blocker赋值。从这我们可以看出,它是可中断的。
而它真正实现挂起的则是依赖unsafe类。unsafe类在concurrent中频繁出现,但sun去并不建议使用它。
它除了提供park,unpark方法外,还提供了一些内存和同步原语。比如CAS等。
多个等待者
调用get()的线程可以是一个,也可以是多个。为了能够在恰当的时机将它们一一恢复,FutureTask内部需要维护一个链表来记录所有的等待线程:waiters.
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
get 全貌
至此,我们终于了解get的全貌了。get会调用awaitDone方法来实现阻塞。当然,只有两个状态需要处理:NEW, COMPLETING。
NEW的状态在前文已经有介绍过。COMPLETING状态通常持续较短,在FutureTask 内部的callable 的call方法调用完毕后,会需要将call的返回值设置到outcome这个成员变量。随后将状态设为NORMAL。这期间的状态就是COMPLETING。
显而易见,对于这种状态,我们只需要调用yield让出线程资源,使得FutureTask完成这一过程即可。
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) { // 1
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet 2
Thread.yield();
else if (q == null) // 3
q = new WaitNode();
else if (!queued) // 4
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) { // 5
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else // 6
LockSupport.park(this);
}
}
当任务处于NEW状态正在被执行时,其他线程调用get而进入awaitdone函数。
此时的流程是 3 -> 4 -> 5 或者 3 -> 4 -> 6。
它会首先分配一个WaitNode对象 --> 把它插入到waiters链表的表头 --> 然后开始等待。那么park函数何时返回呢?
- 对应的unpark被调用(或者在这之前已经被调用)
- 如果设置了timeout的,会在时间到达后退出。
- 被中断。
- 其他异常。
等待线程恢复
当任务执行完毕(或者被cancel)时,FutureTask会调用最终调用finishcompletion,改函数会改变FutureTask状态,并调用LockSupport.unpark方法。
此时,awaitDone线程从park中返回,然后检查当前的状态已经被改变,随后退出for循环。
线程安全
FutureTask是会被多个线程访问的,涉及到临界区的保护,但是其内部却并没有任何的锁操作。而在该类定义的末尾,有这样的代码。
private static final sun.misc.Unsafe UNSAFE;
private static final long stateOffset;
private static final long runnerOffset;
private static final long waitersOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = FutureTask.class;
stateOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("state"));
runnerOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("runner"));
waitersOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("waiters"));
} catch (Exception e) {
throw new Error(e);
}
}
这段代码会在类被加载时执行一次。注意到它利用getDeclaredField反射机制来保存了三个offset:
stateOffset,runnerOffset,waitersOffset分别对应着state,runner,waiters这三个成员的偏移量。
FutureTask真是对这三个成员变量进行CAS操作来保证原子性和无锁化的。实现CAS的类正是上文出现过的sun.misc.Unsafe类。
UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())
第一个参数是对象指针,第二个是偏移量,第三个是旧值,最后一个是新值。详细可参考Unsafe文档。
3. BlockingQueue
java实现了生产者-消费者模式的队列。由于队列的容量有限,因此涉及到在队列为空的时候取task和在队列已满的时候存task的策略,连同一系列的查询函数一起,BlockingQueue包含着11个静态方法。
BlockingQueue只是一个interface,它的实现类包括链表方式的LinkedBlockingQueue 、数组方式的ArrayBlockingQueue以及PriorityBlockingQueue等。
LinkedBlockingQueue
下面以LinkedBlockingQueue为例来了解一下它的实现。
LinkedBlockingQueue是一个FIFO的队列,它真正用来存储元素的节点类型是Node :
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
对应的,在LinkedBlockingQueue中保存了头节点和尾节点 :
/**
* Head of linked list.
* Invariant: head.item == null
*/
private transient Node<E> head;
/**
* Tail of linked list.
* Invariant: last.next == null
*/
private transient Node<E> last;
在LinkedBlockingQueue中,Java使用了双锁机制,分别对头节点和尾节点加锁。这样取和存的操作就可以同时进行了。
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
以take为例,获取并移除此队列的头部,在元素变得可用之前一直等待(可被打断)。
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
它将会一直阻塞在notEmpty.await()上,直到信号到达或者被中断。注意到它只需要对takeLock加锁,而无需对putLock加锁。
相应的,put操作也只需要锁上putLock就可以了。
有的操作则需要两个锁都锁上,比如说remove,因为我们不确定要删除的元素的位置。
public boolean remove(Object o) {
if (o == null) return false;
fullyLock();
try {
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {
unlink(p, trail);
return true;
}
}
return false;
} finally {
fullyUnlock();
}
}
可以看到LinkedBlockingQueue 并没有直接调用lock,而是通过fullyLock和fullyUnLock来加解锁以保证一致性,避免死锁:
/**
* Lock to prevent both puts and takes.
*/
void fullyLock() {
putLock.lock();
takeLock.lock();
}
/**
* Unlock to allow both puts and takes.
*/
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}
当然,双锁队列在插入第一个元素和最后一个元素出队的时候会有冲突。这里的解决办法是加了一个哨兵,开始的时候,头尾节点都指向这个哨兵,在随后的操作中,头结点始终指向哨兵,而尾节点指向真正有效的值。
4. Executors
类结构
有了前面这些零件,我们就可以开始组装线程池对象了。java里面Executors的真正实现类主要包括两个ThreadPollExecutors和ScheduledThreadPoolExecutor。其中ScheduledThreadPoolExecutor通过实现其基类ScheduledExecutorService扩展了ThreadPoolExecutor类。
SheduledExecutorsService主要用于执行周期性的或者定时的任务。其他情况下我们更多使用ThreadPoolExecutor。
ThreadPoolExecutor
ThreadPoolExecutor总共有七个构造参数:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
从其注释和参数名不难猜测各个参数的用途。唯一有点麻烦的是corePoolSize, maximumPoolSize这两个参数的区别。你可以参考这里或者这里。
但大多数情况我们并不需要直接调用它的构造函数,在Executors里面定义了一系列的静态方法供我们使用。包括newFixedThreadPool、newSingleThreadExecutor等。
由于ThreadPoolExecutor是一个通用的线程池,因此它需要为各种各样的情况预留足够的接口。ThreadPoolExecutor除了提供丰富的接口外,还提供了一些“什么都不做”的函数,为user预留接口。
比如每个任务在执行之前会调用beforeExecute,执行完毕后又会调用afterExecute。又比如terminate用来通知用户代码该线程将要结束。
这些接口java都提供了及其丰富的文档。
Executor接口设计的目的或许也在于此,为简单的情况提供尽量简单的使用方法,同时为复杂的情况或者说高级用户提供足够多的接口。
一个不用担心的问题
在最初使用ThreadPoolExecutor 时候,用到FutrueTask的cancel接口,我总是担心一个问题:
由于cancel是依赖线程的interrupt方法来实现的,也就是说cancel的状态保持在线程中而不是task中。那么当这个线程执行下一个task会不会被影响?为了验证这一点,我做了个小小的实验:
public class InterruptTest
{
public static class MyTask implements Runnable {
@Override
public void run() {
System.out.println(Thread.currentThread());
System.out.println("before interrupt " + Thread.currentThread().isInterrupted());
Thread.currentThread().interrupt();
System.out.println("after interrupt " + Thread.currentThread().isInterrupted());
}
}
public static void main(String[] str)
{
ExecutorService service = Executors.newFixedThreadPool(1);
// MyTask task1 = new MyTask();
Future<?> future1 = service.submit(new InterruptTest.MyTask());
Future<?> future2 = service.submit(new InterruptTest.MyTask());
}
}
输出结果说明,我的担心是多余的:
Thread[pool-1-thread-1,5,main]
before interrupt false
after interrupt true
Thread[pool-1-thread-1,5,main]
before interrupt false
after interrupt true
其关键代码就在ThreadPoolExecutor.runWorker 方法中,线程的中断状态会被清除(shutDown例外)。
final void runWorker(Worker w) {
...
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
...
}
其中Executors还有很多的东西,但是看看文章的长度,我决定把那些关于Executors的笔记先“藏”起来。
如果感兴趣的可以翻看源码: ThreadFactory, RejectHandler, worker, task, shutDown策略,锁机制... 看看ThreadPoolExecutor 把这些积木堆成一个房子的吧。