问题 如何在多线程C ++中拆除观察者关系?


我有一个提供的主题 Subscribe(Observer*) 和 Unsubscribe(Observer*) 对客户。 Subject在它自己的线程中运行(它从中调用 Notify() 在订阅的Observers上)和互斥锁保护其内部的观察者列表。

我希望客户端代码 - 我无法控制 - 能够在取消订阅后安全地删除Observer。怎么能实现这一目标?

  • 持有互斥锁 - 甚至是递归的 互斥 - 我通知观察者 不是一个选择,因为 死锁风险。
  • 我可以标记一个观察员去除 在Unsubscribe调用中删除它 来自主题线程。然后 客户可以等待一个特殊的 “安全删除”通知。这个 看起来很安全,但是很麻烦 客户端。

编辑

一些说明性代码如下。问题是如何防止取消订阅发生时,运行是在'问题在这里'评论。然后我可以回拨一个已删除的对象。或者,如果我持有互斥锁而不是制作副本,我可以使某些客户端死锁。

#include <set>
#include <functional>
#include <boost/thread.hpp>
#include <boost/bind.hpp>

using namespace std;
using namespace boost;

class Observer
{
public:
    void Notify() {}
};

class Subject
{
public:
    Subject() : t(bind(&Subject::Run, this))
    {
    }

    void Subscribe(Observer* o)
    {
        mutex::scoped_lock l(m);
        observers.insert(o);
    }

    void Unsubscribe(Observer* o)
    {
        mutex::scoped_lock l(m);
        observers.erase(o);
    }

    void Run()
    {
        for (;;)
        {
            WaitForSomethingInterestingToHappen();
            set<Observer*> notifyList;
            {
                mutex::scoped_lock l(m);
                notifyList = observers;
            }
            // Problem here
            for_each(notifyList.begin(), notifyList.end(), 
                     mem_fun(&Observer::Notify));
        }
    }

private:
    set<Observer*> observers;
    thread t;
    mutex m;
};

编辑 

由于死锁风险,我无法在持有互斥锁时通知观察者。这种情况最明显的方式 - 客户端从Notify内部调用Subscribe或Unsubscribe - 可以通过使互斥锁递归来轻松解决。更阴险的是不同线程上间歇性死锁的风险。

我处于多线程环境中,因此在线程执行的任何时候,它通常会保存一系列锁L1,L2,... Ln。另一个线程将保持锁K1,K2,... Km。正确编写的客户端将确保不同的线程始终以相同的顺序获取锁。但是当客户端与我的Subject的互斥体交互时 - 称之为X - 这个策略将被破坏:调用订阅/取消订阅以L1,L2,... Ln,X的顺序获取锁。从我的主题线程调用Notce获取锁定X,K1,K2,... Km的顺序。如果Li或Kj中​​的任何一个可以在任何呼叫路径上重合,则客户端会遇到间歇性死锁,几乎没有调试它的可能性。由于我不控制客户端代码,我不能这样做。


8642
2018-02-10 20:26


起源

我错过了什么。特定观察员何时取消订阅其他观察员?具体的观察者应该只是取消订阅。 notifylist只能在其中拥有相同的观察者,那么已经触发的通知如何变得陈旧? - jmucchiello
客户端可以随时取消订阅观察者,而不仅仅是在通知中。在这种一般情况下,我们得到了对主题线程的竞争。 - fizzer
@fizzer:这里不清楚你是如何实现内存安全的。你怎么保证这一切 Observer* 在 observers 指向有效的实例 Observer? - Matthieu M.
@Matthieu。这只是客户的责任。假设在调用Subscribe()的线程上分配了一个实例,并且为了简单起见,同一客户端线程想要在调用Unsubscribe()之后销毁该实例。如果有的话,执行删除是安全的,这是问题的一个要点。 - fizzer


答案:


“理想”解决方案将涉及使用 shared_ptr 和 weak_ptr。但是,为了通用,它还必须考虑到问题 Subject 在一些之前被丢弃 Observer (是的,这也可能发生)。

