Java多线程之同步与阻塞队列

479 查看

多线程对共享数据的读写涉及到同步问题,锁和条件是线程同步的强大工具。锁用来保护代码片段(临界区),任何时刻只能有一个线程执行被保护的代码。条件对象用来管理那些已经进入被保护的代码段但还不能运行的线程。

竞争条件

各线程访问数据的次序不同,可能会产生不同的结果。下面的程序可以实现两个账户之间的转账,正常情况下所有账户的总金额应该是不变的。

public void transfer(int from, int to, double amount) {
    if (accounts[from] < amount) {
        return;
    }
    accounts[from] -= amount;
    accounts[to] += amount;
    System.out.printf(" Total Balance %10.2f\n", getTotalBalance());
}

但是在上面程序的运行中发现输出的总金额是变化的,这是因为transfer()方法执行的过程中会被中断,可能存在几个线程同时读写账户余额。问题的根源在于转账这一系列动作不是原子操作,并且没有使用同步。当然同步使用不当也会造成死锁(所有线程都阻塞的状态)。

锁对象

可以使用锁和条件对象实现同步数据存取。锁能够保护临界区,确保只有一个线程执行。

注意,在finally子句中不要忘记解锁操作。若因异常抛出释放,对象可能受损。

互斥锁

ReentrantLock类能够有效防止代码块受并发访问的干扰。

private Lock bankLock;
private Condition sufficientFunds;
public void transfer(int from, int to, double amount) throws InterruptedException {
    bankLock.lock();
    try {
        while (accounts[from] < amount) {
            sufficientFunds.await();
        }
        accounts[from] -= amount;
        accounts[to] += amount;
        System.out.printf(" Total Balance %10.2f\n", getTotalBalance());
        sufficientFunds.signalAll();
    } finally {
        bankLock.unlock();
    }
}

每一个Bank对象有自己的ReentrantLock对象,如果两个线程试图访问同一个Bank对象,那么锁以串行方式提供服务。但是如果两个线程访问的是不同的Bank对象,两个线程都不会发生阻塞。

对于所有账户总金额的获取方法也需要加锁才能保证正确执行。锁是可重入的,也就是说同一个线程可以重复的获得已经持有的锁。锁保持一个持有计数来跟踪嵌套获得锁的次数,当持有计数变为0时,线程释放锁。

public double getTotalBalance() {
    bankLock.lock();
    try {
        double sum = 0;
        for (double a : accounts) {
            sum += a;
        }
        return sum;
    } finally{
        bankLock.unlock();
    }
}

测试锁

tryLock()方法用于尝试获取锁而没有发生阻塞。如果未获得锁,线程可以立即离开,去做别的事。

if(myLock.tryLock()) {
    try {
        do something
    } finally {
        myLock.unlock();
    }
} else {
    do something else
}

调用带有超时参数的tryLock(),线程可以在等待获取锁的过程中被中断,抛出InterruptedException异常。从而允许程序打破死锁,类似于lockInterruptibly()

读写锁

java.util.concurrent.locks包定义了两个锁类:ReentrantLock类和ReentrantReadWriteLock类。在读多写少(很多线程从一个数据结构读取数据,很少线程修改其中数据)的情形中,ReentrantReadWriteLock类是十分实用的。

读锁,允许多个读,排斥所有写;写锁,排斥所有读和写。

private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private Lock readLock = rwl.readLock();
private Lock writeLock = rwl.writeLock();

条件对象

条件对象用来管理那些已经获得锁但不能工作的线程。比如当账户中没有足够余额时,需等待别的线程的存款操作。

一个锁对象可以有一个或多个相关的条件对象。当一个线程调用await()等待方法时,它将进入该条件的等待集。当一个线程转账完成时会调用sufficientFunds.signalAll()方法,重新激活因为sufficientFunds这一条件而等待的所有线程,使这些线程从等待集中移出,状态变为可运行。当一个线程处于等待集中时,只能靠其他线程来重新激活自己。

