并发 - 生产者消费者的问题

544 查看

AQS

  1. AbstractQueuedSynchronizer 抽象类
    AQS 是 java 中管理 “锁” 的抽象类,锁的许多公共方法都是在这个类中实现。
    AQS 是独占锁 (例如,ReentrantLock) 和共享锁 (例如,Semaphore) 的公共父类。
    1.1 独占锁
    锁在一个时间点只能被一个线程锁占有。根据锁的获取机制,它又划分为 “ 公平锁 ” 和 “ 非公平锁 ”。
    1.1.1 公平锁,是按照通过 CLH 等待线程按照先来先得的规则,公平的获取锁;
    1.1.2 非公平锁,则当线程要获取锁时,它会无视 CLH 等待队列而直接获取锁。独占锁的典型实例子是 ReentrantLock,此外,ReentrantReadWriteLock.WriteLock 也是独占锁。

    1.2 共享锁
    能被多个线程同时拥有,能被共享的锁。JUC 包中的 ReentrantReadWriteLock.ReadLock,CyclicBarrier, CountDownLatch 和 Semaphore 都是共享锁。这些锁的用途和原理,在以后的章节再详细介绍。

![图片上传中...]

ReentrantLock

如果采用Lock,必须主动去释放锁;但是发生异常时,不会自动释放锁。
因此一般来说,使用Lock必须在try{}catch{}中进行,并且将释放锁的操作放在finally块中进行,以保证锁一定会被释放,防止死锁。

使用Lock 进行同步的话,形式如下:

Lock lock = ...;
lock.lock(); // 加锁
try{
    // 处理任务
}catch(Exception ex){
}finally{
lock.unlock(); // 释放锁
}

多线程对共享资源进行操作的时候,使用锁是十分必要的
假设有一个生产者类mPro,一个消费者类mCus,每次操作创建一个线程。

mPro.produce(60);
mPro.produce(120);
mCus.consume(90);
mCus.consume(150);
mPro.produce(110);

如果不使用独占锁,很可能出现
Thread-0 produce(60) --> size=-60 Thread-4 produce(110) --> size=50 Thread-2 consume(90) <-- size=-60 Thread-1 produce(120) --> size=-60 Thread-3 consume(150) <-- size=-60
因为首先线程1 对size +60, 没有调用println的时候,切换到线程2, 又加了120, 没打印又被切换了,然后连续两次消费了90 和 150再切换到线程1打印,这个时候size就是-60了。

Condition

Condition 接口描述了可能会与锁有关联的条件变量。Condition 需要和 Lock 联合使用,它的作用是代替 Object 监视器方法,可以通过 await(),signal() 来休眠 / 唤醒线程。

比如生产者-消费者问题中,共享资源空的时候,消费者不能再消费;共享资源满的时候,生产者不能再生产;所以Lock 需要借助Condition

private Condition fullCondtion;            // 生产条件
private Condition emptyCondtion;           // 消费条件
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.Condition;
class Depot{
    private int capacity; // 仓库的容量
    private int size; // 仓库的实际数量
    private Lock lock; //独占锁
    private Condition fullCondition; // 生产条件
    private Condition emptyCondition; // 消费条件

    public Depot(int capacity) {
        this.capacity = capacity;
        this.size = 0;
        this.lock = new ReentrantLock();
        this.fullCondition = lock.newCondition();
        this.emptyCondition = lock.newCondition();
    }
    public void produce(int val){
        lock.lock();
        try{
             // left 表示“想要生产的数量”(有可能生产量太多,需多此生产)
            int left = val; 
            while(left>0){
                // 库存已满时,等待“消费者”消费产品。
                while(size>=capacity)
                {
                    fullCondition.await();
                }
            // 获取“实际生产的数量”(即库存中新增的数量)
            // 如果“库存”+“想要生产的数量”>“总的容量”,则“实际增量”=“总的容量”-“当前容量”。(此时填满仓库)
            // 否则“实际增量”=“想要生产的数量”
            int inc = (size+left)>capacity ? (capacity-size) : left;
            size += inc;
            left -= inc;
            System.out.println(Thread.currentThread().getName()+"want to produce" + val + "val-inc"+left + "actually produce "+ inc+ "now size"+size);
            // 通知“消费者”可以消费了。
            emptyCondition.signal();
            }
        } catch(InterruptedException e) {
        }finally {
                lock.unlock();
        }
}

public void consume(int val){
    lock.lock();
    try{
        // left 表示“客户要消费数量”(有可能消费量太大,库存不够,需多此消费)
        int left = val;

        while (left > 0) {
        // 库存为0时,等待“生产者”生产产品。
            while (size <= 0)
            {
                emptyCondition.await();
            }
        // 获取“实际消费的数量”(即库存中实际减少的数量)
        // 如果“库存”<“客户要消费的数量”,则“实际消费量”=“库存”;
        // 否则,“实际消费量”=“客户要消费的数量”。
        int dec = (size<left) ? size : left;
        size -= dec;
        left -= dec;
        System.out.println(Thread.currentThread().getName()+"want to produce" + val + "val-dnc"+left + "actually produce "+ dec+ "now size"+size);
        fullCondition.signal();
        }
    } catch (InterruptedException e) {
    } finally {
        lock.unlock();
    }
}

//生产者
class Producer {
    private Depot depot;

    public Producer(Depot depot) {
        this.depot = depot;
    }

    // 消费产品:新建一个线程向仓库中生产产品。
    public void produce(final int val) {
        new Thread() {
            public void run() {
                depot.produce(val);
            }
        }.start();
    }
}

// 消费者
class Customer {
    private Depot depot;

    public Customer(Depot depot) {
        this.depot = depot;
    }

    // 消费产品:新建一个线程从仓库中消费产品。
    public void consume(final int val) {
        new Thread() {
            public void run() {
                depot.consume(val);
            }
        }.start();
    }
}

public class LockTest3 {  
    public static void main(String[] args) {  
        Depot mDepot = new Depot(100);
        Producer mPro = new Producer(mDepot);
        Customer mCus = new Customer(mDepot);

        mPro.produce(60);
        mPro.produce(120);
        mCus.consume(90);
        mCus.consume(150);
        mPro.produce(110);
    }
}