问题 番石榴EventBus调度


我正在使用Guava的EventBus来启动一些处理和报告结果。这是一个非常简单的可编译示例:

import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;

public class Test {

    public static class InitiateProcessing { }
    public static class ProcessingStarted { }
    public static class ProcessingResults { }
    public static class ProcessingFinished { }

    public static EventBus bus = new EventBus();

    @Subscribe
    public void receiveStartRequest(InitiateProcessing evt) {
        System.out.println("Got processing request - starting processing");
        bus.post(new ProcessingStarted());

        System.out.println("Generating results");
        bus.post(new ProcessingResults());
        System.out.println("Generating more results");
        bus.post(new ProcessingResults());

        bus.post(new ProcessingFinished());
    }

    @Subscribe
    public void processingStarted(ProcessingStarted evt) {
        System.out.println("Processing has started");
    }

    @Subscribe
    public void resultsReceived(ProcessingResults evt) {
        System.out.println("got results");
    }

    @Subscribe
    public void processingComplete(ProcessingFinished evt) {
        System.out.println("Processing has completed");
    }


    public static void main(String[] args) {
        Test t = new Test();
        bus.register(t);
        bus.post(new InitiateProcessing());
    }
}

我将这些事件用作其他软件组件的一种方式,以便为此处理做准备。例如,他们可能必须在处理之前保存其当前状态并在之后恢复它。

我希望这个程序的输出是:

Got processing request - starting processing
Processing has started
Generating results
got results
Generating more results
got results
Processing has completed

相反,实际输出是:

Got processing request - starting processing
Generating results
Generating more results
Processing has started
got results
got results
Processing has completed

应该指示处理已经开始的事件实际发生在实际处理之后(“生成结果”)。

查看源代码后,我理解为什么它会以这种方式运行。这是相关的 源代码 为了 EventBus

  /**
   * Drain the queue of events to be dispatched. As the queue is being drained,
   * new events may be posted to the end of the queue.
   */
  void dispatchQueuedEvents() {
    // don't dispatch if we're already dispatching, that would allow reentrancy
    // and out-of-order events. Instead, leave the events to be dispatched
    // after the in-progress dispatch is complete.
    if (isDispatching.get()) {
        return;
    }
    // dispatch event (omitted)

发生了什么事,因为我已经调度了最高级别 InitiateProcessing 事件,其余的事件只是被推到队列的末尾。我希望这与.NET事件类似,在所有处理程序完成之前调用事件不会返回。

我不太明白这个实现的原因。当然,事件保证按顺序排列,但周围代码的顺序完全失真。

有没有办法让总线按照描述运行并产生所需的输出?我确实读过Javadocs

EventBus保证它不会调用订阅者方法   多个线程同时,除非方法明确允许   它通过承载@AllowConcurrentEvents批注。

但我不认为这适用于此 - 我在单线程应用程序中看到了这个问题。

编辑

这里问题的原因是我 post来自订户内部。由于事件总线不可重入,因此这些“子帖子”排队并在第一个处理程序完成后处理。我可以评论出来了 if (isDispatching.get()) { return; } 中的部分 EventBus 来源和一切都像我期望的那样 - 所以真正的问题是我通过这样做引入了哪些潜在的问题?似乎设计师做出了一个尽责的决定,不允许重入。


1299
2018-02-22 00:32


起源

看起来事件总线运行在它自己的线程中。这通常意味着,操作是异步执行的(并且一旦它是一个总线)保证按照其顺序传递,并且它与主线程无关 - injecteer
@injecteer它没有运行它自己的线程。他们确实有 AsyncEventBus 这允许你指定一个 Executor  - 但我没有使用它。这都是单线程的。 - zmb
你可能是对的。虽然我认为,他们在一个新的线程中运行:)你可以通过添加测试它 System.out.println( "curr thread: " + Thread.currentThread().getName() ) 对你的每一个处理 @Subscribe-d方法? - injecteer
为什么你.post()来自 中 订阅者开始?这里有点可疑 - fge
这正是我看到这个问题的原因。这不是一个有效的用例吗?从概念上讲,事件可以触发其他事件 - 我们一直在假设这是可以的。 - zmb


答案:


EventBus 通常基于以下原则操作:将事件发布到总线的代码不应关心订户对事件的处理方式,或者除了事件被发布的订单之外的时间(在同步事件总线的情况下)无论如何)。

如果您希望在方法过程中的特定时间调用特定方法,并且希望确保在方法继续之前完成这些方法(如您在示例中所示),为什么不直接调用这些方法?当您使用事件总线时,您明确地将代码与响应给定事件的确切事件分开。这在许多情况下是可取的并且是主要原因 EventBus 存在,但它似乎不是你想要的。


7
2018-02-24 18:24



“将我的代码与响应给定事件的确切事件分开” 是 我想要的是。我希望活动能够说“为发生的事情做好准备” - 我并不关心订阅者如何或者想要做些什么来做好准备。他们可能不需要做任何事情。我只是想确信在发布活动之后,订阅者已经准备好了,所以我可以继续采取一些行动。事件总线似乎非常适合这种情况,因为我可以轻松地添加和删除可能以不同方式处理事件的组件。发布活动的代码无关紧要。 - zmb
到目前为止,我从答案中得出的结论是,这种行为是设计的,事件总线不支持我的用例。我最后修改了总线以允许重入,这更适合我的用例。 - zmb


我试着总结一下Guava的EventBus事件传递行为:

如果是一个事件 E1 发布时刻 T1,所有订阅者都会收到通知 如果其中一个订阅者在其中发布了一个事件 @订阅 - 方法(片刻之后),“新”事件 E2 被排队和交付 之后。之后意味着:毕竟 @订阅 - 方法 E1 从 T1 确实回来了。

将这种“级联”事件发布与广度优先树遍历进行比较。

它似乎是EventBus的明确选择设计。


5
2017-12-17 09:29





虽然发布到EventBus后直到所有“订户”都被发出信号才会返回。这些订户可能尚未开始执行。这意味着当第一个bus.post返回时,您继续下一个帖子,而没有任何干预用户开始处理。

public void post(Object event)将事件发布到所有已注册的事件   订户。事件发生后,此方法将成功返回   已发布给所有订阅者,无论是否有任何例外   订阅者抛出。如果没有订阅订阅者   事件的类,事件不是DeadEvent,它将是   包裹在DeadEvent并重新发布。

参数:event - 要发布的事件。


2
2018-02-22 00:46



这不太对 - 在这种情况下,第一篇文章返回时所有事件都已处理完毕。无论哪种方式 - 这都不能解答我是否可以让公交车按预期行事的问题。 - zmb