问题 如果并行处理,为什么在一个无限的数字流中按素数进行过滤?


我正在创建一个从200万开始的无限整数流,使用朴素素数测试实现过滤此流以生成负载并将结果限制为10。

Predicate<Integer> isPrime = new Predicate<Integer>() {
    @Override
    public boolean test(Integer n) {
        for (int i = 2; i < n; i++) {
            if (n % i == 0) return false;   
        }
        return true;
    }
};

Stream.iterate(200_000_000, n -> ++n)
    .filter(isPrime)
    .limit(10)
    .forEach(i -> System.out.print(i + " "));

这按预期工作。

现在,如果我在过滤之前添加对parallel()的调用,则不会生成任何内容并且处理无法完成。

Stream.iterate(200_000_000, n -> ++n)
    .parallel()
    .filter(isPrime)
    .limit(10)
    .forEach(i -> System.out.print(i + " "));

有人能指出我在这里发生的事情的正确方向吗?

编辑:我不是在寻找更好的素性测试实现(它旨在成为一个长期运行的实现),而是为了解释使用并行流的负面影响。


3474
2018-05-02 10:57


起源

从一些快速的研究来看,似乎 parallel 应该 parallelStream。 - Carcigenicate
不,parallelStream()不是Stream上的方法,而是从集合中获取并行流的方法。我认为Collection.parallelStream()等同于Collection.stream()。parallel。 - wwerner


答案:


处理实际上已完成,但可能需要很长时间,具体取决于计算机上的硬件线程数。 API文档 关于限制警告说并行流可能会很慢。

实际上,并行流首先根据可用的并行级别将计算分成几个部分,对每个部分执行计算,然后将结果连接在一起。你的任务有多少部分?每个常见的FJP线程一个(=Runtime.getRuntime().availableProcessors())加上(有时?)一个当前线程,如果它不在FJP中。你可以控制它的添加

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "4");

实际上,对于您的任务,您设置的数字越小,计算的速度就越快。

如何拆分无限任务?您的特定任务由IteratorSpliterator处理 trySplit 方法从1024开始创建不断增加的大小的块。您可以自己尝试:

Spliterator<Integer> spliterator = Stream.iterate(200_000_000, n -> ++n).spliterator();
Spliterator[] spliterators = new Spliterator[10];
for(int i=0; i<spliterators.length; i++) {
    spliterators[i] = spliterator.trySplit();
}
for(int i=0; i<spliterators.length; i++) {
    System.out.print((i+1)+": ");
    spliterators[i].tryAdvance(System.out::println);
}       

所以第一个块处理范围为200000000-200001023的数字,第二个处理范围为200001024-200003071的数字,依此类推。如果您只有1个硬件线程,则您的任务将被拆分为两个块,因此将检查3072。如果您有8个硬件线程,则您的任务将被拆分为9个块,并将检查46080个数字。只有在处理完所有块之后,并行计算才会停止。将任务分成如此大块的启发式方法在你的情况下效果不好,但你会看到性能提升,该区域的素数在几千个数字中出现一次。

可能您的特定场景可能在内部进行优化(即,如果第一个线程发现已经达到限制条件,则停止计算)。随意向Java bug跟踪器报告错误。


更新 在Stream API中挖掘更多内容之后我得出结论,当前行为是一个bug, 提出了一个问题 并发布了一个 补丁。这个补丁可能会被JDK9接受,甚至可能会向后移植到JDK 8u分支。使用我的补丁,并行版本仍然无法提高性能,但至少其工作时间与顺序流工作时间相当。


11
2018-05-02 15:52





之所以 parallel 花费这么长时间的流是由于所有并行流使用的事实 common fork-join thread pool因为你提交了一个长期运行的任务(因为你的实现 isPrime 如果方法效率不高,则会阻塞池中的所有线程,因此阻止使用并行流的所有其他任务。

为了使并行版本更快,您可以实现 isPrime 更有效率。对于例如

   Predicate<Integer> isPrime = new Predicate<Integer>() {
        @Override
        public boolean test(Integer n) {
            if(n < 2) return false;
            if(n == 2 || n == 3) return true;
            if(n%2 == 0 || n%3 == 0) return false;
            long sqrtN = (long)Math.sqrt(n)+1;
            for(long i = 6L; i <= sqrtN; i += 6) {
                if(n%(i-1) == 0 || n%(i+1) == 0) return false;
            }
            return true;
        }
    };

并且您会立即注意到性能的提高。通常,当存在阻塞池中的线程的可能性时,请避免使用并行流


2
2018-05-02 12:21



一个更有效的实施方案是将Eratosthenes作为无限流的筛子(看这里,例如, blog.informatech.cr/2013/03/11/java-infinite-streams)。我试图了解使用并行流的影响,而不是素性测试。编辑问题以使其更清晰。 - wwerner
不错的价格检查方法 - Braj