问题 使用Rx处理可观察序列中的错误


如果发生错误,有没有办法让一个可观察的序列继续执行序列中的下一个元素? 从 这个帖子  看起来你需要在Catch()中指定一个新的可观察序列来恢复执行,但是如果你需要继续处理序列中的下一个元素呢?有没有办法实现这个目标?

更新: 方案如下: 我有一堆我需要处理的元素。处理由一系列步骤组成。我有 将这些步骤分解为我想要编写的任务。 我遵循了ToObservable()发布的指南 这里 按任务转换为可观察的构图。 所以我基本上都是这样做的 -

foreach(element in collection)
{
   var result = from aResult in DoAAsync(element).ToObservable() 
         from bResult in DoBAsync(aResult).ToObservable() 
         from cResult in DoCAsync(bResult).ToObservable() 
         select cResult;
   result.subscribe( register on next and error handlers here)
 }

或者我可以这样:

var result = 
        from element in collection.ToObservable() 
        from aResult in DoAAsync(element).ToObservable() 
         from bResult in DoBAsync(aResult).ToObservable() 
         from cResult in DoCAsync(bResult).ToObservable() 
         select cResult;

这里继续处理其他元素的最佳方法是什么,即使让我们说处理 其中一个元素抛出异常。我希望能够记录错误并理想地继续前进。


7903
2018-05-19 01:19


起源



答案:


詹姆斯和理查德都提出了一些好处,但我认为他们没有给你最好的方法来解决你的问题。

詹姆斯建议使用 .Catch(Observable.Never<Unit>())。当他说“将......允许流继续”时他错了,因为一旦你遇到异常,流就必须结束 - 这就是理查德在提到观察者与观察者之间的契约时所指出的。

另外,使用 Never 这样会导致你的观察者永远不会完成。

简短的回答是 .Catch(Observable.Empty<Unit>()) 是将序列从以错误结束的序列更改为以完成结束的序列的正确方法。

你已经找到了使用正确的想法 SelectMany 处理源集合的每个值,以便您可以捕获每个异常,但是您会遇到一些问题。

您正在使用任务(TPL)将函数调用转换为可观察对象。这会强制您的observable使用任务池线程,这意味着 SelectMany 声明可能会以非确定性顺序产生值。

您还隐藏了实际调用以处理数据,从而使重构和维护变得更加困难。

我认为你最好创建一个允许跳过异常的扩展方法。这里是:

public static IObservable<R> SelectAndSkipOnException<T, R>(
    this IObservable<T> source, Func<T, R> selector)
{
    return
        source
            .Select(t =>
                Observable.Start(() => selector(t)).Catch(Observable.Empty<R>()))
            .Merge();
}

使用此方法,您现在可以简单地执行此操作:

var result =
    collection.ToObservable()
        .SelectAndSkipOnException(t =>
        {
            var a = DoA(t);
            var b = DoB(a);
            var c = DoC(b);
            return c;
        });

此代码更简单,但它隐藏了异常。如果你想在继续你的序列的同时坚持异常,那么你需要做一些额外的乐趣。添加几个重载 Materialize 扩展方法用于保持错误。

public static IObservable<Notification<R>> Materialize<T, R>(
    this IObservable<T> source, Func<T, R> selector)
{
    return source.Select(t => Notification.CreateOnNext(t)).Materialize(selector);
}

public static IObservable<Notification<R>> Materialize<T, R>(
    this IObservable<Notification<T>> source, Func<T, R> selector)
{
    Func<Notification<T>, Notification<R>> f = nt =>
    {
        if (nt.Kind == NotificationKind.OnNext)
        {
            try
            {
                return Notification.CreateOnNext<R>(selector(nt.Value));
            }
            catch (Exception ex)
            {
                ex.Data["Value"] = nt.Value;
                ex.Data["Selector"] = selector;
                return Notification.CreateOnError<R>(ex);
            }
        }
        else
        {
            if (nt.Kind == NotificationKind.OnError)
            {
                return Notification.CreateOnError<R>(nt.Exception);
            }
            else
            {
                return Notification.CreateOnCompleted<R>();
            }
        }
    };
    return source.Select(nt => f(nt));
}

这些方法允许你写这个:

var result =
    collection
        .ToObservable()
        .Materialize(t =>
        {
            var a = DoA(t);
            var b = DoB(a);
            var c = DoC(b);
            return c;
        })
        .Do(nt =>
        {
            if (nt.Kind == NotificationKind.OnError)
            {
                /* Process the error in `nt.Exception` */
            }
        })
        .Where(nt => nt.Kind != NotificationKind.OnError)
        .Dematerialize();

你甚至可以链接这些 Materialize 方法和用途 ex.Data["Value"] & ex.Data["Selector"] 获取抛出错误的值和选择器函数。

我希望这有帮助。


12
2017-07-26 04:52



在尝试使用可观察的可观察量时,我遇到了类似的问题。当内部observable抛出OnError时,outter observable查看它也会移动到OnError - 导致一切都关闭。我已经尝试过捕获异常并抛出OnCompleted的解决方案,但这会产生与OnCompleted和OnError完全相同的行为,这两种情况都会导致订阅关闭 - letstango


