问题 处理RxJava中的API异常


我目前正试图绕过RxJava,但我在处理服务调用异常方面遇到了一些麻烦。

基本上,我有一个(Retrofit)服务返回一个 Observable<ServiceResponse>ServiceResponse 定义如下:

public class ServiceResponse {
    private int status;
    private String message;
    private JsonElement data;

    public JsonElement getData() {
        return data;
    }

    public int getStatus() {
        return status;
    }

    public String getMessage() {
        return message;
    }
}

现在我想要的是将该通用响应映射到a List<Account> 包含在数据JsonElement字段中(我假设你不在乎什么 Account 对象看起来像,所以我不会用它污染帖子)。以下代码适用于成功案例,但我找不到处理API异常的好方法:

service.getAccounts()
       .subscribeOn(Schedulers.io())
       .observeOn(AndroidSchedulers.mainThread())
       .map(new Func1<ServiceResponse, AccountData>() {
               @Override
               public AccountData call(ServiceResponse serviceResponse) {

                   // TODO: ick. fix this. there must be a better way...
                   ResponseTypes responseType = ResponseTypes.from(serviceResponse.getStatus());
                   switch (responseType) {
                       case SUCCESS:
                           Gson gson = new GsonBuilder().create();
                           return gson.fromJson(serviceResponse.getData(), AccountData.class);
                       case HOST_UNAVAILABLE:
                           throw new HostUnavailableException(serviceResponse.getMessage());
                       case SUSPENDED_USER:
                           throw new SuspendedUserException(serviceResponse.getMessage());
                       case SYSTEM_ERROR:
                       case UNKNOWN:
                       default:
                           throw new SystemErrorException(serviceResponse.getMessage());
                   }
              }
        })
        .map(new Func1<AccountData, List<Account>>() {
                @Override
                public List<Account> call(AccountData accountData) {
                    Gson gson = new GsonBuilder().create();
                    List<Account> res = new ArrayList<Account>();
                    for (JsonElement account : accountData.getAccounts()) {
                        res.add(gson.fromJson(account, Account.class));
                    }
                    return res;
                }
        })
        .subscribe(accountsRequest);

有一个更好的方法吗?这个  工作,onError会激发给我的观察者,我会收到我扔的错误,但看起来我似乎并没有这样做。

提前致谢!

编辑:

让我澄清一下我想要实现的目标:

我希望有一个可以从UI调用的类(例如,Activity或Fragment,或其他)。该课程需要一个 Observer<List<Account>> 像这样的参数:

public Subscription loadAccounts(Observer<List<Account>> observer, boolean forceRefresh) {
    ...
}

当UI被分离/销毁/等时,该方法将返回可以取消订阅的订阅。

参数化观察者将处理onNext,以便在Accounts列表中传递成功的响应。 OnError会处理任何异常,但也会传递任何API异常(例如,如果响应状态!= 200,我们将创建一个Throwable并将其传递给onError)。理想情况下,我不想只是“抛出”异常,我想将它直接传递给Observer。这就是我看到的所有例子。

复杂的是我的Retrofit服务返回了一个 ServiceResponse 对象,所以我的观察者不能订阅它。我提出的最好的方法是在Observer周围创建一个Observer包装器,如下所示:

@Singleton
public class AccountsDatabase {

    private AccountsService service;

    private List<Account> accountsCache = null;
    private PublishSubject<ServiceResponse> accountsRequest = null;

    @Inject
    public AccountsDatabase(AccountsService service) {
        this.service = service;
    }

    public Subscription loadAccounts(Observer<List<Account>> observer, boolean forceRefresh) {

        ObserverWrapper observerWrapper = new ObserverWrapper(observer);

        if (accountsCache != null) {
            // We have a cached value. Emit it immediately.
            observer.onNext(accountsCache);
        }

        if (accountsRequest != null) {
            // There's an in-flight network request for this section already. Join it.
            return accountsRequest.subscribe(observerWrapper);
        }

        if (accountsCache != null && !forceRefresh) {
            // We had a cached value and don't want to force a refresh on the data. Just
            // return an empty subscription
            observer.onCompleted();
            return Subscriptions.empty();
        }

        accountsRequest = PublishSubject.create();

        accountsRequest.subscribe(new ObserverWrapper(new EndObserver<List<Account>>() {

            @Override
            public void onNext(List<Account> accounts) {
                accountsCache = accounts;
            }

            @Override
            public void onEnd() {
                accountsRequest = null;
            }
        }));

        Subscription subscription = accountsRequest.subscribe(observerWrapper);

        service.getAccounts()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(accountsRequest);

        return subscription;
    }

    static class ObserverWrapper implements Observer<ServiceResponse> {

        private Observer<List<Account>> observer;

        public ObserverWrapper(Observer<List<Account>> observer) {
            this.observer = observer;
        }

        @Override
        public void onCompleted() {
            observer.onCompleted();
        }

        @Override
        public void onError(Throwable e) {
            observer.onError(e);
        }

