我正在构建一个包含许多部分的小型系统,我想使用消息pub / sub服务在各部分之间进行通信。
我读到了一些像RabbitMQ和ZeroMQ这样的消息队列服务,但我觉得它们太复杂了,感觉它是为分布式系统而生的。我的系统的所有部分都将用C ++ / Linux编写并放在一个小的Raspberry Pi CPU上,所以我不需要像可扩展的,跨平台的,其他语言客户端这样的功能......
你能给我一些适合我需要的服务或图书馆的建议吗?或者你
谢谢。
我正在构建一个包含许多部分的小型系统,我想使用消息pub / sub服务在各部分之间进行通信。
我读到了一些像RabbitMQ和ZeroMQ这样的消息队列服务,但我觉得它们太复杂了,感觉它是为分布式系统而生的。我的系统的所有部分都将用C ++ / Linux编写并放在一个小的Raspberry Pi CPU上,所以我不需要像可扩展的,跨平台的,其他语言客户端这样的功能......
你能给我一些适合我需要的服务或图书馆的建议吗?或者你
谢谢。
实际上做自己并不难。
首先,您需要定义要使用的协议。它可以很简单;就像消息类型字段,有效负载大小字段和实际有效负载一样。您需要的消息类型 SUBSCRIBE
, UNSUBSCRIBE
和 PUBLISH
。有效载荷 SUBSCRIBE
和 UNSUBSCRIBE
messages是要订阅/取消订阅的频道的名称。有效载荷 PUBLISH
message是通道名称和实际数据(当然还有数据大小)。
要连接所有订户,您需要一个中央服务器。所有订阅者/发布者都需要连接到此服务器。服务器程序保留一组队列,每个通道一个队列。当订阅或发布消息到达服务器以寻找不存在的通道时,请为此通道创建新的消息队列。对于每个通道,服务器还需要订阅该通道的所有客户端的集合。当发布消息到达服务器时,它会被添加到相关频道的队列末尾。虽然通道队列不为空,但是将其副本发送给该通道的所有订户,并且当所有订户都收到该通道时,可以从队列中删除该消息。
服务器的难点可能是通信部分。简单的部分将是所有队列和集合,因为您可以使用 C ++标准容器 对于他们所有人(例如 std::queue
对于实际队列, std::unordered_map
对于渠道,和 std::vector
用于收集连接的客户。)
客户端非常简单,所有需要做的是能够发送订阅并将消息发布到服务器,并从服务器接收发布消息。困难的部分将再次成为实际的沟通部分。
后记:
我从来没有真正建立过这样一个我自己的系统,所有这些都只是直接在我的头顶。经验丰富的程序员不需要花费几个小时来实现基础知识,对于缺乏经验的程序员来说,可能需要几天时间。
对于您可以使用的通信,例如 提升ASIO,也许用一个 线程 每个频道。你可以使用类似的东西 提升属性树 构造/解析 JSON 要么 XML 消息。
然而,当你可能在几个小时内开始使用像RabbitMQ这样的现有系统时,所有这些都重新发明了轮子,为你节省了大量的时间(以及很多错误!)
就轻量级服务器而言, Redis的 支持pub / sub命令。
Redis代码本身非常紧凑(只有几个文件),它是单线程的(使用事件循环),并且内存消耗非常低(与我见过的其他Queing系统相比)。
我知道它已经晚了但可能对其他人有用。我使用boost在C ++中实现了一个基本的pub / sub。
用法很简单。从一端在通道上发布您的数据(通用地图),另一侧订阅相同的通道并再次接收通用地图。
// you should create a singleton object of NotificationService class, make it accessible throughout your application.
INotificationService* pNotificationService = new NotificationService();
// Subscribe for the event.
function<NotificationHandler> fnNotificationHandler = bind(&SubscriberClass::NotificationHandlerFunction, this, std::placeholders::_1);
subscriptionToken = pNotificationService->Subscribe("TEST_CHANEL", fnNotificationHandler);
// Publish event
NotificationData _data;
_data["data1"] = "Hello";
_data["data2"] = "World";
pNotificationService->Publish("TEST_CHANEL", _data);
// Unsubscribe event.
pNotificationService->Unsubscribe(subscriptionToken);