问题 zeromq持久性模式


谁必须管理ZeroMQ中的持久性?

当我们在Python语言中使用ZeroMQ客户端时,有哪些插件/模块可用于管理持久性?

我想知道使用ZeroMQ的模式。


8123
2017-10-30 16:58


起源

ZeroMQ指南具有称为泰坦尼克号的持久性模式。它基于一种名为MajorDomo的模式。您也可以非常轻松地创建自己的持久队列。但是,在开始添加持久性之前,值得详细了解您的可靠性要求。该指南深入介绍了这一点。只是在某处添加持久性不会给你可靠性。 - Pieter Hintjens
@PieterHintjens请将此添加为答案。我差点错过了。它是本页面上最有价值的资源。 - Indradhanush Gupta


答案:


据我所知,Zeromq没有任何持久性。它超出了它的范围,需要由最终用户处理。就像序列化消息一样。 在C#中,我使用db4o来添加持久性。通常,我将对象保持在原始状态,然后序列化并将其发送到ZMQ套接字。顺便说一句,这是PUB / SUB对。


9
2017-12-07 07:49





在应用程序结束时,您可以相应地持久化,例如我在node.js中构建了一个持久层,该层传递给后端php调用和websockets。

持久性方面持有一段时间的消息(http://en.wikipedia.org/wiki/Time_to_live)这是为了给客户提供连接的机会。我使用了内存中的数据结构,但我玩弄了使用redis获得磁盘持久性的想法。


3
2018-03-06 22:46





我们需要在处理之前保留收到的消息。消息在单独的线程中接收并存储在磁盘上,而持久消息队列在主线程中被操作。

该模块可在以下位置获得: https://pypi.org/project/persizmq。从文档:

import pathlib

import zmq

import persizmq

context = zmq.Context()
subscriber = context.socket(zmq.SUB)
subscriber.setsockopt_string(zmq.SUBSCRIBE, "")
subscriber.connect("ipc:///some-queue.zeromq")

persistent_dir = pathlib.Path("/some/dir")
storage = persizmq.PersistentStorage(persistent_dir=persistent_dir)

def on_exception(exception: Exception)->None:
    print("an exception in the listening thread: {}".format(exception))

with persizmq.ThreadedSubscriber(
    callback=storage.add_message, subscriber=subscriber, 
    on_exception=on_exception):

    msg = storage.front()  # non-blocking
    if msg is not None:
        print("Received a persistent message: {}".format(msg))
        storage.pop_front()

1
2018-05-05 08:28