        @Override
        public void onNext(ServiceResponse serviceResponse) {
            ResponseTypes responseType = ResponseTypes.from(serviceResponse.getStatus());
            switch (responseType) {
                case SUCCESS:
                    Gson gson = new GsonBuilder().create();
                    AccountData accountData = gson.fromJson(serviceResponse.getData(), AccountData.class);
                    List<Account> res = new ArrayList<>();
                    for (JsonElement account : accountData.getAccounts()) {
                        res.add(gson.fromJson(account, Account.class));
                    }
                    observer.onNext(res);
                    observer.onCompleted();
                    break;
                default:
                    observer.onError(new ApiException(serviceResponse.getMessage(), responseType));
                    break;
            }
        }
    }
}

我仍然觉得我没有正确使用这个。我以前没有见过其他人使用ObserverWrapper。也许我不应该使用RxJava,尽管SoundCloud和Netflix的人员在他们的演示文稿中真的把它卖给了我,我非常渴望学习它。


5214
2018-06-16 12:10


起源



答案:


请阅读下面我添加了一个编辑。

使用RxJava在Action / Func / Observer中抛出是完全正确的。该异常将由框架传播到您的Observer。 如果你只限于调用onError,那么你就会扭曲自己来实现这一点。

据说这是一个建议,就是简单地删除这个包装器并添加一个简单的验证 service.getAccount ... Observables链中的操作。

我将使用与地图链接的doOnNext(new ValidateServiceResponseOrThrow)(新的MapValidResponseToAccountList)。这些是简单的类,它们实现了必要的代码,以使Observable链更具可读性。

这是使用我建议简化的loadAccount方法。

public Subscription loadAccounts(Observer<List<Account>> observer, boolean forceRefresh) {
    if (accountsCache != null) {
        // We have a cached value. Emit it immediately.
        observer.onNext(accountsCache);
    }

    if (accountsRequest != null) {
        // There's an in-flight network request for this section already. Join it.
        return accountsRequest.subscribe(observer);
    }

    if (accountsCache != null && !forceRefresh) {
        // We had a cached value and don't want to force a refresh on the data. Just
        // return an empty subscription
        observer.onCompleted();
        return Subscriptions.empty();
    }

    accountsRequest = PublishSubject.create();
    accountsRequest.subscribe(new EndObserver<List<Account>>() {

        @Override
        public void onNext(List<Account> accounts) {
            accountsCache = accounts;
        }

        @Override
        public void onEnd() {
            accountsRequest = null;
        }
    });

    Subscription subscription = accountsRequest.subscribe(observer);

    service.getAccounts()
            .doOnNext(new ValidateServiceResponseOrThrow())
            .map(new MapValidResponseToAccountList())
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(accountsRequest);

    return subscription;
}

private static class ValidateResponseOrThrow implements Action1<ServiceResponse> {
        @Override
        public void call(ServiceResponse response) {
            ResponseTypes responseType = ResponseTypes.from(serviceResponse.getStatus());
            if (responseType != SUCCESS)
                throw new ApiException(serviceResponse.getMessage(), responseType));
        }
    }

private static class MapValidResponseToAccountList implements Func1<ServiceResponse, List<Account>> {
    @Override
    public Message call(ServiceResponse response) {
        // add code here to map the ServiceResponse into the List<Accounts> as you've provided already
    }
}

编辑: 除非有人另有说法,否则我认为最好使用flatMap返回错误。 我过去曾抛出过Exceptions from Action,但我不相信这是推荐的方式。

如果使用flatMap,则会有一个更清晰的Exception堆栈。如果你从一个Action内部抛出异常堆栈 实际上会包含 rx.exceptions.OnErrorThrowable$OnNextValue 不理想的例外情况。

让我用flatMap演示上面的例子。

private static class ValidateServiceResponse implements rx.functions.Func1<ServiceResponse, Observable<ServiceResponse>> {
    @Override
    public Observable<ServiceResponse> call(ServiceResponse response) {
        ResponseTypes responseType = ResponseTypes.from(serviceResponse.getStatus());
        if (responseType != SUCCESS)
            return Observable.error(new ApiException(serviceResponse.getMessage(), responseType));
        return Observable.just(response);
    }
}

service.getAccounts()
    .flatMap(new ValidateServiceResponse())
    .map(new MapValidResponseToAccountList())
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(accountsRequest);

正如你所看到的,差异是微妙的。该 ValidateServiceResponse 现在实现了 Func1 代替 Action1 我们不再使用了 throw 关键词。我们用 Observable.error(new Throwable) 代替。我相信这与预期的Rx合约更合适。


13
2018-06-17 20:48



谢谢,这段代码肯定更具可读性:) - Bradley Campbell
它只是我,还是不可能从Action1调用中抛出异常。在你的例子中,你只是在调用new而不是throw。所以它不会被放在地板上并且onNext被调用而不是onError吗? - schwiz
@schwiz可以从Action内部抛出异常。为清楚起见,我添加了丢失的抛出语句。但话虽如此,我已经添加了一个编辑到我的帖子,因为我不再使用这种方法。我用flatMap。 - Miguel Lavigne
@MiguelLavigne我遇到的问题是异常必须扩展RuntimeException或者它不会编译。 - schwiz
当我尝试将ValidateServiceResponse()对象传递给flatmap时,我得到的是传递给flatmap方法的正确类型。“flatmap(Func1,Observable)无法应用于(ValidateserviceResponse()) - Brandon