class Subject {
public:
    void Subscribe(std::weak_ptr<Observer> o);
    void Unsubscribe(std::weak_ptr<Observer> o);

private:
    std::mutex mutex;
    std::set< std::weak_ptr<Observer> > observers;
};

class Observer: boost::noncopyable {
public:
    ~Observer();

    void Notify();

private:
    std::mutex;
    std::weak_ptr<Subject> subject;
};

通过这种结构,我们创建了一个循环图,但明智地使用了 weak_ptr 这两个 Observer 和 Subject 没有协调就可以销毁。

注意:为简单起见,我假设一个 Observer 观察一个 Subject 一次,但它可以很容易地观察到多个科目。


现在,您似乎陷入了不安全的内存管理。这是一个非常困难的情况,你可以想象。在这种情况下,我会建议一个实验: 异步 Unsubscribe。或至少,致电 Unsubscribe 将从外部同步,但异步实现。

这个想法很简单:我们将使用事件队列来实现同步。那是:

  • 打电话给 Unsubscribe 在队列中发布事件(有效负载 Observer*)然后等待
  • 当。。。的时候 Subject 线程已经处理完了 Unsubscribe event(s),它唤醒了等待的线程

您可以使用busy-waiting或条件变量,除非性能另有规定,否则我会建议条件变量。

注意:此解决方案完全无法解释 Subject 过早死亡。


1
2018-04-13 07:43



我不想假设事件队列,或者在客户端上强加特定的所有权语义,但我喜欢Unsubscribe()等待通知线程取消订阅的想法。 - fizzer


Unsubscribe()应该是同步的,因此在Observer不再保留在Subject列表中之前它不会返回。这是安全地做到这一点的唯一方法。

ETA(将我的评论转移到答案):

由于时间似乎不是问题,因此在通知每个观察者之间取出并释放互斥锁。您将无法以现在的方式使用for_each,并且您必须检查迭代器以确保它仍然有效。

for ( ... )
{
    take mutex
    check iterator validity
    notify
    release mutex
}

这将做你想要的。


7
2018-02-10 20:53



我喜欢这个解决方案。听起来不错。 - MarkR
很好。如果只有一些方法来编码它... - fizzer
由于时间似乎不是问题,因此在通知每个观察者之间取出并释放互斥锁。 - Rob K
我想避免在调用客户端代码时持有互斥锁,因为存在死锁风险。 - fizzer
您调用的客户端代码应该在您的线程的上下文中执行,因此它不应该是一个问题。如果它尝试使用相同的互斥锁,因为它是相同的线程,它将成功。 - Rob K


你能改变Subscribe()和Unsubscribe()的签名吗?用shared_ptr <Observer>替换Observer *会使事情变得更容易。

编辑:上面的“更容易”替换“简单”。 有关如何难以“正确”的示例,请参阅该历史 Boost.Signals 而且 采用 - 丁还未在-的分布 Boost.Signals2 (以前的Boost.ThreadSafeSignals)库。


3
2018-02-10 20:42



我想到了这一点 - 我甚至可以在主题中持有weak_ptr,客户甚至不需要取消订阅。不幸的是,我无法改变这个界面,但我将来肯定会这样做。 - fizzer
我使用shared_ptr和weak_ptr构建了很多这样的东西,它们运行良好。太糟糕了,你不能改变API。 - timday
虽然这解决了“谁删除谁”资源问题,但没有解决同步问题,您仍然可能遇到已经取消订阅的观察者发生通知的情况 - 这可能不是问题。 - Greg Rogers
@fizzer:注意使用 enable_shared_from_this 作为基类 Observer 您可以轻松地将其改装到您的设计中(它将与客户端代码源兼容)。 - Matthieu M.


而不是让客户端获得“SafeToDelete”通知,而是为他们提供IsSubscribed(Observer *)方法。然后客户端代码变为:

