Lock
Lock接口主要操作类是ReentrantLock
,可以起到synchronized的作用,另外也提供额外的功能。
用Lock重写上一篇中的死锁例子
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Resource {
Lock lock=new ReentrantLock();
int num=0;
void doSome(){
}
public void deal(Resource res){
while(true){
boolean mylock=this.lock.tryLock();//尝试获得当前Resource的锁定
boolean resLock=res.lock.tryLock();//尝试获得传入的Resource的锁定
try{
if(mylock&&resLock){
res.doSome();
System.out.println(res+":"+this.num);
break;//退出循环
}
}finally{
if(mylock)
this.lock.unlock();
if(resLock)
res.lock.unlock();
}
}
}
}
重写后不会出现死锁的原因在于,当无法同时获得两个锁定时,干脆释放已获得的锁定。
上面代码使用当前Resource的Lock的tryLock()方法尝试获得锁定,以及传入Resource的Lock的tryLock()方法尝试获得锁定。只有当可以获得两个Resource的锁定,才能执行res.doSome().最后无论什么情况,都要finally解除锁定。
ReadWriteLock
ReadWriteLock接口定义了读取锁定和写入锁定的行为。可以使用readLock()
,writeLock()
方法返回Lock操作对象。ReentrantReadWriteLock
是ReadWriteLock接口的主要操作类.ReentrantReadWriteLock.readLock
操作Lock接口,调用其lock()方法时,若没有任何ReentrantReadWriteLock.writeLock
调用过lock()方法,也就是没有任何写入锁定时,才可以取得读取锁定。
下面用ReadWriteLock
试着写一个ArrayList
import java.util.Arrays;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class MyArrayList<T> {
private ReadWriteLock lock=new ReentrantReadWriteLock();
private Object[] list;
private int next=0;
public MyArrayList(){
list=new Object[16];
}
public void add(T obj){
try{
lock.writeLock().lock();//获取写入锁定
if(next==list.length)
list=Arrays.copyOf(list, list.length*2);
list[next++]=obj;
}finally{
lock.writeLock().unlock();//解除写入锁定
}
}
@SuppressWarnings("unchecked")
public T get(int index){
try{
lock.readLock().lock();//获取读取锁定
return (T) list[index];
}finally{
lock.readLock().unlock();//解除读取锁定
}
}
public int size(){
try{
lock.readLock().lock();
return next;
}finally{
lock.readLock().unlock();
}
}
}
重写后的效果是
若有线程调用add()方法进行写入操作,先获得写入锁定。这时如果有其他线程准备获得写入锁定或读取锁定,都必须等待前面的写入锁定解除。
若有线程调用get()方法进行读取操作,先获得读取锁定。这时如果有其他线程准备获得读取锁定,则可以获得;但如果是准备获得写入锁定,仍然要等待所有读取锁定解除。
使用ReadWriteLock
的好处在于,如果有两个线程都想调用get()和size()方法,由于锁定的关系,其中一个线程只能等到另一个线程解除锁定。然而,这两个方法都只是读取对象状态,如果只是读取操作,就可以允许线程并行,这样读取效率将会提高。
Condition
Condition接口用来搭配Lock,最基本的用法就是达到Object的wait(),notify(),notifyAll()方法的作用。
下面用wait(),notify(),notifyAll()实现生产者与消费者.
店员从生产者获得生产出的商品,消费者从店员取走商品
若生产者生产速度较快,店员那可能有很多商品,店员会叫生产者停一下。过一段时间,店员那商品不多了,再通知生产者继续生产
若消费者取走速度过快,店员那可能没有商品可供取走,店员会叫消费者停一下。过一段时间,店员那有商品了,再通知消费者过来取
这里假定店员那最多只能放一件商品
public class Producer implements Runnable{
private Clerk clerk;
public Producer(Clerk clerk){
this.clerk=clerk;
}
@Override
public void run() {
for(int i=0;i<10;i++){
try {
Thread.sleep((int)Math.random()*3000);
} catch (InterruptedException e) {
}
clerk.setProduct(i);
}
}
}
public class Consumer implements Runnable{
private Clerk clerk;
public Consumer(Clerk clerk){
this.clerk=clerk;
}
@Override
public void run() {
for(int i=0;i<10;i++){
try {
Thread.sleep((int)Math.random()*3000);
} catch (InterruptedException e) {
}
clerk.getProduct();
}
}
}
public class Clerk extends Thread{
private int product=-1;//没有商品
public synchronized void setProduct(int product){
while(this.product!=-1){
try {
wait();//店员那有商品,生产者停一下
} catch (InterruptedException e) {
}
}
this.product=product;
System.out.println("生产者生产商品"+this.product);
notify();//通知等待集合(唤醒的可能是消费者,也可能是生产者)
}
public synchronized int getProduct(){
while(this.product==-1){
try {
wait();//店员没有商品,消费者停一下
} catch (InterruptedException e) {
}
}
int p=this.product;
System.out.println("消费者消费商品"+this.product);
this.product=-1;//商品已经被取走
notify();
return p;
}
public static void main(String[] args){
Clerk clerk=new Clerk();
new Thread(new Producer(clerk)).start();
new Thread(new Consumer(clerk)).start();
}
}
生产者生产商品0
消费者消费商品0
生产者生产商品1
消费者消费商品1
生产者生产商品2
消费者消费商品2
生产者生产商品3
消费者消费商品3
生产者生产商品4
消费者消费商品4
生产者生产商品5
消费者消费商品5
生产者生产商品6
消费者消费商品6
生产者生产商品7
消费者消费商品7
生产者生产商品8
消费者消费商品8
生产者生产商品9
消费者消费商品9
现在用Condition接口重写
public class Clerk {
private int product=-1;//没有商品
Lock lock=new ReentrantLock();
private Condition condition=lock.newCondition();
public void setProduct(int product){
try{
lock.lock();
while(this.product!=-1){
try {
condition.await();//店员那有商品,生产者停一下
} catch (InterruptedException e) {
}
}
this.product=product;
System.out.println("生产者生产商品"+this.product);
condition.signal();//通知等待集合(唤醒的可能是消费者,也可能是生产者)
}finally{
lock.unlock();
}
}
public int getProduct(){
try{
lock.lock();
while(this.product==-1){
try {
condition.await();//店员没有商品,消费者停一下
} catch (InterruptedException e) {
}
}
int p=this.product;
System.out.println("消费者消费商品"+this.product);
this.product=-1;//商品已经被取走
condition.signal();
return p;
}finally{
lock.unlock();
}
}
public static void main(String[] args){
Clerk clerk=new Clerk();
new Thread(new Producer(clerk)).start();
new Thread(new Consumer(clerk)).start();
}
}
注意在多个生产者,消费者线程的情况下,等待集合中两者都会有,而condition.signal()从等待集合中唤醒的具体对象是不确定的。有可能消费者取走商品后,唤醒的还是消费者,这时,消费者又会执行while循环,进入等待集合。
事实上,一个Condition对象可以表示一个等待集合。这样上面例子,可以有两个等待集合,一个给消费者用,一个给生产者用。生产者只会通知消费者的等待集合,消费者也只会通知生产者的等待集合。这样效率会高些。
public class Clerk {
...
private Condition producerCondition=lock.newCondition();//生产者的等待集合
private Condition consumerCondition=lock.newCondition();//消费者的等待集合
public void setProduct(int product){
try{
lock.lock();
while(this.product!=-1){
try {
producerCondition.await();//店员那有商品,生产者停一下
} catch (InterruptedException e) {
}
}
this.product=product;
System.out.println("生产者生产商品"+this.product);
consumerCondition.signal();//唤醒消费者等待集合
}finally{
lock.unlock();
}
}
public int getProduct(){
try{
lock.lock();
while(this.product==-1){
try {
consumerCondition.await();//店员没有商品,消费者停一下
} catch (InterruptedException e) {
}
}
int p=this.product;
System.out.println("消费者消费商品"+this.product);
this.product=-1;//商品已经被取走
producerCondition.signal();//唤醒生产者等待集合
return p;
}finally{
lock.unlock();
}
}
...
}
Executor
定义Executor接口的目的是将Runnable的指定与如何执行分离。它只定义了一个execute()方法。
public class Page{
private Executor executor;
public Page(Executor executor){
this.executor=executor;
}
...
public void method1(){
...
executor.execute(new Runnable(){
@Override
public void run(){
...
}
});
...
}
}
public class DirectExecutor implements Executor{
public void execute(Runnable r){
r.run();
}
}
调用
new Page(new DirectExecutor()).method1();
Executor api
ThreadPoolExecutor
像线程池这类服务,实际上是定义在Executor接口的子接口ExecutorService中。通用的ExecutorService由抽象类AbstractExecutorService操作,如果需要线程池功能,可以使用其子类ThreadPoolExecutor.
重写上面executor例子
ExecutorService executorService=Executors.newCachedThreadPool();
new Page(executorService).method1();
executorService.shutdown();//在指定执行的Runnable都完成后,将ExecutorService关闭
Future与Callable
ExecutorService还定义了submit(),invokeAll(),invokeAny()等方法,这些方法出现在java.util.concurrent.Future
,java.util.concurrent.Callable
接口
Future定义的行为就是让你在将来取得结果。你可以将想执行的工作交给Future,Future会使用另一个线程处理,你可以先做别的事情。过些时候,再调用Future的get()获得结果。
如果结果已经产生,get()会直接返回,否则会进入阻塞状态直到结果返回。get()的另一种重载方法可以指定等待结果的时间,若指定时间内结果还没产生,则抛出TimeoutException异常。也可以使用Future的isDone()方法看结果是否产生。
Future经常与Callable一起使用,Callable的作用与Runnable相似,都是用来定义执行的流程。
Runnable的run()方法无返回值,也无法抛出异常
Callable的call()方法可以有返回值,也可以抛出异常
FutureTask
是Future的操作类,创建时可传入Callable对象指定执行的流程
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class FutureTaskDemo {
public static int fib(int n){
return n<=1?n:fib(n-1)+fib(n-2);
}
public static void main(String[] args){
FutureTask<Integer> task=new FutureTask<Integer>(
new Callable<Integer>(){
@Override
public Integer call() throws Exception {
return fib(30);
}
}
);
new Thread(task).start();
try {
Thread.sleep(3000);
System.out.println(task.get());
} catch (InterruptedException|ExecutionException e) {
}
}
}
FutureTask构造类
FutureTask实现RunnableFuture接口
,RunnableFuture接口
继承Runnable,Future接口。所以可以new Thread(task).
ExecutorService的submit()方法也可以接受Callable对象,调用后返回Future对象。
ExecutorService service=Executors.newCachedThreadPool();
Future<Integer> future=service.submit(new Callable<Integer>(){
@Override
public Integer call() throws Exception {
return fib(30);
}
});
如果有多个Callable,可以先将它们收集到Collection中,然后调用ExecutorService的invokeAll()方法,返回List<Future>
如果有多个Callable,要求其中只要有一个执行完成就行了,则可以先将它们收集到Collection中,然后调用ExecutorService的invokeAny()方法
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor用来进行工作排程,其中的schedule()方法用来排定Runnable或Callable实例延迟多久执行一次,并返回Future子接口ScheduledFuture的实例。
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ScheduledExecution {
public static void main(String[] args){
ScheduledExecutorService service=Executors.newSingleThreadScheduledExecutor();
service.scheduleWithFixedDelay(new Runnable(){
public void run(){
System.out.println(new Date());
try {
Thread.sleep(2000);//假设工作会执行2s
} catch (InterruptedException e) {
}
}
}, 2000, 1000, TimeUnit.MILLISECONDS);
}
}
Sat Oct 24 17:11:59 CST 2015
Sat Oct 24 17:12:02 CST 2015
Sat Oct 24 17:12:05 CST 2015
Sat Oct 24 17:12:08 CST 2015
Sat Oct 24 17:12:11 CST 2015
可以看到,输出两两间相差3s.scheduleWithFixedDelay()
方法参数
如果把方法换成scheduleAtFixedRate()
Sat Oct 24 17:28:28 CST 2015
Sat Oct 24 17:28:30 CST 2015
Sat Oct 24 17:28:32 CST 2015
Sat Oct 24 17:28:34 CST 2015
每次排定的执行周期是1s,但是工作执行的时间是2s,会超过排定的执行周期,所以输出两两间相差2s。
ForkJoinPool
Future的另一个操作类ForkJoinTask
,与ExecutorService的另一个操作类ForkJoinPool
有关,它们都是jdk7新增的api,用来解决分而治之的问题。
ForkJoinTask
操作Future接口,可以在未来获得耗时工作的执行结果ForkJoinPool
管理ForkJoinTask
,调用fork()方法,可以让另一个线程执行ForkJoinTask
如果要获得
ForkJoinTask
的执行结果,可以调用join()方法。如果执行结果还没产生,会阻塞直至有执行结果返回
使用ForkJoinTask
的子类RecursiveTask
,它是个抽象类,使用时必须继承它,并操作compute()方法。
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
public class FibDemo extends RecursiveTask<Integer>{
final int n;
FibDemo(int n){
this.n=n;
}
public static int fib(int n){
return n<=1?n:fib(n-1)+fib(n-2);
}
@Override
protected Integer compute() {
if(n<=10){
return fib(n);
}
FibDemo f1=new FibDemo(n-1);
f1.fork();//ForkJoinPool分配线程执行子任务
FibDemo f2=new FibDemo(n-2);
return f2.compute()+f1.join();//执行f2子任务+获得f1子任务进行完成的结果
}
public static void main(String[] args){
FibDemo fib=new FibDemo(40);
ForkJoinPool pool=new ForkJoinPool();
System.out.println(pool.invoke(fib));
}
}