问题 可观察,仅在完成时重试错误和缓存


我们可以使用cache()运算符来避免多次执行长任务(http请求),并重用其结果:

Observable apiCall = createApiCallObservable().cache(); // notice the .cache()

---------------------------------------------
// the first time we need it
apiCall.andSomeOtherStuff()
               .subscribe(subscriberA);

---------------------------------------------
//in the future when we need it again
apiCall.andSomeDifferentStuff()
               .subscribe(subscriberB);

第一次,http请求被执行,但第二次,因为我们使用了 缓存() 运算符,请求将不会被执行,但我们将能够重用第一个结果。

第一个请求成功完成时,此工作正常。但是如果在第一次尝试中调用了onError,那么下次新订阅者订阅同一个observable时,将再次调用onError而不再尝试http请求。

我们要做的是,如果第一次调用onError,那么下次有人订阅同一个observable时,将从头开始尝试http请求。即,observable只会缓存成功的api调用,即调用onCompleted的调用。

关于如何进行的任何想法?我们尝试使用retry()和cache()运算符没有太多运气。


3702
2017-10-18 08:13


起源



答案:


好吧,对于任何仍然感兴趣的人,我认为我有一个更好的方法来实现它与rx。

关键注意事项是使用onErrorResumeNext,它可以让你在出错时替换Observable。 所以看起来应该是这样的:

Observable<Object> apiCall = createApiCallObservable().cache(1);
//future call
apiCall.onErrorResumeNext(new Func1<Throwable, Observable<? extends Object>>() {
    public Observable<? extends Object> call(Throwable throwable) {
        return  createApiCallObservable();
        }
    });

这样,如果第一个呼叫失败,未来的呼叫将只召回它(仅一次)。

但是每个尝试使用第一个observable的调用者都会失败并提出不同的请求。

你引用了原始的observable,让我们更新它。

所以,一个懒惰的吸气鬼:

Observable<Object> apiCall;
private Observable<Object> getCachedApiCall() {
    if ( apiCall == null){
        apiCall = createApiCallObservable().cache(1);
    }
    return apiCall;
}

现在,如果前一个失败,将重试的getter:

private Observable<Object> getRetryableCachedApiCall() {
    return getCachedApiCall().onErrorResumeNext(new Func1<Throwable, Observable<? extends Object>>() {
        public Observable<? extends Object> call(Throwable throwable) {
            apiCall = null;
            return getCachedApiCall();
        }
    });
}

请注意,它每次调用时只会重试一次。

所以现在你的代码看起来像这样:

---------------------------------------------
// the first time we need it - this will be without a retry if you want..
getCachedApiCall().andSomeOtherStuff()
               .subscribe(subscriberA);

---------------------------------------------
//in the future when we need it again - for any other call so we will have a retry
getRetryableCachedApiCall().andSomeDifferentStuff()
               .subscribe(subscriberB);

7
2018-04-17 09:55





这是我们在扩展akarnokd的解决方案后最终得到的解决方案:

public class OnErrorRetryCache<T> {

    public static <T> Observable<T> from(Observable<T> source) {
         return new OnErrorRetryCache<>(source).deferred;
    }

    private final Observable<T> deferred;
    private final Semaphore singlePermit = new Semaphore(1);

    private Observable<T> cache = null;
    private Observable<T> inProgress = null;

    private OnErrorRetryCache(Observable<T> source) {
        deferred = Observable.defer(() -> createWhenObserverSubscribes(source));
    }

    private Observable<T> createWhenObserverSubscribes(Observable<T> source) 
    {
        singlePermit.acquireUninterruptibly();

        Observable<T> cached = cache;
        if (cached != null) {
            singlePermit.release();
            return cached;
        }

        inProgress = source
                .doOnCompleted(this::onSuccess)
                .doOnTerminate(this::onTermination)
                .replay()
                .autoConnect();

        return inProgress;
    }

    private void onSuccess() {
        cache = inProgress;
    }

    private void onTermination() {
        inProgress = null;
        singlePermit.release();
    }
}

我们需要缓存来自Retrofit的http请求的结果。所以这是创建的,其中一个可观察的内容会记住一个项目。

