问题 如何取消Java 8可完成的未来?


我正在玩Java 8可完成的期货。我有以下代码:

CountDownLatch waitLatch = new CountDownLatch(1);

CompletableFuture<?> future = CompletableFuture.runAsync(() -> {
    try {
        System.out.println("Wait");
        waitLatch.await(); //cancel should interrupt
        System.out.println("Done");
    } catch (InterruptedException e) {
        System.out.println("Interrupted");
        throw new RuntimeException(e);
    }
});

sleep(10); //give it some time to start (ugly, but works)
future.cancel(true);
System.out.println("Cancel called");

assertTrue(future.isCancelled());

assertTrue(future.isDone());
sleep(100); //give it some time to finish

使用runAsync我计划执行等待锁存器的代码。接下来我取消了未来,期望被抛入中断的异常。但似乎线程在await调用上仍然被阻塞,即使未来被取消(断言传递),也不会抛出InterruptedException。使用ExecutorService的等效代码按预期工作。它是CompletableFuture中的错误还是我的示例中的错误?


10161
2018-04-27 07:03


起源

你能重现这个问题吗? Executors.newFixedThreadPool VS Executors.newWorkStealingPool?与比较期货与可完成期货相比,这将使比较两种不同的执行者实施的问题更加清晰。 - nosid
JavaDoc说cancel(true)取消了CancellationException但是你没有抓住它。 - edharned
@nosid你是对的,newWorkStealingPool显然不支持取消 - Lukas


答案:


显然,这是故意的。该方法的Javadoc CompletableFuture ::取消 状态:

[参数:] mayInterruptIfRunning - 此值有 没有 此实现中的效果,因为中断不用于控制处理。

有趣的是,方法 ForkJoinTask ::取消 对参数使用几乎相同的措辞 mayInterruptIfRunning

我猜这个问题:

  • 中断 旨在用于阻塞操作,例如 睡觉等待 或I / O操作,
  • 但都没有 CompletableFuture 也不 ForkJoinTask 旨在用于阻止操作。

而不是阻止,一个 CompletableFuture 应该创造一个新的 CompletionStage和cpu绑定任务是fork-join模型的先决条件。所以,使用 中断 他们中的任何一个都会打败他们的目的。而另一方面,它可能会增加复杂性,如果按预期使用则不需要。


11
2018-04-27 21:38



为什么你这么想 CompletableFuture 和 ForkJoinTask 不打算用于阻塞操作? - Ivan Hristov
CompletableFuture和其他反应性内容的重点是:不要浪费线程让它们等待一些长时间操作的结果。相反,提供一个在结果出现时调用的回调。反应式方法需要更少的线程。 - Kirill Gamazkov


你打电话时 CompletableFuture#cancel,你只停止链的下游部分。上游部分,我。即最终会打电话的东西 complete(...) 要么 completeExceptionally(...),没有得到任何结果不再需要的信号。

什么是'上游'和'下游'的东西?

我们考虑以下代码:

CompletableFuture
        .supplyAsync(() -> "hello")               //1
        .thenApply(s -> s + " world!")            //2
        .thenAccept(s -> System.out.println(s));  //3

在这里,数据从上到下流动 - 从供应商创建,到功能修改,再到消费 println。上述特定步骤的部分称为上游部分,下部部分称为下游部分。 E. g。步骤1和2是步骤3的上游。

这是幕后发生的事情。这不是精确的,而是一个方便的思维模型。

  1. 正在执行供应商(步骤1)(在JVM的常见内容中) ForkJoinPool)。
  2. 然后供应商的结果通过 complete(...) 到下一个 CompletableFuture 下游。
  3. 收到结果后,那 CompletableFuture 调用下一步 - 一个函数(步骤2),它接收前一步结果并返回将进一步传递给下游的内容 CompletableFuturecomplete(...)
  4. 在收到步骤2结果后,执行步骤3 CompletableFuture 调用消费者, System.out.println(s)。消费者完成后,下游 CompletableFuture 会收到它的价值, (Void) null

我们可以看到,每一个 CompletableFuture 在这个链中必须知道谁在下游等待值传递给他们的 complete(...)(要么 completeExceptionally(...))。但是 CompletableFuture 不必知道它的上游(或上游 - 可能有几个)。

因此,打电话 cancel() 在步骤3中不会中止步骤1和2,因为从步骤3到步骤2没有链接。

假设你正在使用它 CompletableFuture 然后你的步骤足够小,这样如果执行一些额外的步骤就没有坏处。

如果要将取消传播到上游,您有两种选择:

  • 自己实现 - 创建一个专门的 CompletableFuture (说出来就好 cancelled)每一步后检查一次(类似于 step.applyToEither(cancelled, Function.identity())
  • 使用反应堆栈,如RxJava 2,ProjectReactor / Flux或Akka Streams

1
2018-06-06 18:28





您需要CompletionStage的替代实现来完成真正的线程中断。我刚刚发布了一个小型图书馆,正是为了这个目的 - https://github.com/vsilaev/tascalate-concurrent


1
2017-07-24 21:30





CancellationException是内部ForkJoin取消例程的一部分。检索未来结果时会出现异常:

try { future.get(); }
      catch (Exception e){
          System.out.println(e.toString());            
      }

花了一段时间才在调试器中看到这个。 JavaDoc并不清楚发生了什么或者你应该期待什么。


0
2018-04-27 22:25