多线程并发修改一个数据结构,很容易破坏这个数据结构,如散列表。锁能够保护共享数据结构,但选择线程安全的实现更好更容易,如阻塞队列就是线程安全的集合。
线程安全的集合
Vector
和HashTable
类提供了线程安全的动态数组和散列表,而ArrayList
和HashMap
却不是线程安全的。
java.util.concurrent
包提供了映射表、有序集、队列的高效实现,如:
ConcurrentLinkedQueue
:多线程安全访问,无边界,非阻塞,队列;ConcurrentHashMap
:多线程安全访问,散列映射表,初始容量默认16,调整因子默认0.75。
并发的散列映射表ConcurrentHashMap
提供原子性的关联插入putIfAbsent(key, value)
和关联删除removeIfPresent(key, value)
。写数组的拷贝CopyOnWriteArrayList
和CopyOnWriteArraySet
是线程安全的集合,所有的修改线程会对底层数组进行复制。对于经常被修改的数据列表,使用同步的ArrayList
性能胜过CopyOnWriteArrayList
。
对于线程安全的集合,返回的是弱一致性的迭代器:
迭代器不一定能反映出构造后的所有修改;
迭代器不会将同一个值返回两次;
迭代器不会抛出
ConcurrentModificationException
异常。
通常线程安全的集合能够高效的支持大量的读者和一定数量的写者,当写者线程数目大于设定值时,后来的写者线程会被暂时阻塞。而对于大多数线程安全的集合,size()
方法一般无法在常量时间完成,一般需要遍历整个集合才能确定大小。
同步包装器
任何集合类使用同步包装器都会变成线程安全的,会将集合的方法使用锁加以保护,保证线程的安全访问。使用同步包装器时要确保没有任何线程通过原始的非同步方法访问数据结构,也可以说确保不存在任何指向原始对象的引用,可以采用下面构造一个集合并立即传递给包装器的方法定义。
List<E> synchArrayList = Collections.synchronizedList(new ArrayList<E>());
Map<K, V> synchHashMap = Collections.synchronizedMap(new HashMap<K, V>());
当然最好使用java.util.concurrent
包中定义的集合,同步包装器并没有太多安全和性能上的优势。
Callable与Future
Callable
与Runnable
类似,都可以封装一个异步执行的任务,但是Callable
有返回值。Callabele<T>
接口是一个参数化的类型,只有一个方法call()
,类型参数就是返回值的类型。Future
用来保存异步计算的结果,用get()
方法获取结果。get()
方法的调用会被阻塞,直到计算完成。有超时参数的get()
方法超时时会抛出TimeoutException
异常。
FutureTask
可将Callable
转换成Future
和Runnable
,实现了两者的接口。
Callable<Integer> myComputation = new MyComputationCallable();
FutureTask<Integer> task = new FutureTask<Integer>(myComputation);
Thread t = new Thread(task); // it's a Runnable
t.start();
Integer result = task.get(); // it's a Future
这里有一个计算指定目录及其子目录下与关键字匹配的文件数目的例子,涉及到Callable
、FutureTask
、Future
的使用。
public Integer call() {
count = 0;
try {
File [] files = directory.listFiles();
List<Future<Integer>> results = new ArrayList<>();
for (File file : files) {
if (file.isDirectory()) {
MatchCounter counter = new MatchCounter(file, keyword);
FutureTask<Integer> task = new FutureTask<>(counter);
results.add(task);
Thread t = new Thread(task);
t.start();
} else {
if (search(file)) {
count++;
}
}
}
for (Future<Integer> result : results) {
try {
count += result.get();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
} catch (InterruptedException e) {
;
}
return count;
}
线程池
构建一个新的线程是有代价的,涉及到与操作系统的交互。对于程序中需要创建大量生命期很短的线程,应该使用线程池。线程池中的线程执行完毕并不会马上死亡,而是在池中准备为下一个请求提供服务。当然使用线程池还可以限制并发线程的数目。
需要调用执行器Executors
的静态工厂方法来构建线程池,下面的方法返回的是ExecutorService
接口的ThreadPoolExecutor
类的对象。
Executors.newCachedThreadPool
:线程空闲60秒后终止,若有空闲线程立即执行任务,若无则创建新线程。Executors.newFixedThreadPool
:池中线程数由参数指定,固定大小,剩余任务放置在队列。
使用submit()
方法,将Runnable
对象或Callable
对象提交给线程池ExecutorService
,任务何时执行由线程池决定。调用submit()
方法,会返回一个Future
对象,用来查询任务状态或结果。当用完线程池时,要记得调用shutdown()
关闭,会在所有任务执行完后彻底关闭。类似的调用shutdownNow
,可取消尚未开始的任务并试图终端正在运行的线程。
线程池的使用步骤大致如下:
调用
Executors
类的静态方法newCachedThreadPool()
或newFixedThreadPool()
;调用
submit()
提交Runnable
或Callable
对象;如果提交
Callable
对象,就要保存好返回的Future
对象;线程池用完时,调用
shutdown()
。
对于之前提到的计算文件匹配数的例子,需要产生大量生命期很多的线程,可以使用一个线程池来运行任务,完整代码在这里。
public Integer call() {
count = 0;
try {
File [] files = directory.listFiles();
List<Future<Integer>> results = new ArrayList<>();
for (File file : files) {
if (file.isDirectory()) {
MatchCounter counter = new MatchCounter(file, keyword, pool);
Future<Integer> result = pool.submit(counter);
results.add(result);
} else {
if (search(file)) {
count++;
}
}
}
for (Future<Integer> result : results) {
try {
count += result.get();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
} catch (InterruptedException e) {
;
}
return count;
}
Fork-Join框架
对于多线程程序,有些应用使用了大量线程,但其中大多数都是空闲的。还有些应用需要完成计算密集型任务,Fork-Join框架专门用来支持这类任务。使用Fork-Join框架解决思路大致是分治的思想,采用递归计算再合并结果。只需继承RecursiveTask<T>
类,并覆盖compute()
方法。invokeAll()
方法接收很多任务并阻塞,直到这些任务完成,join()
方法将生成结果。
对于问题,统计数组中满足某特性的元素个数,使用Fork-Join框架是很合适的。
import java.util.concurrent.*;
public class ForkJoinTest {
public static void main(String [] args) {
final int SIZE = 10000000;
double [] numbers = new double[SIZE];
for (int i = 0; i < SIZE; i++) {
numbers[i] = Math.random();
}
Counter counter = new Counter(numbers, 0, numbers.length, new Filter() {
public boolean accept(double x) {
return x > 0.5;
}
});
ForkJoinPool pool = new ForkJoinPool();
pool.invoke(counter);
System.out.println(counter.join());
}
}
interface Filter {
boolean accept(double t);
}
class Counter extends RecursiveTask<Integer> {
private final int THRESHOLD = 1000;
private double [] values;
private int from;
private int to;
private Filter filter;
public Counter(double [] values, int from, int to, Filter filter) {
this.values = values;
this.from = from;
this.to = to;
this.filter = filter;
}
public Integer compute() {
if (to - from < THRESHOLD) {
int count = 0;
for (int i = from; i < to; i++) {
if (filter.accept(values[i])) {
count++;
}
}
return count;
} else {
int mid = (from + to) / 2;
Counter first = new Counter(values, from, mid, filter);
Counter second = new Counter(values, mid, to, filter);
invokeAll(first, second);
return first.join() + second.join();
}
}
}
另外,Fork-Join框架使用工作密取来平衡可用线程的工作负载,比手工多线程强多了。