线程操纵术之更优雅的并行策略 - 阿里技术
阿里妹导读
本文详细介绍了并行编程以及一些并行问题案例中的真实业务场景。如何写出更优雅的并行程序?有哪些风险和注意事项?本文来为你解答。
Photo by Tomas Sobek on Unsplash
引子
0.1 从一道小学奥数题说起……
星期天小明和妈妈两个人要做好多家务。他们的任务总共分为两部分,分别是卫生间和厨房的工作。
- 在卫生间的工作包括:擦镜子要7分钟,手洗衣服要30分钟,晾衣服要5分钟,刷厕所要10分钟。
- 在厨房的工作包括:洗菜要5分钟,做饭要20分钟,倒垃圾要5分钟。
其中晾衣服必须在手洗衣服完成后才能做,做饭必须在洗菜完成后才能做。并且同一项工作最多只能一个人操作。
问题:小明和妈妈干完所有这些事情最少用多长时间?
0.2 问题解析
首先我们画出两部分的工作示意图:
答案比较显而易见,我们只要努力让两个人做家务的时间尽量相等,那么总体时间就是最少。简单排列组合后,不难得出答案为42min(任务的最小排列组合方式不止1种)
0.3 进一步思考
得出答案非常容易,在这个问题上,对我们在并行编程上有什么启示呢?
首先看一组错误的任务调度策略:
这一组策略第二个人先把小任务几乎做完,然后才开始大任务,导致第一个人完成任务后,长达12分钟没有其他事情做了。而在有任务时长的先验知识后,我们采取类似贪婪的思想,两个人在任一空闲时刻,永远做占用时间最大的任务,这样就把更多小任务留到了最后,总体可以节省更多时间。
带着这个思考,开始尝试书写更优雅的并行编程策略……
一、并行编程
并行编程是一种利用多个处理器或计算资源同时执行多个任务的编程方式,以提高计算效率和性能。它涉及任务划分、同步与互斥、通信与通信开销等概念和技术。并行编程的正确实现需要解决数据竞争、负载平衡、同步与互斥等挑战,并借助调试和性能分析工具来提高开发效率和程序性能。
1.1 并发与并行
并发是指多个任务交替执行的能力,而并行是指多个任务同时执行的能力。并发可以在单个处理器上通过快速切换任务的上下文来实现,而并行需要多个处理器或计算资源的支持。
并发:2 个队列,1 台自动售货机 | 并行:2 个队列,2 台自动售货机 |
---|
并行编程无疑是一个广阔的领域,旨在通过同时执行多个任务或操作来提高程序的性能和效率。与传统的串行编程相比,它允许多个任务在同一时间段内并行执行,从而加快计算速度和提高系统的吞吐量。
并行编程广泛应用于需要处理大量数据或执行复杂计算的领域,如科学计算、数据分析、图像处理和机器学习等。它可以利用多核处理器、分布式系统或GPU等硬件资源,通过将问题划分为多个子任务,使用多个执行单元同时处理任务,从而实现性能的提升。
1.2 Java与并行编程
有许多编程语言提供了良好的支持和工具来完成并行编程,而Java 是开发并行应用程序的理想语言之一。它的原生线程库java.lang.Thread 允许任务并行、异步运行,可以总体上提高应用程序的执行速度,并允许在更短的时间内完成更复杂的任务。此外,Java 还提供了广泛的库和框架,可用于快速高效地开发应用程序,Java自身的语言特性也使开发人员能够创建安全可靠的跨平台应用程序。
1.2.1 Java线程与任务
Java线程与任务是Java编程语言中用于并发编程的重要组件。它们允许开发人员在程序中同时执行多个任务,提高程序的并发性和性能。
1.
线程(Thread):
线程是操作系统能够进行运算调度的最小单位。在Java中,线程是通过Thread类来表示的。每个线程都有自己的执行路径,并且可以独立执行,有自己的程序计数器、栈和本地存储等。通过创建线程对象并调用其start()方法,可以启动一个新的线程。Java中的线程有两种方式创建:继承Thread类和实现Runnable接口。
2.
任务(Task):
任务是需要在线程中执行的具体工作单元。在Java中,任务通常是通过实现Runnable接口或者Callable接口来定义的。Runnable接口定义了一个run()方法,该方法中包含了任务的具体逻辑。Callable接口类似于Runnable接口,但它可以返回执行结果,并且可以抛出异常。任务通常由线程池来管理和调度。
3.
线程池(ThreadPool):
线程池是一种管理和复用线程的机制,它可以提高线程的利用率。Java中的线程池是通过ThreadPoolExecutor类来实现的。线程池维护一个线程的集合,可以根据需要创建新的线程,也可以复用空闲的线程。通过将任务提交给线程池,线程池会自动调度线程来执行任务。
1.2.2 使用 Java 库进行并行编程
1.
并行流(Parallel Stream):
Java 8引入了并行流的概念,它是一种针对集合数据的高级抽象,可以在多个线程上并行地执行流操作。通过将集合转换为并行流,可以自动将任务分割为多个子任务,并在多个处理器上并行处理。并行流使用Fork/Join框架来实现任务的划分和合并。
2.
任务执行器(Executor):
是一个接口,位于java.util.concurrent包下,它的作用主要是为我们提供任务与执行机制(包括线程使用和调度细节)之间的解耦。执行器对于管理线程和确保线程安全非常有用。
3.Fork/Join框架:它是Java提供的一个用于并行编程的框架,它基于“分而治之”(divide and conquer)的思想,用于实现任务的划分和合并。Fork/Join框架在Java 7中被引入,通过提供一种简单且高效的方式来利用多核处理器和多线程执行任务。
1.3 “分而治之”的哲学
在使用 Java 进行并行编程中,本文重点介绍Fork/Join框架,该框架也是作为Java 并行流 API的实现。
1.3.1 Fork-Join 框架
在实际应用场景中,我们可能有多个任务,也可能有一个非常艰巨的任务,而这个大任务可以拆分为较小的子任务,并且,这些子任务可能会被拆分为更小的任务。在这些情况下,Fork/Join 池会派上用场。概括来说,Fork/Join 框架通过使用分而治之和工作窃取机制来实现更高程度的并行性。
fork/join 池以递归方式划分较大的任务,直到可以按顺序计算子任务而不会进一步分解。在任务完成执行后,它们返回结果,这些结果被连接回来,然后返回最终结果。下图是并行计算0~100000的fork-join示意图。
在使用fork-join框架中,需要注意:
1.Fork/Join 本身不能分割任务,也不能自行合并结果。需要我们指定一种划分任务和合并结果的方法。
2.Fork/Join 框架无法决定一项任务是否可以进一步划分。我们必须指定一种识别它的方法,例如设置一个特定的阈值,当达到该阈值时,将按顺序计算任务。
1.3.2 使用 Fork/Join 框架实现并行算法
1.3.2.1 创建ForkJoinTask任务
在实践中,使用ForkJoin框架,必须首先创建一个ForkJoinTask任务。它提供在任务中执行fork()和join()操作的机制。通常情况下我们不需要直接继承ForkJoinTask类,而只需要继承它的子类,Fork/Join框架提供了以下两个子类:
- RecursiveAction用于没有返回结果的任务
- RecursiveTask用于有返回结果的任务。
创建任务唯一需要做的是重写compute()方法并实现逻辑。该方法规定了程序Fork、Computation 和 Join 操作的具体位置。
1.3.2.2 提交到ForkJoinPool
ForkJoinPool是ExecutorService的实现,需要将创建的ForkJoinTask任务,使用invoke()方法提交到ForkJoinPool中,需要注意的是,子任务不需要提交到Pool中,RecursiveTask本身是一个自分裂任务。换言之,只需要提交作为根任务的大任务,框架就会完成剩下的工作。
1.3.2.3 示例完整代码
package learning.multithreading;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
public class ParallelSumComputationUsingForkJoin {
private static final int[] LARGE_ARR = largeArr();
private static final int LENGTH = LARGE_ARR.length;
public static void main(String[] args) throws ExecutionException, InterruptedException {
RecursiveSumTask recursiveTask = new RecursiveSumTask(0, LENGTH, LARGE_ARR);
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
long start = System.currentTimeMillis();
long sum = forkJoinPool.invoke(recursiveTask);
System.out.println("The sum is : "
+ sum
+ ", Time Taken by Parallel(Fork/Join) Execution: "
+ (System.currentTimeMillis() - start) + " millis");
}
private static int[] largeArr() {
return new Random().ints(500000000, 10, 1000).toArray();
}
static class RecursiveSumTask extends RecursiveTask<Long> {
private static final int SEQUENTIAL_COMPUTE_THRESHOLD = 4000;
private final int startIndex;
private final int endIndex;
private final int[] data;
RecursiveSumTask(int startIndex, int endIndex, int[] data) {
this.startIndex = startIndex;
this.endIndex = endIndex;
this.data = data;
}
@Override
protected Long compute() {
if (SEQUENTIAL_COMPUTE_THRESHOLD >= (endIndex - startIndex)) {
long sum = 0;
for (int i = startIndex; i < endIndex; i++) {
sum += data[i];
}
return sum;
}
int mid = startIndex + (endIndex - startIndex) / 2;
RecursiveSumTask leftSumTask = new RecursiveSumTask(startIndex, mid, data);
RecursiveSumTask rightSumTask = new RecursiveSumTask(mid, endIndex, data);
leftSumTask.fork(); // Fork the Left Task in a Separate Execution
long rightSum = rightSumTask.compute(); // Compute the Right Part
long leftSum = leftSumTask.join(); // Wait for the results from the Left Part
return leftSum + rightSum; // Return Both
}
}
}
/**
* Output:
* The sum is : 252235235953, Time Taken by Parallel(Fork/Join) Execution: 139 millis
*/
在示例代码中,创建ForkJoinTask任务流程如下:
1.首先,设定一个阈值。如果要计算总和的数组的大小小于阈值,则我们按顺序计算总和,而无需进一步细分。这充当递归的基本条件。
2.如果不满足基本条件,则将任务分成两部分来处理。如何将任务拆分取决于业务场景。在本文的例子中,将任务分成两个相等的子任务,有的业务场景子任务的比例也可能不相等。示例代码中创建了两个子任务:leftSumTask和rightSumTask,分别处理数组的左半部分和右半部分。
3.之后让leftSumTask进行fork()操作,这意味着它被添加到工作窃取队列中,在ForkJoinPool队列中的任何线程都可以拾取并执行它。关于工作窃取机制在后文进行详细阐述。
4.在fork之后,将在当前线程上下文中计算rightSumTask。当然,当前线程作为右半部分计算的一部分,将进一步将任务再次分为两部分,并再次fork左部分,一直持续到满足基本条件为止。
5.接着,使用join()来获取执行左侧部分的结果。join操作是阻塞调用,调用线程会等待任务完成。
6.最后,将左和和右和相结合并返回结果。
1.3.3 工作窃取
接下来,深入探讨一下fork和join方法,以及ForkJoinPool背后的工作窃取机制。
-
ForkJoinTask可以使用execute()、invoke()和submit()来提交任务。
-
ForkJoinPool维护一个全局共享队列,所有工作线程都可以访问。所有提交的任务都会在这个Shared Queue中排队。
-
Pool中有
工作线程
,每个工作线程都有自己的队列,称为
工作窃取队列
(该特定工作线程的
本地
队列)工作线程从共享队列中获取任务,并将它们存储在本地工作窃取队列中。(目标:通过保持所有工作线程始终尽可能繁忙来最大化 CPU 利用率)
-
fork-join 池中的每个工作线程都运行一个循环来检查要执行的任务。这些工作线程为了让自己始终尽可能忙碌,会检查来自多个输入源的任务:
-
全局共享队列:
工作线程最初获取任务的地方。
-
本地工作窃取队列:
线程从全局共享队列获取主任务后,工作线程在拆分子任务时,会将这些子任务推送到自己的本地工作窃取队列中。
-
其他线程的工作窃取队列:
这是实际的工作窃取机制出现的地方。线程从其他线程的工作窃取队列中窃取任务。
-
工作线程以
LIFO
顺序访问自己的工作窃取队列,并以
FIFO
顺序访问
其他线程
的工作窃取队列。
-
工作线程在拆分任务并派生子任务时,将派生的子任务添加到队列的前面。而且
取的时候是从队列前面取
。
-
如果某个特定线程是空闲的(它自己的队列是空的),那么就可以从其他线程队列的
末尾窃取
。
-
如何决定从哪个其他线程队列中选择任务?随机。总体广义上,窃取符合
FIFO
。
-
为什么这样设计?
-
减少工作线程之间的争用。为了在工作窃取队列上提供轻量级锁定机制。
-
提供更好的Locality of Reference——缓存性能。
-
位于队列末尾的任务较旧(最先添加),代表较大的工作单元,位于队列前面的任务较年轻(最近添加),代表较小的工作单元。如果一个线程在自己的队列上运行,它首先完成较小的任务,然后再完成较大的任务。如果一个线程从其他线程的队列中窃取工作,则需要更大的任务,以便通过进一步拆分来单独解决它。【回顾一下文章开头的引子?】
1.3.4 Fork/Join Framework 和 ExecutorService 之间的区别
Fork/Join Framework | ExecutorService |
---|---|
Fork/Join 框架是分而治之算法的实现,中央 ForkJoinPool 执行分支 ForkJoinTasks。 | ExecutorService 是一个 Executor,它提供用于管理异步任务的进度跟踪和终止的方法。 |
Fork/Join 框架利用了工作窃取算法。 在 Fork/Join 框架中,当一个任务等待它使用联接操作创建的子任务完成时,执行该任务的工作线程会查找另一个尚未执行的任务,并窃取它们以开始执行。 | 与 Fork/Join 框架不同,当任务等待它使用联接操作创建的子任务完成时,执行该等待任务的工作线程不会查找其他任务。 |
fork-join 非常适合递归问题,其中任务涉及运行子任务,然后处理其结果。 | 如果使用 ExecutorService 解决此类递归问题,则最终会导致线程被捆绑,等待其他线程向它们传递结果。 |
Fork Join 是 ExecuterService 的实现。主要区别在于,此实现创建了一个 DEQUE 工作线程池。 | Executor 服务创建请求数量的线程,并应用阻塞队列来存储所有剩余的等待任务。 |
关于二者的区别,感兴趣的读者可以访问Geeksforgeeks[12]的文章查看二者的对比程序示例,会有更好的体感。
1.4 并行流
1.4.1 在流中的并行与Fork-Join
从 Java 8 开始,流的方面也使并行性成为惯用语,在深入了解过Fork-Join框架后,并行流的运转方式也就不再神秘了。用一个简单的示例程序演示并行流的执行过程:
public long sumUsingParallel() {
return LongStream.rangeClosed(1L, 10L)
.parallel()
.peek(this::printThreadName)
.reduce(0L, this::printSum);
}
public void printThreadName(long l) {
String tName = currentThread().getName();
System.out.println(tName + " offers:" + l);
}
public long printSum(long i, long j) {
long sum = i + j;
String tName = currentThread().getName();
System.out.printf(
"%s has: %d; plus: %d; result: %dn",
tName, i, j, sum
);
return sum;
}
调用会触发数字流上的fork-join机制,将流拆分为在四个线程中运行。每个线程都有一个流,每个线程以并行方式运行。
1.4.2 并行流的最佳实践
1.
确定规模:
并行流适用于对大型数据集进行密集计算的场景。如果数据集较小或计算量较小,则串行流可能更合适。并行流的创建和维护涉及到线程调度和数据划分的开销,如果任务过于简单,可能会导致性能下降。
2.
合适的数据结构:
在使用并行流时,应该选择适合并行处理的数据结构。使用并发集合(如ConcurrentHashMap)可以在并行流中实现更好的性能。
3.
数据共享和竞态条件:
并行流中的操作是并发执行的,因此需要特别注意数据共享和竞态条件的问题。确保操作是线程安全的,可以使用同步块或并发集合来保护共享数据。
4.
避免有状态的操作:
在并行流中,应尽量避免有状态的操作,即操作的结果依赖于前面的操作或全局状态。这种操作可能导致竞争和不确定的结果。应使用无状态的操作,如map和filter。
5.
调整并行度:
默认情况下,并行流使用的线程数是根据可用的处理器核心数来决定的。有时可能需要手动调整并行度以获得更好的性能。
二、并行问题案例 – 真实业务场景
本节介绍一个并行问题的案例,该案例来自真实的业务场景并隐去业务信息,通过针对业务场景优化并行策略,带来了显著的效率提升。
2.1 问题模型
问题可以被抽象为树模型,遍历树结构,找到某些特定的节点(不会超过10个),在这些节点中将会首先执行一次长时间的调用,然后再执行一系列中短时间的调用(可并行调用,但均依赖长时间调用的结果)。短调用的数量等于长调用返回的结果数量,而中调用的数量和节点本身有关,但需要长调用的结果作为参数。当长调用返回是空时,则无需执行中短调用。最后返回这些节点的所有相关调用信息。下图就是该问题模型的可视化展示。
2.2 优化过程
2.2.1 初始流并行写法
拿到问题,很快就写了一个树遍历+流并行的计算方式:
private void traverseRecursive(TreePath path, List<Node> queryList, Result result){
if (checkCurrentPathInQuery(path, queryList)) {
LongResult longRes = longInvoke(path);
result.addLongData(longRes);
List<String> parallelOp = Arrays.asList("mid", "short");
List<String> midQueryNameList = path.getMidQueryNameList();
parallelOp.parallelStream().forEach(op -> {
if ("mid".equals(op)) {
midQueryNameList.parallelStream().forEach(midQueryName -> {
MidResult midRes = midInvoke(midQueryName, longRes);
result.addMidData(midQueryName, midRes);
}
}
if ("short".equals(op)) {
longRes.getIdList().parallelStream().forEach(longId -> {
ShortResult shortRes = shortInvoke(longId);
result.addShortData(longId, midRes);
}
}
}
}
path.getChildren().entrySet().parallelStream().forEach(entry -> {
TreePath childPath = entry.getValue();
traverseRecursive(childPath, queryList, result);
}
}
理论上没什么问题,但是体感上并没有并行的速度。于是在Trace中查看调用情况。
本文所有Trace截图均来源于SLS全栈可观测应用。它是日志服务提供的一站式IT系统可观测方案,包含IT系统监控、全链路Trace、智能告警等功能。Trace记录系统中的请求路径和执行过程,帮助了解请求的处理情况和性能指标,在本文查看并行调用情况也非常好用。
SLS全栈可观测文档传送门。SLS Trace文档传送门[13]。
在这份Trace详情中,可以发现有问题:
- 长调用并没有比较理想地并行,只有长调用2和3是并行的,而理想情况是长调用1234并行调用。
2.2.2 优化为自定义ForkJoinTask
首先解决第一个问题,理论上都是可以并行的,为什么实际会无法并行呢?尝试在源码中找到答案。
2.2.2.1 parallelStream源码浅析
/**
* Returns a possibly parallel {@code Stream} with this collection as its
* source. It is allowable for this method to return a sequential stream.
*
* <p>This method should be overridden when the {@link #spliterator()}
* method cannot return a spliterator that is {@code IMMUTABLE},
* {@code CONCURRENT}, or <em>late-binding</em>. (See {@link #spliterator()}
* for details.)
*
* @implSpec
* The default implementation creates a parallel {@code Stream} from the
* collection's {@code Spliterator}.
*
* @return a possibly parallel {@code Stream} over the elements in this
* collection
* @since 1.8
*/
default Stream<E> parallelStream() {
return StreamSupport.stream(spliterator(), true);
}
源码注释中有个迷惑的说法:Returns a possibly parallel这是怎么回事呢?
以下情况下是可能无法并行执行的原因:
1.
数据量较小:
数据量较小的这个判断在java.util.stream.AbstractPipeline类的sourceStageSpliterator方法中进行。源码中会根据数据量的大小,判断是否小于一个阈值(默认为ForkJoinPool的并行度)。如果数据量小于阈值,则不会并行化处理,而是直接返回一个串行的Stream。
2.
硬件资源限制:
如果可用的处理线程数有限,可能无法实现真正的并行执行。处理操作的复杂度和硬件资源限制的判断涉及到ForkJoinPool的实现策略。ForkJoinPool是JDK提供的用于支持并行计算的线程池,它会根据可用的处理线程数、硬件资源等因素来决定是否真正并行执行任务。具体的判断逻辑在java.util.concurrent.ForkJoinPool类中。
可以通过调用parallelStream()方法后链式调用isParallel()方法来检查流是否并行化处理。事实上,当实际调用isParallel()后返回的是开启并行的结果。(Trace中长调用2、3实际上已经是并行的)。因此可以排除流本身无法并行的情况。
那么我们把问题聚焦到并行执行的线程池。
parallelStream()方法默认使用的是ForkJoinPool.commonPool()作为并行执行的线程池。commonPool()是一个共享的ForkJoinPool实例,它是在JDK中按需创建的,用于支持并行计算任务。commonPool()的线程数默认是根据可用的CPU核心数来确定的,可以通过Java系统属性java.util.concurrent.ForkJoinPool.common.parallelism来调整。一般情况下,它会创建一个与CPU核心数相当的线程数,以充分利用可用的硬件资源。
由于commonPool()是一个共享的线程池,其他部分的代码也可能使用这个线程池来执行任务,因此在使用parallelStream()时需要注意线程池资源的竞争和并发性。在这个场景下,需要精细控制并行执行的线程池,考虑使用ForkJoinPool类自行创建和管理。
2.2.2.2 ForkJoinPool改造
public class Solution {
ForkJoinPool pool;
public Solution() {
pool = new ForkJoinPool();
}
public execute(TreePath path, List<Node> queryList, Result result) {
TraverseRecursiveTask firstLayerTask = new TraverseRecursiveTask(path, queryList, result);
pool.invoke(firstLayerTask);
// pool.shutdown();
}
private class TraverseRecursiveTask extends RecursiveAction {
private TreePath path;
private List<Node> queryList;
public Result result;
public TraverseRecursiveTask(TreePath path, List<Node> queryList, Result result) {
this.path = path;
this.queryList = queryList;
this.result = result;
}
@Override
protected void compute() {
if (checkCurrentPathInQuery(path, queryList)) {
LongResult longRes = longInvoke(path);
result.addLongData(longRes);
List<String> parallelOp = Arrays.asList("mid", "short");
List<String> midQueryNameList = path.getMidQueryNameList();
parallelOp.parallelStream().forEach(op -> {
if ("mid".equals(op)) {
midQueryNameList.parallelStream().forEach(midQueryName -> {
MidResult midRes = midInvoke(midQueryName, longRes);
result.addMidData(midQueryName, midRes);
}
}
if ("short".equals(op)) {
longRes.getIdList().parallelStream().forEach(longId -> {
ShortResult shortRes = shortInvoke(longId);
result.addShortData(longId, midRes);
}
}
}
}
ArrayList<TraverseRecursiveTask> subTasks = new ArrayList<>();
for (Map.Entry<Link, TreePath> entry : path.getChildren().entrySet()) {
TraverseRecursiveTask subTask = new TraverseRecursiveTask(path, queryList, result);
subTasks.add(subTask);
}
for (TraverseRecursiveTask subTask : subTasks) {
subTask.fork();
}
for (TraverseRecursiveTask subTask : subTasks) {
subTask.join();
}
}
}
}
运行,并查看调用Trace情况:
有效果了!长调用2、3、4都正常并行执行了。但是仍然有问题:
-
长短调用的顺序有问题。看上去每一个长调用执行完就执行它的中短调用了。应该首先执行所有长调用然后再执行各个长调用下属的中短调用。
-
图中可见,中短调用前期占满了16个CPU,而并没有把大任务前置。就像先擦玻璃再最后做饭,会整体造成效率低下【回顾一下文章开头的引子?】
2.2.3 前置长调用的fork
public class Solution {
ForkJoinPool pool;
public Solution() {
pool = new ForkJoinPool();
}
public execute(TreePath path, List<Node> queryList, Result result) {
TraverseRecursiveTask firstLayerTask = new TraverseRecursiveTask(path, queryList, result);
pool.invoke(firstLayerTask);
// pool.shutdown();
}
private class TraverseRecursiveTask extends RecursiveAction {
private TreePath path;
private List<Node> queryList;
public Result result;
public TraverseRecursiveTask(TreePath path, List<Node> queryList, Result result) {
this.path = path;
this.queryList = queryList;
this.result = result;
}
@Override
protected void compute() {
ArrayList<TraverseRecursiveTask> subTasks = new ArrayList<>();
for (Map.Entry<Link, TreePath> entry : path.getChildren().entrySet()) {
TraverseRecursiveTask subTask = new TraverseRecursiveTask(path, queryList, result);
subTasks.add(subTask);
}
for (TraverseRecursiveTask subTask : subTasks) {
subTask.fork();
}
if (checkCurrentPathInQuery(path, queryList)) {
LongResult longRes = longInvoke(path);
result.addLongData(longRes);
List<String> parallelOp = Arrays.asList("mid", "short");
List<String> midQueryNameList = path.getMidQueryNameList();
parallelOp.parallelStream().forEach(op -> {
if ("mid".equals(op)) {
midQueryNameList.parallelStream().forEach(midQueryName -> {
MidResult midRes = midInvoke(midQueryName, longRes);
result.addMidData(midQueryName, midRes);
}
}
if ("short".equals(op)) {
longRes.getIdList().parallelStream().forEach(longId -> {
ShortResult shortRes = shortInvoke(longId);
result.addShortData(longId, midRes);
}
}
}
}
for (TraverseRecursiveTask subTask : subTasks) {
subTask.join();
}
}
}
}
实际操作非常简单:就是先遍历树,再判断path是否是需要的树节点。这样在遇到2.1节的问题模型示意图的情况时,会将遍历树的子任务最先fork,也就是永远先fork大调用,而不是在父节点判断为需要调用的节点后,直接开始小调用的fork。
Trace详情也说明了其有效性,长调用永远最先并行执行,且当第一个长调用执行完毕后(恰好有中短调用),即开始以CPU最大数执行长调用的中短调用,总体上达到了最优调度策略。
未优化 4.85s→ ForkJoin改造 3.3s→ 前置长调用fork 1.65s
三、总结
3.1 如何写出更优雅的并行程序
1.吃透业务逻辑
,洞察优化点。程序不能脱离业务而谈,必须清楚业务特点和规律之后,针对性地进行优化。本文中,大前提是每个调用节点中,先长调用,然后中短调用,且有先后依赖关系的特点。
2.贪婪策略调度工作线程。
在本文中,有两个地方体现了这个思想,其一是JDK源码中工作窃取机制,从队列后端获取较大任务。其二是代码优化部分,前置长调用的fork。主旨都是让确定性的大任务前置的思路,以获得更高的工作线程并行度。
3.有所为,有所不为。
并行程序不是银弹,更好地发挥出并行的力量需要遵循其最佳实践,即确定规模、采用合适的数据结构、数据共享和竞态条件、避免有状态的操作、分场景调整并行度。最好还要对所处机器的配置、负载做调查。
3.2 一些风险和注意事项
3.2.1 parallelStream与deadlock
并非每一个独立调用parallelStream的代码都会独立维护运行一个多线程的策略,而是JDK默认会调用同一个由运行环境维护的ForkJoinPool线程池,也就是说,无论在哪个地方写了list.parallelStream().forEach()这样一段代码,底层实际都会由一套ForkJoinPool的线程池进行运行,一般线程池运行会遇到的冲突、排队等问题,这里同样会遇到,且会被隐藏在代码逻辑中。一旦发生deadlock,所有调用parallelStream的地方都会被阻塞,无论你是否知道其他人是否这样书写了代码。
常见deadlock:
- 线程池内部阻塞
list.parallelStream().forEach(o -> {
o.doSomething(); // 此步骤导致线程被wait,锁,循环锁,外部操作卡住
});
* static代码块中执行迭代
```php
class A {
static {
list.parallelStream().forEach(n -> {
n.doSomething();
})
}
// 执行lambda表达式的前提是当前类A必须完成类初始化,但初始化又由于static代码块无法执行,而导致程序互锁,最终导致卡住
}
- 迭代时对象被修改。
执行list.parallelStream().forEach()过程中时,如果不慎修改了list对象的长度,则也有可能导致join操作无法完成。
为了避免这些问题,可以考虑创建一个独立的ForkJoinPool线程池,而不是使用默认的公共线程池。这样可以更好地控制并行操作的执行环境,并避免与其他任务共享线程池带来的问题。
ForkJoinPool forkJoinPool = new ForkJoinPool(20);
ForkJoinTask<?> fs = forkJoinPool.submit(() -> list.parallelStream().forEach((n) -> {
n.doSomething();
}));
3.3 工欲善其事,必先利其器
本文所有Trace截图均来源于日志服务SLS全栈可观测应用。服务接入Trace后,无需任何特殊设置,即可查看本文截图涉及的并行调用情况,非常方便。不单单是并行情况,Trace记录系统中整体的请求路径和执行过程,帮助了解请求的处理情况和性能指标,能帮助您观测复杂调用情况,能力强大。
相关导航:【SLS Trace Demo[14]】【SLS Trace文档[15]】【SLS控制台[16]】
SLS全栈可观测应用除了全链路Trace,还包含IT系统监控、性能监控、用户体验监控、智能告警等功能,它是日志服务提供的一站式IT系统可观测方案。现在就开始探索全栈可观测Demo吧!
相关导航:【SLS全栈可观测Demo[17]】【SLS全栈可观测文档[18]】
参考
1.Overview Of Fork-Join Framework — Core of Parallelism in Java:https://medium.com/@cs.vivekgupta/overview-of-fork-join-framework-core-of-parallelism-in-java-35f4a4cc8c3b
2.Java 8 Streams: Definitive Guide to Parallel Streaming with parallel() :https://stackabuse.com/java-8-streams-guide-to-parallel-streaming-with-parallel/
3.Parallel Programming Java: Java Explained :https://bito.ai/resources/parallel-programming-java-java-explained/#3
4.parallelStream和ForkJoinPool的使用风险 :https://www.jianshu.com/p/27d4e33d9f6d
5.Stream流与parallelStream原理分析 :https://juejin.cn/post/7274149231366995987
6.深入理解 ForkJoinPool:入门、使用、原理 :https://juejin.cn/post/7150836399234236430
7.Introduction to the Fork/Join Framework :https://www.pluralsight.com/guides/introduction-to-the-fork-join-framework
8.Difference Between Fork/Join Framework and ExecutorService in Java :https://www.geeksforgeeks.org/difference-between-fork-join-framework-and-executorservice-in-java/
9.Wikipedia: Work Stealing :https://en.wikipedia.org/wiki/Work_stealing
10.Lecture 10: Parallel Streams :https://nus-cs2030.github.io/1718-s2/lec10/index.html#learning-objectives
11.Parallel Programming Basics with the Fork/Join Framework in Java
12.https://www.geeksforgeeks.org/difference-between-fork-join-framework-and-executorservice-in-java/
13.https://help.aliyun.com/zh/sls/user-guide/trace-app/
14.https://sls.aliyun.com/doc/playground/demo.html?dest=/lognext/app/observability/trace/sls-mall/sls-mall?resource=%2Ftrace%2Fsls-mall%2Fexplorer
15.https://help.aliyun.com/zh/sls/user-guide/trace-app/
16.https://sls.console.aliyun.com/lognext/profile
17.https://sls.aliyun.com/doc/playground/demo.html?dest=/lognext/app/observability/home
18.https://help.aliyun.com/zh/sls/user-guide/usage-notes-39
欢迎加入【阿里云开发者公众号】读者群
这是一个专门面向“阿里云开发者”公众号的读者交流空间
💡 在这里你可以探讨技术和实践,我们也会定期发布群福利和活动~
欢迎添加微信:argentinaliu 入群