subject.Unsubscribe( obsever );l
while( subject.IsSubscribed( observer ) ) {
   sleep_some_short_time;   // OS specific sleep stuff
}
delete observer;

这不是太繁重。


1
2018-02-10 20:50



如果IsSubscribed()只是在观察者集中查看,它仍然被打破。在我发布的示例代码中,主题线程在“问题此处”注释中处于睡眠状态时,整个事情可以完成。 - fizzer


您可以在CSubject类型中创建“删除队列”。删除Observer时,可以调用pSubject-> QueueForDelete(pObserver)。然后,当主题线程在通知之间时,它可以安全地从队列中删除观察者。


1
2018-02-10 20:55





嗯......我真的不明白你的问题,因为如果一个客户端调用Unsubscribe你应该能够让客户端删除它(它不被你使用)。但是,如果由于某种原因,一旦客户端取消订阅观察者,您就无法关闭关系,您可以添加“主题”以安全地删除观察者,或者只是让客户发出信号表明他们不再对观察者感兴趣。

重新思考编辑:好的,现在我想我明白你的问题是什么。我认为解决您问题的最佳方法是执行以下操作:

  1. 让每个存储的observer元素都有一个“有效”标志。当您处于通知循环中时,此标志将用于通知它。
  2. 您需要一个互斥锁来保护对该“有效”标志的访问。然后,取消订阅操作会锁定互斥锁以获取“有效”标志,并为所选观察者将其设置为false。
  3. 通知循环还必须锁定和解锁有效标志的互斥锁,并且仅对“有效”的观察者起作用。

鉴于取消订阅操作将阻止互斥锁重置有效标志(并且该特定Observer将不再在您的线程中使用),代码是线程安全的,并且客户端可以在取消订阅时立即删除任何观察者回。


1
2018-02-10 20:47





这样的事情会令人满意吗?虽然在通知时取消订阅观察者仍然是不安全的,因为你需要一个像你提到的界面(据我所知)。

Subscribe(Observer *x)
{
    mutex.lock();
    // add x to the list
    mutex.unlock();
}

Unsubscribe(Observer *x)
{
    mutex.lock();
    while (!ok_to_delete)
        cond.wait(mutex);
    // remove x from list
    mutex.unlock();
}

NotifyLoop()
{
    while (true) {
        // wait for something to trigger a notify

        mutex.lock();
        ok_to_delete = false;
        // build a list of observers to notify
        mutex.unlock();

        // notify all observers from the list saved earlier

        mutex.lock();
        ok_to_delete = true;
        cond.notify_all();
        mutex.unlock();
    }
}

如果你想在Notify()里面取消订阅() - (关于客户IMO的糟糕设计决定......) 您可以将通知程序线程的线程ID添加到数据结构中。在Unsubscribe函数中,您可以根据当前线程的id检查该线程ID(大多数线程库提供此功能 - 例如.pthread_self)。如果它们相同,则无需等待条件变量即可继续。

注意:如果客户端负责删除观察者,这意味着您遇到Notify回调内部的情况,您将取消订阅并删除观察者,但仍在执行带有该指针的垃圾。这是客户必须知道的事情,并且只在Notify()的末尾删除它。


1
2018-02-10 20:50



但是,如果观察者取消订阅()本身来自其Notify回调,那么会不会陷入僵局? - Éric Malenfant
+1,因为它解决了问题的关键。我希望客户能够通过回调订阅,所以我需要更多的思考。令人害怕的是,这个问题没有一个众所周知的习语。我每天都看到这种模式,总是破碎。 - fizzer
fizzer - 订阅回调很好。在订阅时,您不需要等待条件变量,只需要在调用Notify()之前显式释放互斥锁。只有取消订阅才是问题所在。 - Greg Rogers
对不起,我昨晚发布了评论。我想取消订阅可以回调。取消订阅以响应事件对于客户来说是非常合理的事情。 - fizzer
对延迟回应表示歉意。我认为它几乎存在,除了条件需要阻止客户端返回(并可能调用删除)。因此,在Notify循环中,ok_to_delete需要为false,并且客户端需要等待它 后 '从列表中删除x' - fizzer


