我遇到的一个用例,我怀疑我不能成为唯一一个用例,如下所示:
IObservable<T> Observable.RepeatLastValueDuringSilence(this IObservable<T> inner, TimeSpan maxQuietPeriod);
它将返回内部observable中的所有未来项,但是,如果内部observable在一段时间内没有调用OnNext(maxQuietPeriod),它只重复最后一个值(当然内部调用OnCompleted或OnError) 。
理由是服务定期ping出定期状态更新。例如:
var myStatus = Observable.FromEvent(
h=>this.StatusUpdate+=h,
h=>this.StatusUpdate-=h);
var messageBusStatusPinger = myStatus
.RepeatLastValueDuringSilence(TimeSpan.FromSeconds(1))
.Subscribe(update => _messageBus.Send(update));
这样的事情存在吗?还是我过度估计它的用处?
谢谢,
亚历克斯
PS:我为任何不正确的术语/语法道歉,因为我只是第一次探索Rx。
类似于Matthew的解决方案,但是这里定时器在源中接收到每个元素之后开始,我认为这更正确(但差异不太重要):
public static IObservable<T> RepeatLastValueDuringSilence<T>(this IObservable<T> inner, TimeSpan maxQuietPeriod)
{
return inner.Select(x =>
Observable.Interval(maxQuietPeriod)
.Select(_ => x)
.StartWith(x)
).Switch();
}
而且测试:
var source = Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(5).Select(_ => "1")
.Concat(Observable.Interval(TimeSpan.FromSeconds(1)).Take(5).Select(_ => "2"))
.Concat(Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(5).Select(_ => "3"));
source.RepeatLastValueDuringSilence(TimeSpan.FromMilliseconds(200)).Subscribe(Console.WriteLine);
你应该看到 1
打印10次(5次来源,5次沉默重复),然后很多 2
当你从源头获得一个,从每个之间的沉默中获得4个,然后是无限的 3
。
这个相当简单的查询可以完成工作:
var query =
source
.Select(s =>
Observable
.Interval(TimeSpan.FromSeconds(1.0))
.StartWith(s)
.Select(x => s))
.Switch();
永远不要低估它的力量 .Switch()
。
我认为这可以做你想要的,如果你的观察不热,你需要 Publish
和 Refcount
它:
public static IObservable<T> RepeatLastValueDuringSilence<T>(this IObservable<T> inner, TimeSpan maxQuietPeriod)
{
var throttled = inner.Throttle(maxQuietPeriod);
var repeating = throttled.SelectMany(i =>
Observable
.Interval(maxQuietPeriod)
.Select(_ => i)
.TakeUntil(inner));
return Observable.Merge(inner, throttled, repeating);
}
Rx库中没有方法,但我也需要这样的方法。在我的用例中,即使源没有输出任何值,我也需要输出值。如果您不希望在第一个源值出现之前输出任何值,则可以删除 defaultValue
参数和调用 createTimer()
在订阅呼叫之前。
需要调度程序来运行计时器。一个明显的重载是不需要调度程序并选择默认调度程序(我使用ThreadPool调度程序)。
Imports System.Reactive
Imports System.Reactive.Concurrency
Imports System.Reactive.Disposables
Imports System.Reactive.Linq
<Extension()>
Public Function AtLeastEvery(Of T)(source As IObservable(Of T),
timeout As TimeSpan,
defaultValue As T,
scheduler As IScheduler
) As IObservable(Of T)
If source Is Nothing Then Throw New ArgumentNullException("source")
If scheduler Is Nothing Then Throw New ArgumentNullException("scheduler")
Return Observable.Create(
Function(observer As IObserver(Of T))
Dim id As ULong
Dim gate As New Object()
Dim timer As New SerialDisposable()
Dim lastValue As T = defaultValue
Dim createTimer As Action =
Sub()
Dim startId As ULong = id
timer.Disposable = scheduler.Schedule(timeout,
Sub(self As Action(Of TimeSpan))
Dim noChange As Boolean
SyncLock gate
noChange = (id = startId)
If noChange Then
observer.OnNext(lastValue)
End If
End SyncLock
'only restart if no change, otherwise
'the change restarted the timeout
If noChange Then self(timeout)
End Sub)
End Sub
'start the first timeout
createTimer()
'subscribe to the source observable
Dim subscription = source.Subscribe(
Sub(v)
SyncLock gate
id += 1UL
lastValue = v
End SyncLock
observer.OnNext(v)
createTimer() 'reset the timeout
End Sub,
Sub(ex)
SyncLock gate
id += 1UL
End SyncLock
observer.OnError(ex)
'do not reset the timeout, because the sequence has ended
End Sub,
Sub()
SyncLock gate
id += 1UL
End SyncLock
observer.OnCompleted()
'do not reset the timeout, because the sequence has ended
End Sub)
Return New CompositeDisposable(timer, subscription)
End Function)
End Function