如果观察者在执行http请求时订阅,我们希望它等待并且不执行请求两次,除非正在进行的请求失败。为此,信号量允许单个访问创建或返回高速缓存的observable的块,如果创建了新的observable,我们将等待,直到该终止。可以找到上述测试 这里


5
2017-10-18 23:24





你必须做一些状态处理。我是这样做的:

public class CachedRetry {

    public static final class OnErrorRetryCache<T> {
        final AtomicReference<Observable<T>> cached = 
                new AtomicReference<>();

        final Observable<T> result;

        public OnErrorRetryCache(Observable<T> source) {
            result = Observable.defer(() -> {
                for (;;) {
                    Observable<T> conn = cached.get();
                    if (conn != null) {
                        return conn;
                    }
                    Observable<T> next = source
                            .doOnError(e -> cached.set(null))
                            .replay()
                            .autoConnect();

                    if (cached.compareAndSet(null, next)) {
                        return next;
                    }
                }
            });
        }

        public Observable<T> get() {
            return result;
        }
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        Observable<Integer> source = Observable
                .just(1)
                .doOnSubscribe(() -> 
                    System.out.println("Subscriptions: " + (1 + calls.get())))
                .flatMap(v -> {
                    if (calls.getAndIncrement() == 0) {
                        return Observable.error(new RuntimeException());
                    }
                    return Observable.just(42);
                });

        Observable<Integer> o = new OnErrorRetryCache<>(source).get();

        o.subscribe(System.out::println, 
                Throwable::printStackTrace, 
                () -> System.out.println("Done"));

        o.subscribe(System.out::println, 
                Throwable::printStackTrace, 
                () -> System.out.println("Done"));

        o.subscribe(System.out::println, 
                Throwable::printStackTrace, 
                () -> System.out.println("Done"));
    }
}

它通过缓存一个完全成功的源并将其返回给每个人来工作。否则,(部分)失败的源将破坏缓存,下一个调用观察器将触发重新订阅。


4
2017-10-18 08:35



谢谢akarnokd,这看起来不错。当源是长时间运行的http请求(几秒)时,我只会遇到一些问题,而第二个,第三个订阅者订阅时,第一个订阅者仍在进行中。在这种情况下,它们都会失败,并且不会多次尝试该请求。 cache()运算符的行为不同。我会更多地看一下,并尝试使用你的例子复制我提到的问题,并尽快回复你。 - Plato


答案:


好吧,对于任何仍然感兴趣的人,我认为我有一个更好的方法来实现它与rx。

关键注意事项是使用onErrorResumeNext,它可以让你在出错时替换Observable。 所以看起来应该是这样的:

Observable<Object> apiCall = createApiCallObservable().cache(1);
//future call
apiCall.onErrorResumeNext(new Func1<Throwable, Observable<? extends Object>>() {
    public Observable<? extends Object> call(Throwable throwable) {
        return  createApiCallObservable();
        }
    });

这样,如果第一个呼叫失败,未来的呼叫将只召回它(仅一次)。

但是每个尝试使用第一个observable的调用者都会失败并提出不同的请求。

你引用了原始的observable,让我们更新它。

所以,一个懒惰的吸气鬼:

Observable<Object> apiCall;
private Observable<Object> getCachedApiCall() {
    if ( apiCall == null){
        apiCall = createApiCallObservable().cache(1);
    }
    return apiCall;
}

现在,如果前一个失败,将重试的getter:

private Observable<Object> getRetryableCachedApiCall() {
    return getCachedApiCall().onErrorResumeNext(new Func1<Throwable, Observable<? extends Object>>() {
        public Observable<? extends Object> call(Throwable throwable) {
            apiCall = null;
            return getCachedApiCall();
        }
    });
}

请注意,它每次调用时只会重试一次。

所以现在你的代码看起来像这样:

---------------------------------------------
// the first time we need it - this will be without a retry if you want..
getCachedApiCall().andSomeOtherStuff()
               .subscribe(subscriberA);

---------------------------------------------
//in the future when we need it again - for any other call so we will have a retry
getRetryableCachedApiCall().andSomeDifferentStuff()
               .subscribe(subscriberB);

7
2018-04-17 09:55





这是我们在扩展akarnokd的解决方案后最终得到的解决方案:

