问题 需要一个轻量级的pubsub服务/库


我正在构建一个包含许多部分的小型系统,我想使用消息pub / sub服务在各部分之间进行通信。

我读到了一些像RabbitMQ和ZeroMQ这样的消息队列服务,但我觉得它们太复杂了,感觉它是为分布式系统而生的。我的系统的所有部分都将用C ++ / Linux编写并放在一个小的Raspberry Pi CPU上,所以我不需要像可扩展的,跨平台的,其他语言客户端这样的功能......

你能给我一些适合我需要的服务或图书馆的建议吗?或者你

谢谢。


8988
2018-01-22 08:54


起源

你需要使用lib还是服务?或者,根据先决条件,您可以使用插座或管道吗?对Raspberry而言,这可能更有效。 - Lorenzo Dematté
ZeroMQ使用起来非常简单。 - Kimi
@ dema80:我更喜欢服务,但lib也行。你能建议任何工具使用套接字吗? - Yoshi
作为替代......你可以考虑使用dbus freedesktop.org/wiki/Software/dbus - Lorenzo Dematté
ZeroMQ不是pub / sub - 它是面向消息的套接字。看看Apache Qpid - 它用C ++实现AMQP,代码看起来并不坏。 - Maxim Egorushkin


答案:


实际上做自己并不难。

首先,您需要定义要使用的协议。它可以很简单;就像消息类型字段,有效负载大小字段和实际有效负载一样。您需要的消息类型 SUBSCRIBEUNSUBSCRIBE 和 PUBLISH。有效载荷 SUBSCRIBE 和 UNSUBSCRIBE messages是要订阅/取消订阅的频道的名称。有效载荷 PUBLISH message是通道名称和实际数据(当然还有数据大小)。

要连接所有订户,您需要一个中央服务器。所有订阅者/发布者都需要连接到此服务器。服务器程序保留一组队列,每个通道一个队列。当订阅或发布消息到达服务器以寻找不存在的通道时,请为此通道创建新的消息队列。对于每个通道,服务器还需要订阅该通道的所有客户端的集合。当发布消息到达服务器时,它会被添加到相关频道的队列末尾。虽然通道队列不为空,但是将其副本发送给该通道的所有订户,并且当所有订户都收到该通道时,可以从队列中删除该消息。

服务器的难点可能是通信部分。简单的部分将是所有队列和集合,因为您可以使用 C ++标准容器 对于他们所有人(例如 std::queue 对于实际队列, std::unordered_map 对于渠道,和 std::vector 用于收集连接的客户。)

客户端非常简单,所有需要做的是能够发送订阅并将消息发布到服务器,并从服务器接收发布消息。困难的部分将再次成为实际的沟通部分。


后记:

我从来没有真正建立过这样一个我自己的系统,所有这些都只是直接在我的头顶。经验丰富的程序员不需要花费几个小时来实现基础知识,对于缺乏经验的程序员来说,可能需要几天时间。

对于您可以使用的通信,例如 提升ASIO,也许用一个 线程 每个频道。你可以使用类似的东西 提升属性树 构造/解析 JSON 要么 XML 消息。

然而,当你可能在几个小时内开始使用像RabbitMQ这样的现有系统时,所有这些都重新发明了轮子,为你节省了大量的时间(以及很多错误!)


7
2018-01-22 09:22



我是第二个想法,但只有你要构建的“框架”超级简单且需要轻量级,否则......不要重新发明轮子!我在.NET中为过去的项目构建了这种系统,仅用了几天,包括所有内容。这很有趣,而且很快。因为当我们后来需要扩展时非常经济,所以我们将它扔掉并替换它而不回头。在C ++中,我也会使用Boost。 - Lorenzo Dematté
你是什​​么意思?在客户端服务器系统中,客户端始终启动连接。你的意思是,这种联系永远不应该被关闭? - Roshan Mehta
这里的客户是指订户。 - Roshan Mehta


就轻量级服务器而言, Redis的 支持pub / sub命令。

Redis代码本身非常紧凑(只有几个文件),它是单线程的(使用事件循环),并且内存消耗非常低(与我见过的其他Queing系统相比)。


4
2018-01-22 09:38



我非常喜欢Redis,并且已经将它用作一些私有项目的缓存/商店服务。只是好奇它是否适合像树莓PI这样的小型CPU? - Yoshi
@Yoshi:与任何其他排队系统(ActiveMQ,RabbitMQ,HornetQ,......)相比,它肯定更适合。我会更关心RAM,但你可以随时删除Lua JIT等......以获得更轻松的过程。 - Matthieu M.
似乎redis仅适用于基于文本的消息。二进制怎么样? - liuyanghejerry
@liuyanghejerry:它对键和值都是不可知的,并将内容视为原始内存;所以没问题。 - Matthieu M.


我知道它已经晚了但可能对其他人有用。我使用boost在C ++中实现了一个基本的pub / sub。

CppPubSub

用法很简单。从一端在通道上发布您的数据(通用地图),另一侧订阅相同的通道并再次接收通用地图。

// you should create a singleton object of NotificationService class, make it accessible throughout your application. 
INotificationService* pNotificationService = new NotificationService();

// Subscribe for the event.
function<NotificationHandler> fnNotificationHandler = bind(&SubscriberClass::NotificationHandlerFunction, this, std::placeholders::_1);
subscriptionToken = pNotificationService->Subscribe("TEST_CHANEL", fnNotificationHandler);

// Publish event
NotificationData _data;
_data["data1"] = "Hello";
_data["data2"] = "World";
pNotificationService->Publish("TEST_CHANEL", _data);

// Unsubscribe event.
pNotificationService->Unsubscribe(subscriptionToken);

3
2017-11-19 15:49



那么(客户端)订户与服务器的连接(发布者是持久的?如果连接失败会发生什么。 - Roshan Mehta