我认为如果不是非常优雅的话,这会成功:

class Subject {
public:
Subject() : t(bind(&Subject::Run, this)),m_key(0)    {    }
void Subscribe(Observer* o) {
    mutex::scoped_lock l(m);
    InternalObserver io( o );
    boost::shared_ptr<InternalObserver> sp(&io);
    observers.insert(pair<int,boost::shared_ptr<InternalObserver>> (MakeKey(o),sp));
}

void Unsubscribe(Observer* o) {
    mutex::scoped_lock l(m);
    observers.find( MakeKey(o) )->second->exists = false;    }

void WaitForSomethingInterestingToHappen() {}
void Run()
{
    for (;;)
    {
        WaitForSomethingInterestingToHappen();
        for( unsigned int i = 0; i < observers.size(); ++ i )
        {
            mutex::scoped_lock l(m);
            if( observers[i]->exists )
            {
                mem_fun(&Observer::Notify);//needs changing
            }
            else
            {
                observers.erase(i);
                --i;
            }
        }
    }
}
private:

int MakeKey(Observer* o) {
    return ++m_key;//needs changeing, sha of the object?
}
class InternalObserver {
public:
    InternalObserver(Observer* o) : m_o( o ), exists( true ) {}
    Observer* m_o;
    bool exists;
};

map< int, boost::shared_ptr<InternalObserver> > observers;
thread t;
mutex m;
int m_key;
};

0
2018-02-11 16:34





更改 observers 到了 map 用钥匙 Observer* 并重视一个包装 Observer。包装包括一个 volatile boolean表示是否 Observer 已验证。在 subscribe 方法,在中创建包装器对象 有效 州。在 unsubscribe 方法,包装器标记为 无效Notify 被召唤 包装纸 而不是 实际的观察者。包装器将调用 Notify 在...上 实际的观察者 如果它是有效的(仍然订阅)

#include <map>
#include <functional>
#include <boost/thread.hpp>
#include <boost/bind.hpp>

using namespace std;
using namespace boost;

class Observer
{
public:
    void Notify() {}
};

class ObserverWrapper : public Observer
{
public:
    Observer* wrappee;
    volatile bool valid;
    ObserverWrapper(Observer* o) 
    {
        wrappee = o;
        valid = true;
    }

    void Notify() 
    {
        if (valid) wrappee->Notify();
    }
}
class Subject
{
public:
    Subject() : t(bind(&Subject::Run, this))
    {
    }

    void Subscribe(Observer* o)
    {
        mutex::scoped_lock l(m);
        boost::shared_ptr<ObserverWrapper> sptr(new ObserverWrapper(o));
        observers.insert(pair<Observer*, sptr));
    }

    void Unsubscribe(Observer* o)
    {
        mutex::scoped_lock l(m);
        observers.find(o)->second->valid = false;
        observers.erase(o);
    }

    void Run()
    {
        for (;;)
        {
            WaitForSomethingInterestingToHappen();
            vector<ObserverWrapper*> notifyList;
            {
                mutex::scoped_lock l(m);
                boost::copy(observers | boost::adaptors::map_values, std::back_inserter(notifyList));
            }
            // Should be no problem here
            for_each(notifyList.begin(), notifyList.end(), 
                     mem_fun(&ObserverWrapper::Notify));
        }
    }

private:
    map<Observer*, ObserverWrapper*> observers;
    thread t;
    mutex m;
};

0
2018-04-11 05:20



如果主题线程在“此处应该没有问题”行中被暂停,则无法阻止客户端线程取消订阅。 - fizzer
@fizzer是的,取消订阅可能会发生。我认为这应该不是问题,因为取消订阅操作会将ObserverWrapper中的有效标志设置为false。因此不会在Observer上调用notify。 - Sameer
我很抱歉。该漏洞是在测试有效标志和在保护上调用Notify之间。 - fizzer
确实!我确实错过了。 - Sameer