fork-join
框架允许在几个工作进程中断某个任务,然后等待结果组合它们。 它在很大程度上利用了多处理器机器的生产能力。 以下是fork-join
框架中使用的核心概念和对象。
Fork
Fork是一个进程,其中任务将其分成可以并发执行的较小且独立的子任务。
语法
Sum left = new Sum(array, low, mid)
left.fork()
这里Sum
是RecursiveTask
的子类,left.fork()
方法将任务分解为子任务。
Join
连接(Join
)是子任务完成执行后任务加入子任务的所有结果的过程,否则它会持续等待。
语法
left.join()
这里剩下的是Sum
类的一个对象。
ForkJoinPool
它是一个特殊的线程池,旨在使用fork-and-join
任务拆分。
语法
ForkJoinPool forkJoinPool = new ForkJoinPool(4)
这里有一个新的ForkJoinPool
,并行级别为4
个CPU。
RecursiveAction
RecursiveAction
表示不返回任何值的任务。
语法
class Writer extends RecursiveAction {
@Override
protected void compute() { }
}
递归任务
RecursiveTask
表示返回值的任务。
语法
class Sum extends RecursiveTask {
@Override
protected Long compute() { return null }
}
实例
以下TestThread
程序显示了基于线程的环境中Fork-Join
框架的使用。
import java.util.concurrent.ExecutionException
import java.util.concurrent.ForkJoinPool
import java.util.concurrent.RecursiveTask
public class TestThread {
public static void main(final String[] arguments) throws InterruptedException, ExecutionException {
int nThreads = Runtime.getRuntime().availableProcessors()
System.out.println(nThreads)
int[] numbers = new int[1000]
for(int i=0 i< numbers.length i++){
numbers[i] = i
}
ForkJoinPool forkJoinPool = new ForkJoinPool(nThreads)
Long result = forkJoinPool.invoke(new Sum(numbers,0,numbers.length))
System.out.println(result)
}
static class Sum extends RecursiveTask<Long> {
int low
int high
int[] array
Sum(int[] array, int low, int high) {
this.array = array
this.low = low
this.high = high
}
protected Long compute() {
if(high - low <= 10) {
long sum = 0
for(int i=low i < high ++i)
sum += array[i]
return sum
} else {
int mid = low + (high - low) / 2
Sum left = new Sum(array, low, mid)
Sum right = new Sum(array, mid, high)
left.fork()
long rightResult = right.compute()
long leftResult = left.join()
return leftResult + rightResult
}
}
}
}
这将产生以下结果 -
4
499500