AQS
-
AbstractQueuedSynchronizer 抽象类
AQS 是 java 中管理 “锁” 的抽象类,锁的许多公共方法都是在这个类中实现。
AQS 是独占锁 (例如,ReentrantLock) 和共享锁 (例如,Semaphore) 的公共父类。
1.1 独占锁
锁在一个时间点只能被一个线程锁占有。根据锁的获取机制,它又划分为 “ 公平锁 ” 和 “ 非公平锁 ”。
1.1.1 公平锁,是按照通过 CLH 等待线程按照先来先得的规则,公平的获取锁;
1.1.2 非公平锁,则当线程要获取锁时,它会无视 CLH 等待队列而直接获取锁。独占锁的典型实例子是 ReentrantLock,此外,ReentrantReadWriteLock.WriteLock 也是独占锁。1.2 共享锁
能被多个线程同时拥有,能被共享的锁。JUC 包中的 ReentrantReadWriteLock.ReadLock,CyclicBarrier, CountDownLatch 和 Semaphore 都是共享锁。这些锁的用途和原理,在以后的章节再详细介绍。
![图片上传中...]
ReentrantLock
如果采用Lock,必须主动去释放锁;但是发生异常时,不会自动释放锁。
因此一般来说,使用Lock必须在try{}catch{}中进行,并且将释放锁的操作放在finally块中进行,以保证锁一定会被释放,防止死锁。
使用Lock 进行同步的话,形式如下:
Lock lock = ...;
lock.lock(); // 加锁
try{
// 处理任务
}catch(Exception ex){
}finally{
lock.unlock(); // 释放锁
}
多线程对共享资源进行操作的时候,使用锁是十分必要的
假设有一个生产者类mPro,一个消费者类mCus,每次操作创建一个线程。
mPro.produce(60);
mPro.produce(120);
mCus.consume(90);
mCus.consume(150);
mPro.produce(110);
如果不使用独占锁,很可能出现Thread-0 produce(60) --> size=-60
Thread-4 produce(110) --> size=50
Thread-2 consume(90) <-- size=-60
Thread-1 produce(120) --> size=-60
Thread-3 consume(150) <-- size=-60
因为首先线程1 对size +60, 没有调用println的时候,切换到线程2, 又加了120, 没打印又被切换了,然后连续两次消费了90 和 150再切换到线程1打印,这个时候size就是-60了。
Condition
Condition 接口描述了可能会与锁有关联的条件变量。Condition 需要和 Lock 联合使用,它的作用是代替 Object 监视器方法,可以通过 await(),signal() 来休眠 / 唤醒线程。
比如生产者-消费者问题中,共享资源空的时候,消费者不能再消费;共享资源满的时候,生产者不能再生产;所以Lock 需要借助Condition
private Condition fullCondtion; // 生产条件
private Condition emptyCondtion; // 消费条件
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.Condition;
class Depot{
private int capacity; // 仓库的容量
private int size; // 仓库的实际数量
private Lock lock; //独占锁
private Condition fullCondition; // 生产条件
private Condition emptyCondition; // 消费条件
public Depot(int capacity) {
this.capacity = capacity;
this.size = 0;
this.lock = new ReentrantLock();
this.fullCondition = lock.newCondition();
this.emptyCondition = lock.newCondition();
}
public void produce(int val){
lock.lock();
try{
// left 表示“想要生产的数量”(有可能生产量太多,需多此生产)
int left = val;
while(left>0){
// 库存已满时,等待“消费者”消费产品。
while(size>=capacity)
{
fullCondition.await();
}
// 获取“实际生产的数量”(即库存中新增的数量)
// 如果“库存”+“想要生产的数量”>“总的容量”,则“实际增量”=“总的容量”-“当前容量”。(此时填满仓库)
// 否则“实际增量”=“想要生产的数量”
int inc = (size+left)>capacity ? (capacity-size) : left;
size += inc;
left -= inc;
System.out.println(Thread.currentThread().getName()+"want to produce" + val + "val-inc"+left + "actually produce "+ inc+ "now size"+size);
// 通知“消费者”可以消费了。
emptyCondition.signal();
}
} catch(InterruptedException e) {
}finally {
lock.unlock();
}
}
public void consume(int val){
lock.lock();
try{
// left 表示“客户要消费数量”(有可能消费量太大,库存不够,需多此消费)
int left = val;
while (left > 0) {
// 库存为0时,等待“生产者”生产产品。
while (size <= 0)
{
emptyCondition.await();
}
// 获取“实际消费的数量”(即库存中实际减少的数量)
// 如果“库存”<“客户要消费的数量”,则“实际消费量”=“库存”;
// 否则,“实际消费量”=“客户要消费的数量”。
int dec = (size<left) ? size : left;
size -= dec;
left -= dec;
System.out.println(Thread.currentThread().getName()+"want to produce" + val + "val-dnc"+left + "actually produce "+ dec+ "now size"+size);
fullCondition.signal();
}
} catch (InterruptedException e) {
} finally {
lock.unlock();
}
}
//生产者
class Producer {
private Depot depot;
public Producer(Depot depot) {
this.depot = depot;
}
// 消费产品:新建一个线程向仓库中生产产品。
public void produce(final int val) {
new Thread() {
public void run() {
depot.produce(val);
}
}.start();
}
}
// 消费者
class Customer {
private Depot depot;
public Customer(Depot depot) {
this.depot = depot;
}
// 消费产品:新建一个线程从仓库中消费产品。
public void consume(final int val) {
new Thread() {
public void run() {
depot.consume(val);
}
}.start();
}
}
public class LockTest3 {
public static void main(String[] args) {
Depot mDepot = new Depot(100);
Producer mPro = new Producer(mDepot);
Customer mCus = new Customer(mDepot);
mPro.produce(60);
mPro.produce(120);
mCus.consume(90);
mCus.consume(150);
mPro.produce(110);
}
}