问题 CQRS - 如何处理新报告表(或:如何从事件存储中导入所有历史记录)


我研究了一些使用事件源作为事件存储的CQRS样本实现(Java / .Net)和一个简单的(No)SQL存储作为“报告存储”。

看起来很好,但我似乎在所有示例实现中都缺少某些东西。

在应用程序投入生产后,如何处理新报表存储/屏幕的添加?以及如何将现有(最新)数据从事件存储导入新报表存储?

即:

想象一下基本的DDD / CQRS驱动的CRM应用程序。 每个屏幕(真的查看)都有自己的结构化报表存储(SQL表)。 所有这些视图都使用侦听域事件(CustomerCreated / CustomerHasMoved等)的处理程序进行更新。

CRM的一个特点是它可以记录电话呼叫(PhoneCallLogged事件)。由于时间限制,我们只在CRM的V1中实现了电话记录的记录(查看和报告谁处理了哪个电话将在V2中实施)

在生产运行一段时间后,我们希望实现每个客户和销售代表记录的电话呼叫的“报告”。

因此,我们需要添加一些屏幕(视图)和支持报告表(在报告存储中),并用事件存储中已收集的数据填充它...

这是我在看我研究的样品时卡住的地方。它们不处理将现有(历史)数据从事件存储导入(新)报表存储。

EventRepository(DomainRepository)的所有示例只有一个方法'GetById'和'Add',它们不支持一次性获取所有聚合根以填充新的报告表。

如果没有初始数据导入,新屏幕仅针对新发生的事件进行更新。不适用于已记录的电话(因为没有PhoneCallLogged事件的报告侦听器)

有什么建议,建议吗?

提前致谢,

REMCO


1511
2018-04-01 08:37


起源



答案:


您在现有事件日志上重新运行处理程序(例如,您通过新事件处理程序播放旧事件)

以您为例...您的事件日志中有大量的PhoneCallLoggedEvents。带上你的新手柄并通过它播放所有旧事件。它就像它一直在运行,并将继续处理任何到达的新事件。

干杯,

格雷格


8
2018-04-14 13:56



所以基本上你需要在部署过程中集成某种一次性事件运行器(报告导入)? - Remco Ros
并且您需要对事件存储进行“查询”访问:store.GetAllEvents <PhoneCallLogged>();这似乎是反直觉的,因为(正如我所说)我见过的所有EventStore实现(都是.net / java)都不允许这样的读访问... - Remco Ros
通常,提供的实现仅包含运行时所需的内容,但不包含维护所需的内容。 - thinkbeforecoding
非常清楚。所以这一切都与分离有关。您仍然需要一些机制来确定通过此“维护”组件发送哪些事件。由于新处理程序为新事件订阅和初始导入历史事件的时间范围,您不希望错过事件或处理相同的事件两次。 - Remco Ros
通常最好的是:订阅该类型的事件。查询先前的事件(这里可能有多个)让订阅队列并逐个处理先前的事件。然后吃掉订阅的事件(必须是幂等的,所以你不要加倍处理)存储库不支持它的部分原因是你永远不想这样做 在 域名。存储库是域与事件存储的合同。 - Greg Young


答案:


您在现有事件日志上重新运行处理程序(例如,您通过新事件处理程序播放旧事件)

以您为例...您的事件日志中有大量的PhoneCallLoggedEvents。带上你的新手柄并通过它播放所有旧事件。它就像它一直在运行,并将继续处理任何到达的新事件。

干杯,

格雷格


8
2018-04-14 13:56



所以基本上你需要在部署过程中集成某种一次性事件运行器(报告导入)? - Remco Ros
并且您需要对事件存储进行“查询”访问:store.GetAllEvents <PhoneCallLogged>();这似乎是反直觉的,因为(正如我所说)我见过的所有EventStore实现(都是.net / java)都不允许这样的读访问... - Remco Ros
通常,提供的实现仅包含运行时所需的内容,但不包含维护所需的内容。 - thinkbeforecoding
非常清楚。所以这一切都与分离有关。您仍然需要一些机制来确定通过此“维护”组件发送哪些事件。由于新处理程序为新事件订阅和初始导入历史事件的时间范围,您不希望错过事件或处理相同的事件两次。 - Remco Ros
通常最好的是:订阅该类型的事件。查询先前的事件(这里可能有多个)让订阅队列并逐个处理先前的事件。然后吃掉订阅的事件(必须是幂等的,所以你不要加倍处理)存储库不支持它的部分原因是你永远不想这样做 在 域名。存储库是域与事件存储的合同。 - Greg Young


例如,在Axon Framework中,可以通过以下方式完成:

JdbcEventStore eventStore = ...;

ReplayingCluster replayingCluster = new ReplayingCluster(
            new SimpleCluster("replaying"),
            eventStore,
            new NoTransactionManager(),
            0,
            new BackloggingIncomingMessageHandler());

replayingCluster.startReplay();

事件重放是一个尚未完全记录并且缺乏成熟工具的领域,但这里有一些起点:


2
2018-01-23 18:07





'EventRepository'只包含这些方法,因为您只需要在生产中使用它们。

为报告添加新的非规范化时,您可以将所有事件从开始发送到您的处理程序。

您可以通过以下方式在开发站点上执行此操作:

  • 将事件日志加载到开发站点
  • 将所有事件发送到您的非规范化处理程序
  • 将新视图+处理程序移动到生产站点
  • 运行中间发生的事件
  • 现在你准备好了

1
2018-04-14 14:05



这在你描述的方式中并不是真的可行。我们无权访问生产数据(事件)。但根据你和Greg的回答,我现在看到你需要某种一次性导入组件(Event Runner),它在将新处理程序部署到生产环境时只运行一次。 - Remco Ros
是什么阻止你参加活动?一旦发生,他们将永远不会改变,因此没有风险来制作副本。但无论如何,您需要一个读取所有事件并调用处理程序的组件(或进程)。 - thinkbeforecoding