答案:


詹姆斯和理查德都提出了一些好处,但我认为他们没有给你最好的方法来解决你的问题。

詹姆斯建议使用 .Catch(Observable.Never<Unit>())。当他说“将......允许流继续”时他错了,因为一旦你遇到异常,流就必须结束 - 这就是理查德在提到观察者与观察者之间的契约时所指出的。

另外,使用 Never 这样会导致你的观察者永远不会完成。

简短的回答是 .Catch(Observable.Empty<Unit>()) 是将序列从以错误结束的序列更改为以完成结束的序列的正确方法。

你已经找到了使用正确的想法 SelectMany 处理源集合的每个值,以便您可以捕获每个异常,但是您会遇到一些问题。

您正在使用任务(TPL)将函数调用转换为可观察对象。这会强制您的observable使用任务池线程,这意味着 SelectMany 声明可能会以非确定性顺序产生值。

您还隐藏了实际调用以处理数据,从而使重构和维护变得更加困难。

我认为你最好创建一个允许跳过异常的扩展方法。这里是:

public static IObservable<R> SelectAndSkipOnException<T, R>(
    this IObservable<T> source, Func<T, R> selector)
{
    return
        source
            .Select(t =>
                Observable.Start(() => selector(t)).Catch(Observable.Empty<R>()))
            .Merge();
}

使用此方法,您现在可以简单地执行此操作:

var result =
    collection.ToObservable()
        .SelectAndSkipOnException(t =>
        {
            var a = DoA(t);
            var b = DoB(a);
            var c = DoC(b);
            return c;
        });

此代码更简单,但它隐藏了异常。如果你想在继续你的序列的同时坚持异常,那么你需要做一些额外的乐趣。添加几个重载 Materialize 扩展方法用于保持错误。

public static IObservable<Notification<R>> Materialize<T, R>(
    this IObservable<T> source, Func<T, R> selector)
{
    return source.Select(t => Notification.CreateOnNext(t)).Materialize(selector);
}

public static IObservable<Notification<R>> Materialize<T, R>(
    this IObservable<Notification<T>> source, Func<T, R> selector)
{
    Func<Notification<T>, Notification<R>> f = nt =>
    {
        if (nt.Kind == NotificationKind.OnNext)
        {
            try
            {
                return Notification.CreateOnNext<R>(selector(nt.Value));
            }
            catch (Exception ex)
            {
                ex.Data["Value"] = nt.Value;
                ex.Data["Selector"] = selector;
                return Notification.CreateOnError<R>(ex);
            }
        }
        else
        {
            if (nt.Kind == NotificationKind.OnError)
            {
                return Notification.CreateOnError<R>(nt.Exception);
            }
            else
            {
                return Notification.CreateOnCompleted<R>();
            }
        }
    };
    return source.Select(nt => f(nt));
}

这些方法允许你写这个:

var result =
    collection
        .ToObservable()
        .Materialize(t =>
        {
            var a = DoA(t);
            var b = DoB(a);
            var c = DoC(b);
            return c;
        })
        .Do(nt =>
        {
            if (nt.Kind == NotificationKind.OnError)
            {
                /* Process the error in `nt.Exception` */
            }
        })
        .Where(nt => nt.Kind != NotificationKind.OnError)
        .Dematerialize();

你甚至可以链接这些 Materialize 方法和用途 ex.Data["Value"] & ex.Data["Selector"] 获取抛出错误的值和选择器函数。

我希望这有帮助。


12
2017-07-26 04:52



在尝试使用可观察的可观察量时,我遇到了类似的问题。当内部observable抛出OnError时,outter observable查看它也会移动到OnError - 导致一切都关闭。我已经尝试过捕获异常并抛出OnCompleted的解决方案,但这会产生与OnCompleted和OnError完全相同的行为,这两种情况都会导致订阅关闭 - letstango


之间的合同 IObservable 和 IObserver 是 OnNext*(OnCompelted|OnError)? 所有运营商都支持,即使不是来源。

您唯一的选择是使用重新订阅源 Retry,但如果源返回 IObservable   对于每个描述,您都不会看到任何新值。

您能提供有关您的方案的更多信息吗?也许还有另一种方式来看待它。

编辑: 根据您的最新反馈,听起来您只需要 Catch

var result = 
    from element in collection.ToObservable() 
    from aResult in DoAAsync(element).ToObservable().Log().Catch(Observable.Empty<TA>())
    from bResult in DoBAsync(aResult).ToObservable().Log().Catch(Observable.Empty<TB>()) 
    from cResult in DoCAsync(bResult).ToObservable().Log().Catch(Observable.Empty<TC>())
    select cResult;

这用一个替换错误 Empty 这不会触发下一个序列(因为它使用 SelectMany 在引擎盖下。


1
2018-05-19 07:39



我已经更新了帖子以包含我想要完成的场景 - Abhijeet Patel