答案:


请阅读下面我添加了一个编辑。

使用RxJava在Action / Func / Observer中抛出是完全正确的。该异常将由框架传播到您的Observer。 如果你只限于调用onError,那么你就会扭曲自己来实现这一点。

据说这是一个建议,就是简单地删除这个包装器并添加一个简单的验证 service.getAccount ... Observables链中的操作。

我将使用与地图链接的doOnNext(new ValidateServiceResponseOrThrow)(新的MapValidResponseToAccountList)。这些是简单的类,它们实现了必要的代码,以使Observable链更具可读性。

这是使用我建议简化的loadAccount方法。

public Subscription loadAccounts(Observer<List<Account>> observer, boolean forceRefresh) {
    if (accountsCache != null) {
        // We have a cached value. Emit it immediately.
        observer.onNext(accountsCache);
    }

    if (accountsRequest != null) {
        // There's an in-flight network request for this section already. Join it.
        return accountsRequest.subscribe(observer);
    }

    if (accountsCache != null && !forceRefresh) {
        // We had a cached value and don't want to force a refresh on the data. Just
        // return an empty subscription
        observer.onCompleted();
        return Subscriptions.empty();
    }

    accountsRequest = PublishSubject.create();
    accountsRequest.subscribe(new EndObserver<List<Account>>() {

        @Override
        public void onNext(List<Account> accounts) {
            accountsCache = accounts;
        }

        @Override
        public void onEnd() {
            accountsRequest = null;
        }
    });

    Subscription subscription = accountsRequest.subscribe(observer);

    service.getAccounts()
            .doOnNext(new ValidateServiceResponseOrThrow())
            .map(new MapValidResponseToAccountList())
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(accountsRequest);

    return subscription;
}

private static class ValidateResponseOrThrow implements Action1<ServiceResponse> {
        @Override
        public void call(ServiceResponse response) {
            ResponseTypes responseType = ResponseTypes.from(serviceResponse.getStatus());
            if (responseType != SUCCESS)
                throw new ApiException(serviceResponse.getMessage(), responseType));
        }
    }

private static class MapValidResponseToAccountList implements Func1<ServiceResponse, List<Account>> {
    @Override
    public Message call(ServiceResponse response) {
        // add code here to map the ServiceResponse into the List<Accounts> as you've provided already
    }
}

编辑: 除非有人另有说法,否则我认为最好使用flatMap返回错误。 我过去曾抛出过Exceptions from Action,但我不相信这是推荐的方式。

如果使用flatMap,则会有一个更清晰的Exception堆栈。如果你从一个Action内部抛出异常堆栈 实际上会包含 rx.exceptions.OnErrorThrowable$OnNextValue 不理想的例外情况。

让我用flatMap演示上面的例子。

private static class ValidateServiceResponse implements rx.functions.Func1<ServiceResponse, Observable<ServiceResponse>> {
    @Override
    public Observable<ServiceResponse> call(ServiceResponse response) {
        ResponseTypes responseType = ResponseTypes.from(serviceResponse.getStatus());
        if (responseType != SUCCESS)
            return Observable.error(new ApiException(serviceResponse.getMessage(), responseType));
        return Observable.just(response);
    }
}

service.getAccounts()
    .flatMap(new ValidateServiceResponse())
    .map(new MapValidResponseToAccountList())
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(accountsRequest);

正如你所看到的,差异是微妙的。该 ValidateServiceResponse 现在实现了 Func1 代替 Action1 我们不再使用了 throw 关键词。我们用 Observable.error(new Throwable) 代替。我相信这与预期的Rx合约更合适。


13
2018-06-17 20:48



谢谢,这段代码肯定更具可读性:) - Bradley Campbell
它只是我,还是不可能从Action1调用中抛出异常。在你的例子中,你只是在调用new而不是throw。所以它不会被放在地板上并且onNext被调用而不是onError吗? - schwiz
@schwiz可以从Action内部抛出异常。为清楚起见,我添加了丢失的抛出语句。但话虽如此,我已经添加了一个编辑到我的帖子,因为我不再使用这种方法。我用flatMap。 - Miguel Lavigne
@MiguelLavigne我遇到的问题是异常必须扩展RuntimeException或者它不会编译。 - schwiz
当我尝试将ValidateServiceResponse()对象传递给flatmap时,我得到的是传递给flatmap方法的正确类型。“flatmap(Func1,Observable)无法应用于(ValidateserviceResponse()) - Brandon


你可以阅读这篇关于错误处理的好文章 http://blog.danlew.net/2015/12/08/error-handling-in-rxjava/


0
2017-09-16 21:20