问题 当没有值传入时,是否有Rx方法定期重复上一个值?


我遇到的一个用例,我怀疑我不能成为唯一一个用例,如下所示:

IObservable<T> Observable.RepeatLastValueDuringSilence(this IObservable<T> inner, TimeSpan maxQuietPeriod);

它将返回内部observable中的所有未来项,但是,如果内部observable在一段时间内没有调用OnNext(maxQuietPeriod),它只重复最后一个值(当然内部调用OnCompleted或OnError) 。

理由是服务定期ping出定期状态更新。例如:

var myStatus = Observable.FromEvent(
    h=>this.StatusUpdate+=h,
    h=>this.StatusUpdate-=h);

var messageBusStatusPinger = myStatus
    .RepeatLastValueDuringSilence(TimeSpan.FromSeconds(1))
    .Subscribe(update => _messageBus.Send(update));

这样的事情存在吗?还是我过度估计它的用处?

谢谢, 亚历克斯

PS:我为任何不正确的术语/语法道歉,因为我只是第一次探索Rx。


3512
2017-07-12 12:06


起源

它应该以什么频率重复? - Asti
@Asti:意图是这应该由maxQuietPeriod指定。如果内部序列没有为maxQuietPeriod生成值,则应生成重复值。 - AlexC


答案:


类似于Matthew的解决方案,但是这里定时器在源中接收到每个元素之后开始,我认为这更正确(但差异不太重要):

public static IObservable<T> RepeatLastValueDuringSilence<T>(this IObservable<T> inner, TimeSpan maxQuietPeriod)
{    
    return inner.Select(x => 
        Observable.Interval(maxQuietPeriod)
                  .Select(_ => x)
                  .StartWith(x)
    ).Switch();
}

而且测试:

var source = Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(5).Select(_ => "1")
                       .Concat(Observable.Interval(TimeSpan.FromSeconds(1)).Take(5).Select(_ => "2"))
                       .Concat(Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(5).Select(_ => "3"));

source.RepeatLastValueDuringSilence(TimeSpan.FromMilliseconds(200)).Subscribe(Console.WriteLine);

你应该看到 1 打印10次(5次来源,5次沉默重复),然后很多 2 当你从源头获得一个,从每个之间的沉默中获得4个,然后是无限的 3


5
2017-07-13 05:18



这看起来像我追求的,谢谢。 - AlexC


这个相当简单的查询可以完成工作:

var query =
    source
        .Select(s =>
            Observable
                .Interval(TimeSpan.FromSeconds(1.0))
                .StartWith(s)
                .Select(x => s))
        .Switch();

永远不要低估它的力量 .Switch()


5
2017-07-13 09:41



尼斯。我删除了我的 TakeUntil 并改变了 Concat 至 Switch 也。 - yamen


我认为这可以做你想要的,如果你的观察不热,你需要 Publish 和 Refcount 它:

public static IObservable<T> RepeatLastValueDuringSilence<T>(this IObservable<T> inner, TimeSpan maxQuietPeriod)
{
    var throttled = inner.Throttle(maxQuietPeriod);
    var repeating = throttled.SelectMany(i => 
        Observable
            .Interval(maxQuietPeriod)
            .Select(_ => i)
            .TakeUntil(inner));
    return Observable.Merge(inner, throttled, repeating);
}

1
2017-07-13 00:55



应该注意,为了使这个工作,观察者必须是热的 - 也许在前面加上这个: var published = inner.Publish().RefCount(); - yamen
干杯,我已经编辑过了 - Matthew Finlay


Rx库中没有方法,但我也需要这样的方法。在我的用例中,即使源没有输出任何值,我也需要输出值。如果您不希望在第一个源值出现之前输出任何值,则可以删除 defaultValue 参数和调用 createTimer() 在订阅呼叫之前。

需要调度程序来运行计时器。一个明显的重载是不需要调度程序并选择默认调度程序(我使用ThreadPool调度程序)。

Imports System.Reactive
Imports System.Reactive.Concurrency
Imports System.Reactive.Disposables
Imports System.Reactive.Linq

<Extension()>
Public Function AtLeastEvery(Of T)(source As IObservable(Of T), 
                                   timeout As TimeSpan, 
                                   defaultValue As T, 
                                   scheduler As IScheduler
                                  ) As IObservable(Of T)
    If source Is Nothing Then Throw New ArgumentNullException("source")
    If scheduler Is Nothing Then Throw New ArgumentNullException("scheduler")
    Return Observable.Create(
        Function(observer As IObserver(Of T))
            Dim id As ULong
            Dim gate As New Object()
            Dim timer As New SerialDisposable()
            Dim lastValue As T = defaultValue

            Dim createTimer As Action =
                Sub()
                    Dim startId As ULong = id
                    timer.Disposable = scheduler.Schedule(timeout,
                                           Sub(self As Action(Of TimeSpan))
                                               Dim noChange As Boolean
                                               SyncLock gate
                                                   noChange = (id = startId)
                                                   If noChange Then
                                                       observer.OnNext(lastValue)
                                                   End If
                                               End SyncLock
                                               'only restart if no change, otherwise
                                               'the change restarted the timeout
                                               If noChange Then self(timeout)
                                           End Sub)
                End Sub
            'start the first timeout
            createTimer()
            'subscribe to the source observable
            Dim subscription = source.Subscribe(
                Sub(v)
                    SyncLock gate
                        id += 1UL
                        lastValue = v
                    End SyncLock
                    observer.OnNext(v)
                    createTimer() 'reset the timeout
                End Sub,
                Sub(ex)
                    SyncLock gate
                        id += 1UL
                    End SyncLock
                    observer.OnError(ex)
                    'do not reset the timeout, because the sequence has ended
                End Sub,
                Sub()
                    SyncLock gate
                        id += 1UL
                    End SyncLock
                    observer.OnCompleted()
                    'do not reset the timeout, because the sequence has ended
                End Sub)

            Return New CompositeDisposable(timer, subscription)
        End Function)
End Function

-1
2017-07-12 17:32