我正在创建一个从200万开始的无限整数流,使用朴素素数测试实现过滤此流以生成负载并将结果限制为10。
Predicate<Integer> isPrime = new Predicate<Integer>() {
@Override
public boolean test(Integer n) {
for (int i = 2; i < n; i++) {
if (n % i == 0) return false;
}
return true;
}
};
Stream.iterate(200_000_000, n -> ++n)
.filter(isPrime)
.limit(10)
.forEach(i -> System.out.print(i + " "));
这按预期工作。
现在,如果我在过滤之前添加对parallel()的调用,则不会生成任何内容并且处理无法完成。
Stream.iterate(200_000_000, n -> ++n)
.parallel()
.filter(isPrime)
.limit(10)
.forEach(i -> System.out.print(i + " "));
有人能指出我在这里发生的事情的正确方向吗?
编辑:我不是在寻找更好的素性测试实现(它旨在成为一个长期运行的实现),而是为了解释使用并行流的负面影响。
处理实际上已完成,但可能需要很长时间,具体取决于计算机上的硬件线程数。 API文档 关于限制警告说并行流可能会很慢。
实际上,并行流首先根据可用的并行级别将计算分成几个部分,对每个部分执行计算,然后将结果连接在一起。你的任务有多少部分?每个常见的FJP线程一个(=Runtime.getRuntime().availableProcessors()
)加上(有时?)一个当前线程,如果它不在FJP中。你可以控制它的添加
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "4");
实际上,对于您的任务,您设置的数字越小,计算的速度就越快。
如何拆分无限任务?您的特定任务由IteratorSpliterator处理 trySplit 方法从1024开始创建不断增加的大小的块。您可以自己尝试:
Spliterator<Integer> spliterator = Stream.iterate(200_000_000, n -> ++n).spliterator();
Spliterator[] spliterators = new Spliterator[10];
for(int i=0; i<spliterators.length; i++) {
spliterators[i] = spliterator.trySplit();
}
for(int i=0; i<spliterators.length; i++) {
System.out.print((i+1)+": ");
spliterators[i].tryAdvance(System.out::println);
}
所以第一个块处理范围为200000000-200001023的数字,第二个处理范围为200001024-200003071的数字,依此类推。如果您只有1个硬件线程,则您的任务将被拆分为两个块,因此将检查3072。如果您有8个硬件线程,则您的任务将被拆分为9个块,并将检查46080个数字。只有在处理完所有块之后,并行计算才会停止。将任务分成如此大块的启发式方法在你的情况下效果不好,但你会看到性能提升,该区域的素数在几千个数字中出现一次。
可能您的特定场景可能在内部进行优化(即,如果第一个线程发现已经达到限制条件,则停止计算)。随意向Java bug跟踪器报告错误。
更新 在Stream API中挖掘更多内容之后我得出结论,当前行为是一个bug, 提出了一个问题 并发布了一个 补丁。这个补丁可能会被JDK9接受,甚至可能会向后移植到JDK 8u分支。使用我的补丁,并行版本仍然无法提高性能,但至少其工作时间与顺序流工作时间相当。
之所以 parallel
花费这么长时间的流是由于所有并行流使用的事实 common fork-join thread pool
因为你提交了一个长期运行的任务(因为你的实现 isPrime
如果方法效率不高,则会阻塞池中的所有线程,因此阻止使用并行流的所有其他任务。
为了使并行版本更快,您可以实现 isPrime 更有效率。对于例如
Predicate<Integer> isPrime = new Predicate<Integer>() {
@Override
public boolean test(Integer n) {
if(n < 2) return false;
if(n == 2 || n == 3) return true;
if(n%2 == 0 || n%3 == 0) return false;
long sqrtN = (long)Math.sqrt(n)+1;
for(long i = 6L; i <= sqrtN; i += 6) {
if(n%(i-1) == 0 || n%(i+1) == 0) return false;
}
return true;
}
};
并且您会立即注意到性能的提高。通常,当存在阻塞池中的线程的可能性时,请避免使用并行流