public class OnErrorRetryCache<T> {

    public static <T> Observable<T> from(Observable<T> source) {
         return new OnErrorRetryCache<>(source).deferred;
    }

    private final Observable<T> deferred;
    private final Semaphore singlePermit = new Semaphore(1);

    private Observable<T> cache = null;
    private Observable<T> inProgress = null;

    private OnErrorRetryCache(Observable<T> source) {
        deferred = Observable.defer(() -> createWhenObserverSubscribes(source));
    }

    private Observable<T> createWhenObserverSubscribes(Observable<T> source) 
    {
        singlePermit.acquireUninterruptibly();

        Observable<T> cached = cache;
        if (cached != null) {
            singlePermit.release();
            return cached;
        }

        inProgress = source
                .doOnCompleted(this::onSuccess)
                .doOnTerminate(this::onTermination)
                .replay()
                .autoConnect();

        return inProgress;
    }

    private void onSuccess() {
        cache = inProgress;
    }

    private void onTermination() {
        inProgress = null;
        singlePermit.release();
    }
}

我们需要缓存来自Retrofit的http请求的结果。所以这是创建的,其中一个可观察的内容会记住一个项目。

如果观察者在执行http请求时订阅,我们希望它等待并且不执行请求两次,除非正在进行的请求失败。为此,信号量允许单个访问创建或返回高速缓存的observable的块,如果创建了新的observable,我们将等待,直到该终止。可以找到上述测试 这里


5
2017-10-18 23:24





你必须做一些状态处理。我是这样做的:

public class CachedRetry {

    public static final class OnErrorRetryCache<T> {
        final AtomicReference<Observable<T>> cached = 
                new AtomicReference<>();

        final Observable<T> result;

        public OnErrorRetryCache(Observable<T> source) {
            result = Observable.defer(() -> {
                for (;;) {
                    Observable<T> conn = cached.get();
                    if (conn != null) {
                        return conn;
                    }
                    Observable<T> next = source
                            .doOnError(e -> cached.set(null))
                            .replay()
                            .autoConnect();

                    if (cached.compareAndSet(null, next)) {
                        return next;
                    }
                }
            });
        }

        public Observable<T> get() {
            return result;
        }
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        Observable<Integer> source = Observable
                .just(1)
                .doOnSubscribe(() -> 
                    System.out.println("Subscriptions: " + (1 + calls.get())))
                .flatMap(v -> {
                    if (calls.getAndIncrement() == 0) {
                        return Observable.error(new RuntimeException());
                    }
                    return Observable.just(42);
                });

        Observable<Integer> o = new OnErrorRetryCache<>(source).get();

        o.subscribe(System.out::println, 
                Throwable::printStackTrace, 
                () -> System.out.println("Done"));

        o.subscribe(System.out::println, 
                Throwable::printStackTrace, 
                () -> System.out.println("Done"));

        o.subscribe(System.out::println, 
                Throwable::printStackTrace, 
                () -> System.out.println("Done"));
    }
}

它通过缓存一个完全成功的源并将其返回给每个人来工作。否则,(部分)失败的源将破坏缓存,下一个调用观察器将触发重新订阅。


4
2017-10-18 08:35



谢谢akarnokd,这看起来不错。当源是长时间运行的http请求(几秒)时,我只会遇到一些问题,而第二个,第三个订阅者订阅时,第一个订阅者仍在进行中。在这种情况下,它们都会失败,并且不会多次尝试该请求。 cache()运算符的行为不同。我会更多地看一下,并尝试使用你的例子复制我提到的问题,并尽快回复你。 - Plato


您是否考虑过使用AsyncSubject为网络请求实现缓存?我做了一个示例应用程序 RxApp 测试它是如何工作的。我使用单例模型来获取网络响应。这使得缓存响应,从多个片段访问数据,订阅待处理请求以及为自动UI测试提供模拟数据成为可能。


0
2017-11-02 19:27



如果我们使用AsyncSubject,并且第一个http请求失败,所有将来的订阅者都会收到错误通知,而不是再次尝试http调用,对吧? - Plato
这是正确的,但模型应提供例如通过向用户显示错误,UI可以在处理错误后调用的reset()方法。 - pmellaaho