synchronized关键字

使用synchronized关键字声明的方法,对象的锁将保护整个方法,其实就是隐式的使用了一个内部对象锁。内部对象锁只有一个条件对象,使用wait()/notifyAll()/notify()操作。

public synchronized void myMethod() {
    while (! (ok to proceed)) {
        wait();
    }
    do something
    notifyAll();
}

注意,signal()notify()都是随机选择一个线程,解除其阻塞状态,可能会造成死锁。

对于sychronized修饰的方法,显式使用锁对象和条件对象,形式如下。

public void myMethod() {
    this.intrinsic.lock();
    try {
        while(! (ok to proceed)) {
            condition.await();
        }
        do something
        condition.signalAll();
    } finally {
        this.intrinsic.unlock();
    }
}

为了保证操作的原子性,可以安全地使用AtomicInteger作为共享计数器而无需同步,这个类提供方法incrementAndGet()decrementAndGet()完成自增自减操作。

Volatile域

使用volatile关键字同步读写的必要性:

  • 由于寄存器或缓存的存在同一内存地址可能会取到不同的值;

  • 编译器优化中假定内存中的值仅在代码中有显式修改指令时会改变。

volatile关键字为实例域的同步访问提供了一种免锁机制,当被声明为volatile域时,编译器和虚拟机就知道该域可能被另一个线程并发更新。使用锁或volatile修饰符,多个线程可以安全地读取一个域,但volatile不提供原子性。。另外,将域声明为final,也可以保证安全的访问这个共享域。

线程局部变量

在线程间共享变量时有风险的,可以使用ThreadLocal辅助类为各个线程提供各自的实例。比如,SimpleDateFormat类不是线程安全的,内部数据结构会被下面形式的并发访问破坏。

public static final SimpleDateFormat dataFormat = new SimpleDateFormat("yyyy-MM-dd");
String dateStamp = dateFormat.format(new Date());

如果不使用synchronized或锁等开销较大的同步,可以使用线程局部变量ThreadLocal解决变量并发访问的问题。

public static final ThreadLocal<SimpleDateFormat> dateFormat =
    new ThreadLocal<SimpleDateFormat>() {
        protected SimpleDateFormat initialValue() {
            return new SimpleDateFormat("yyyy-MM-dd");
        }
    };
String dateStamp = dateFormat.get().format(new Date());

在一个线程中首次调用get()时,会调用initialValue()方法,此后会返回属于当前线程的实例。

对于java.util.Random类,虽是线程安全的,但多线程共享随机数生成器却是低效的。可以使用上面提到的ThreadLocal为各个线程提供一个单独的生成器,还可以使用ThreadLocalRandom这个便利类。

int random = ThreadLocalRandom.current().nextInt(upperBound);

阻塞队列

上面关于同步的实现方式是Java并发程序设计基础的底层构建块,在实际的编程使用中,使用较高层次的类库会相对安全方便。对于典型的生产者和消费者问题,可以使用阻塞队列解决,这样就不用考虑锁和条件的问题了。

生产者线程向队列插入元素,消费者线程从队列取出元素。当添加时队列已满或取出时队列为空,阻塞队列导致线程阻塞。将阻塞队列用于线程管理工具时,主要用到put()take()方法。对于offer()poll()peek()方法不能完成时,只是给出一个错误提示而不会抛出异常。

java.util.concurrent包提供了几种形式的阻塞队列:

  • LinkedBlockingQueue:无容量限制,链表实现;

  • LinkedBlockingDeque:双向队列,链表实现;

  • ArrayBlockingQueue:需指定容量,可指定公平性,循环数组实现;

  • PriorityBlockingQueue:无边界优先队列,用堆实现。

这里有一个用阻塞队列控制一组线程的示例,实现的功能是搜索指定目录及子目录中的所有文件并找出含有查询关键字的行。里面有个小技巧,一个线程搜索完毕时向阻塞队列填充DUMMY,让所有线程能停下来。