问题 如何在多线程C ++中拆除观察者关系?
我有一个提供的主题 Subscribe(Observer*)
和 Unsubscribe(Observer*)
对客户。 Subject在它自己的线程中运行(它从中调用 Notify()
在订阅的Observers上)和互斥锁保护其内部的观察者列表。
我希望客户端代码 - 我无法控制 - 能够在取消订阅后安全地删除Observer。怎么能实现这一目标?
- 持有互斥锁 - 甚至是递归的
互斥 - 我通知观察者
不是一个选择,因为
死锁风险。
- 我可以标记一个观察员去除
在Unsubscribe调用中删除它
来自主题线程。然后
客户可以等待一个特殊的
“安全删除”通知。这个
看起来很安全,但是很麻烦
客户端。
编辑
一些说明性代码如下。问题是如何防止取消订阅发生时,运行是在'问题在这里'评论。然后我可以回拨一个已删除的对象。或者,如果我持有互斥锁而不是制作副本,我可以使某些客户端死锁。
#include <set>
#include <functional>
#include <boost/thread.hpp>
#include <boost/bind.hpp>
using namespace std;
using namespace boost;
class Observer
{
public:
void Notify() {}
};
class Subject
{
public:
Subject() : t(bind(&Subject::Run, this))
{
}
void Subscribe(Observer* o)
{
mutex::scoped_lock l(m);
observers.insert(o);
}
void Unsubscribe(Observer* o)
{
mutex::scoped_lock l(m);
observers.erase(o);
}
void Run()
{
for (;;)
{
WaitForSomethingInterestingToHappen();
set<Observer*> notifyList;
{
mutex::scoped_lock l(m);
notifyList = observers;
}
// Problem here
for_each(notifyList.begin(), notifyList.end(),
mem_fun(&Observer::Notify));
}
}
private:
set<Observer*> observers;
thread t;
mutex m;
};
编辑
由于死锁风险,我无法在持有互斥锁时通知观察者。这种情况最明显的方式 - 客户端从Notify内部调用Subscribe或Unsubscribe - 可以通过使互斥锁递归来轻松解决。更阴险的是不同线程上间歇性死锁的风险。
我处于多线程环境中,因此在线程执行的任何时候,它通常会保存一系列锁L1,L2,... Ln。另一个线程将保持锁K1,K2,... Km。正确编写的客户端将确保不同的线程始终以相同的顺序获取锁。但是当客户端与我的Subject的互斥体交互时 - 称之为X - 这个策略将被破坏:调用订阅/取消订阅以L1,L2,... Ln,X的顺序获取锁。从我的主题线程调用Notce获取锁定X,K1,K2,... Km的顺序。如果Li或Kj中的任何一个可以在任何呼叫路径上重合,则客户端会遇到间歇性死锁,几乎没有调试它的可能性。由于我不控制客户端代码,我不能这样做。
8642
2018-02-10 20:26
起源
答案:
“理想”解决方案将涉及使用 shared_ptr
和 weak_ptr
。但是,为了通用,它还必须考虑到问题 Subject
在一些之前被丢弃 Observer
(是的,这也可能发生)。
class Subject {
public:
void Subscribe(std::weak_ptr<Observer> o);
void Unsubscribe(std::weak_ptr<Observer> o);
private:
std::mutex mutex;
std::set< std::weak_ptr<Observer> > observers;
};
class Observer: boost::noncopyable {
public:
~Observer();
void Notify();
private:
std::mutex;
std::weak_ptr<Subject> subject;
};
通过这种结构,我们创建了一个循环图,但明智地使用了 weak_ptr
这两个 Observer
和 Subject
没有协调就可以销毁。
注意:为简单起见,我假设一个 Observer
观察一个 Subject
一次,但它可以很容易地观察到多个科目。
现在,您似乎陷入了不安全的内存管理。这是一个非常困难的情况,你可以想象。在这种情况下,我会建议一个实验: 异步 Unsubscribe
。或至少,致电 Unsubscribe
将从外部同步,但异步实现。
这个想法很简单:我们将使用事件队列来实现同步。那是:
- 打电话给
Unsubscribe
在队列中发布事件(有效负载 Observer*
)然后等待
- 当。。。的时候
Subject
线程已经处理完了 Unsubscribe
event(s),它唤醒了等待的线程
您可以使用busy-waiting或条件变量,除非性能另有规定,否则我会建议条件变量。
注意:此解决方案完全无法解释 Subject
过早死亡。
1
2018-04-13 07:43
Unsubscribe()应该是同步的,因此在Observer不再保留在Subject列表中之前它不会返回。这是安全地做到这一点的唯一方法。
ETA(将我的评论转移到答案):
由于时间似乎不是问题,因此在通知每个观察者之间取出并释放互斥锁。您将无法以现在的方式使用for_each,并且您必须检查迭代器以确保它仍然有效。
for ( ... )
{
take mutex
check iterator validity
notify
release mutex
}
这将做你想要的。
7
2018-02-10 20:53
你能改变Subscribe()和Unsubscribe()的签名吗?用shared_ptr <Observer>替换Observer *会使事情变得更容易。
编辑:上面的“更容易”替换“简单”。
有关如何难以“正确”的示例,请参阅该历史 Boost.Signals 而且 采用 - 丁还未在-的分布 Boost.Signals2 (以前的Boost.ThreadSafeSignals)库。
3
2018-02-10 20:42
而不是让客户端获得“SafeToDelete”通知,而是为他们提供IsSubscribed(Observer *)方法。然后客户端代码变为:
subject.Unsubscribe( obsever );l
while( subject.IsSubscribed( observer ) ) {
sleep_some_short_time; // OS specific sleep stuff
}
delete observer;
这不是太繁重。
1
2018-02-10 20:50
您可以在CSubject类型中创建“删除队列”。删除Observer时,可以调用pSubject-> QueueForDelete(pObserver)。然后,当主题线程在通知之间时,它可以安全地从队列中删除观察者。
1
2018-02-10 20:55
嗯......我真的不明白你的问题,因为如果一个客户端调用Unsubscribe你应该能够让客户端删除它(它不被你使用)。但是,如果由于某种原因,一旦客户端取消订阅观察者,您就无法关闭关系,您可以添加“主题”以安全地删除观察者,或者只是让客户发出信号表明他们不再对观察者感兴趣。
重新思考编辑:好的,现在我想我明白你的问题是什么。我认为解决您问题的最佳方法是执行以下操作:
- 让每个存储的observer元素都有一个“有效”标志。当您处于通知循环中时,此标志将用于通知它。
- 您需要一个互斥锁来保护对该“有效”标志的访问。然后,取消订阅操作会锁定互斥锁以获取“有效”标志,并为所选观察者将其设置为false。
- 通知循环还必须锁定和解锁有效标志的互斥锁,并且仅对“有效”的观察者起作用。
鉴于取消订阅操作将阻止互斥锁重置有效标志(并且该特定Observer将不再在您的线程中使用),代码是线程安全的,并且客户端可以在取消订阅时立即删除任何观察者回。
1
2018-02-10 20:47
这样的事情会令人满意吗?虽然在通知时取消订阅观察者仍然是不安全的,因为你需要一个像你提到的界面(据我所知)。
Subscribe(Observer *x)
{
mutex.lock();
// add x to the list
mutex.unlock();
}
Unsubscribe(Observer *x)
{
mutex.lock();
while (!ok_to_delete)
cond.wait(mutex);
// remove x from list
mutex.unlock();
}
NotifyLoop()
{
while (true) {
// wait for something to trigger a notify
mutex.lock();
ok_to_delete = false;
// build a list of observers to notify
mutex.unlock();
// notify all observers from the list saved earlier
mutex.lock();
ok_to_delete = true;
cond.notify_all();
mutex.unlock();
}
}
如果你想在Notify()里面取消订阅() - (关于客户IMO的糟糕设计决定......) 您可以将通知程序线程的线程ID添加到数据结构中。在Unsubscribe函数中,您可以根据当前线程的id检查该线程ID(大多数线程库提供此功能 - 例如.pthread_self)。如果它们相同,则无需等待条件变量即可继续。
注意:如果客户端负责删除观察者,这意味着您遇到Notify回调内部的情况,您将取消订阅并删除观察者,但仍在执行带有该指针的垃圾。这是客户必须知道的事情,并且只在Notify()的末尾删除它。
1
2018-02-10 20:50
我认为如果不是非常优雅的话,这会成功:
class Subject {
public:
Subject() : t(bind(&Subject::Run, this)),m_key(0) { }
void Subscribe(Observer* o) {
mutex::scoped_lock l(m);
InternalObserver io( o );
boost::shared_ptr<InternalObserver> sp(&io);
observers.insert(pair<int,boost::shared_ptr<InternalObserver>> (MakeKey(o),sp));
}
void Unsubscribe(Observer* o) {
mutex::scoped_lock l(m);
observers.find( MakeKey(o) )->second->exists = false; }
void WaitForSomethingInterestingToHappen() {}
void Run()
{
for (;;)
{
WaitForSomethingInterestingToHappen();
for( unsigned int i = 0; i < observers.size(); ++ i )
{
mutex::scoped_lock l(m);
if( observers[i]->exists )
{
mem_fun(&Observer::Notify);//needs changing
}
else
{
observers.erase(i);
--i;
}
}
}
}
private:
int MakeKey(Observer* o) {
return ++m_key;//needs changeing, sha of the object?
}
class InternalObserver {
public:
InternalObserver(Observer* o) : m_o( o ), exists( true ) {}
Observer* m_o;
bool exists;
};
map< int, boost::shared_ptr<InternalObserver> > observers;
thread t;
mutex m;
int m_key;
};
0
2018-02-11 16:34
更改 observers
到了 map
用钥匙 Observer*
并重视一个包装 Observer
。包装包括一个 volatile
boolean表示是否 Observer
已验证。在 subscribe
方法,在中创建包装器对象 有效 州。在 unsubscribe
方法,包装器标记为 无效。 Notify
被召唤 包装纸 而不是 实际的观察者。包装器将调用 Notify
在...上 实际的观察者 如果它是有效的(仍然订阅)
#include <map>
#include <functional>
#include <boost/thread.hpp>
#include <boost/bind.hpp>
using namespace std;
using namespace boost;
class Observer
{
public:
void Notify() {}
};
class ObserverWrapper : public Observer
{
public:
Observer* wrappee;
volatile bool valid;
ObserverWrapper(Observer* o)
{
wrappee = o;
valid = true;
}
void Notify()
{
if (valid) wrappee->Notify();
}
}
class Subject
{
public:
Subject() : t(bind(&Subject::Run, this))
{
}
void Subscribe(Observer* o)
{
mutex::scoped_lock l(m);
boost::shared_ptr<ObserverWrapper> sptr(new ObserverWrapper(o));
observers.insert(pair<Observer*, sptr));
}
void Unsubscribe(Observer* o)
{
mutex::scoped_lock l(m);
observers.find(o)->second->valid = false;
observers.erase(o);
}
void Run()
{
for (;;)
{
WaitForSomethingInterestingToHappen();
vector<ObserverWrapper*> notifyList;
{
mutex::scoped_lock l(m);
boost::copy(observers | boost::adaptors::map_values, std::back_inserter(notifyList));
}
// Should be no problem here
for_each(notifyList.begin(), notifyList.end(),
mem_fun(&ObserverWrapper::Notify));
}
}
private:
map<Observer*, ObserverWrapper*> observers;
thread t;
mutex m;
};
0
2018-04-11 05:20