问题 RxJava组合请求序列
问题
我有两个Apis。 Api 1给了我一个项目列表,Api 2给了我更多关于我从Api获得的每个项目的详细信息。到目前为止,我解决它的方式导致了糟糕的性能。
问题
在Retrofit和RxJava的帮助下,快速,快速地解决了这个问题。
我的方法
在片刻我的解决方案看起来像这样:
第1步:执行改造 Single<ArrayList<Information>>
来自Api 1。
第2步:我遍历这些项目并向Api 2发出请求。
第3步:改造返回按顺序执行 Single<ExtendedInformation>
对于
每一个项目
步骤4:在完成Api 2的所有调用完成后,我为包含信息和扩展信息的所有项创建一个新对象。
我的代码
public void addExtendedInformations(final Information[] informations) {
final ArrayList<InformationDetail> informationDetailArrayList = new ArrayList<>();
final JSONRequestRatingHelper.RatingRequestListener ratingRequestListener = new JSONRequestRatingHelper.RatingRequestListener() {
@Override
public void onDownloadFinished(Information baseInformation, ExtendedInformation extendedInformation) {
informationDetailArrayList.add(new InformationDetail(baseInformation, extendedInformation));
if (informationDetailArrayList.size() >= informations.length){
listener.onAllExtendedInformationLoadedAndCombined(informationDetailArrayList);
}
}
};
for (Information information : informations) {
getExtendedInformation(ratingRequestListener, information);
}
}
public void getRatingsByTitle(final JSONRequestRatingHelper.RatingRequestListener ratingRequestListener, final Information information) {
Single<ExtendedInformation> repos = service.findForTitle(information.title);
disposable.add(repos.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribeWith(new DisposableSingleObserver<ExtendedInformation>() {
@Override
public void onSuccess(ExtendedInformation extendedInformation) {
ratingRequestListener.onDownloadFinished(information, extendedInformation);
}
@Override
public void onError(Throwable e) {
ExtendedInformation extendedInformation = new ExtendedInformation();
ratingRequestListener.onDownloadFinished(extendedInformation, information);
}
}));
}
public interface RatingRequestListener {
void onDownloadFinished(Information information, ExtendedInformation extendedInformation);
}
6696
2017-12-04 20:39
起源
答案:
TL;博士 使用 concatMapEager
要么 flatMap
并异步或在调度程序上执行子调用。
很长的故事
我不是Android开发人员,所以我的问题将仅限于纯RxJava(版本1和版本2)。
如果我得到正确的图片,所需的流程是:
some query param
\--> Execute query on API_1 -> list of items
|-> Execute query for item 1 on API_2 -> extended info of item1
|-> Execute query for item 2 on API_2 -> extended info of item1
|-> Execute query for item 3 on API_2 -> extended info of item1
...
\-> Execute query for item n on API_2 -> extended info of item1
\----------------------------------------------------------------------/
|
\--> stream (or list) of extended item info for the query param
假设Retrofit为客户生成了
interface Api1 {
@GET("/api1") Observable<List<Item>> items(@Query("param") String param);
}
interface Api2 {
@GET("/api2/{item_id}") Observable<ItemExtended> extendedInfo(@Path("item_id") String item_id);
}
如果项目的顺序不重要,则可以使用 flatMap
只要:
api1.items(queryParam)
.flatMap(itemList -> Observable.fromIterable(itemList)))
.flatMap(item -> api2.extendedInfo(item.id()))
.subscribe(...)
但 除非 改造生成器配置有
使用异步适配器(调用将在okhttp内部执行程序中排队)。我个人认为这不是一个好主意,因为你无法控制这个执行者。
.addCallAdapterFactory(RxJava2CallAdapterFactory.createAsync()
或者使用基于调度程序的适配器(将在RxJava调度程序上调度调用)。这是我的首选方案,因为您明确选择使用哪个调度程序,它很可能是IO调度程序,但您可以自由尝试不同的调度程序。
.addCallAdapterFactory(RxJava2CallAdapterFactory.createWithScheduler(Schedulers.io()))
原因是 flatMap
将订阅由创建的每个observable api2.extendedInfo(...)
并将它们合并到生成的observable中。因此,结果将按接收顺序显示。
如果 改造客户 不是 设置为异步或设置为在调度程序上运行,可以设置一个:
api1.items(queryParam)
.flatMap(itemList -> Observable.fromIterable(itemList)))
.flatMap(item -> api2.extendedInfo(item.id()).subscribeOn(Schedulers.io()))
.subscribe(...)
这个结构几乎与前面提到的一个execpts相同 本地 每个调度程序 api2.extendedInfo
应该运行。
可以调整 maxConcurrency
的参数 flatMap
控制您想要同时执行的请求数。虽然我对此要谨慎,但您不希望同时运行所有查询。通常是默认值 maxConcurrency
够好了(128
)。
现在,如果原始查询的顺序很重要。 concatMap
通常是运营商做同样的事情 flatMap
顺序但顺序,如果代码需要等待执行所有子查询,则结果很慢。然而,解决方案又向前迈进了一步 concatMapEager
,这个将按顺序订阅observable,并根据需要缓冲结果。
假设改造客户端是异步的或在特定的调度程序上运行:
api1.items(queryParam)
.flatMap(itemList -> Observable.fromIterable(itemList)))
.concatMapEager(item -> api2.extendedInfo(item.id()))
.subscribe(...)
或者如果必须在本地设置调度程序:
api1.items(queryParam)
.flatMap(itemList -> Observable.fromIterable(itemList)))
.concatMapEager(item -> api2.extendedInfo(item.id()).subscribeOn(Schedulers.io()))
.subscribe(...)
也可以在此运算符中调整并发性。
此外,如果Api正在返回 Flowable
,有可能使用 .parallel
目前在RxJava 2.1.7中仍处于测试阶段。但是后来结果不合适而且我不知道一种方式(但是?)订购它们而不进行排序。
api.items(queryParam) // Flowable<Item>
.parallel(10)
.runOn(Schedulers.io())
.map(item -> api2.extendedInfo(item.id()))
.sequential(); // Flowable<ItemExtended>
12
2017-12-06 14:05
该 flatMap
运营商旨在满足这些类型的工作流程。
我将通过一个简单的五步示例来概述广泛的笔画。希望您可以轻松地在代码中重建相同的原则:
@Test fun flatMapExample() {
// (1) constructing a fake stream that emits a list of values
Observable.just(listOf(1, 2, 3, 4, 5))
// (2) convert our List emission into a stream of its constituent values
.flatMap { numbers -> Observable.fromIterable(numbers) }
// (3) subsequently convert each individual value emission into an Observable of some
// newly calculated type
.flatMap { number ->
when(number) {
1 -> Observable.just("A1")
2 -> Observable.just("B2")
3 -> Observable.just("C3")
4 -> Observable.just("D4")
5 -> Observable.just("E5")
else -> throw RuntimeException("Unexpected value for number [$number]")
}
}
// (4) collect all the final emissions into a list
.toList()
.subscribeBy(
onSuccess = {
// (5) handle all the combined results (in list form) here
println("## onNext($it)")
},
onError = { error ->
println("## onError(${error.message})")
}
)
}
(顺便说一下,如果排放的顺序很重要,请看看使用情况 concatMap
代替)。
我希望有所帮助。
1
2017-12-04 21:05
检查下面它是否正常工作。
假设你有多个网络调用,你需要make-cals来获取Github用户信息和Github用户事件。
并且您希望在更新UI之前等待每个返回。 RxJava可以在这里帮助你。
让我们首先定义我们的Retrofit对象来访问Github的API,然后为两个网络请求调用设置两个observable。
Retrofit repo = new Retrofit.Builder()
.baseUrl("https://api.github.com")
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.build();
Observable<JsonObject> userObservable = repo
.create(GitHubUser.class)
.getUser(loginName)
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread());
Observable<JsonArray> eventsObservable = repo
.create(GitHubEvents.class)
.listEvents(loginName)
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread());
用于它的接口如下:
public interface GitHubUser {
@GET("users/{user}")
Observable<JsonObject> getUser(@Path("user") String user);
}
public interface GitHubEvents {
@GET("users/{user}/events")
Observable<JsonArray> listEvents(@Path("user") String user);
}
我们使用RxJava之后 压缩 结合我们的两个Observable并在创建一个新的Observable之前等待它们完成的方法。
Observable<UserAndEvents> combined = Observable.zip(userObservable, eventsObservable, new Func2<JsonObject, JsonArray, UserAndEvents>() {
@Override
public UserAndEvents call(JsonObject jsonObject, JsonArray jsonElements) {
return new UserAndEvents(jsonObject, jsonElements);
}
});
最后,让我们在新的组合Observable上调用subscribe方法:
combined.subscribe(new Subscriber<UserAndEvents>() {
...
@Override
public void onNext(UserAndEvents o) {
// You can access the results of the
// two observabes via the POJO now
}
});
没有更多的线程等待网络调用完成。 RxJava已经在zip()中为您完成了所有这些。
希望我的回答可以帮到你。
1
2017-12-18 04:59
我用RxJava2解决了类似的问题。并行执行Api 2请求会略微加快工作速度。
private InformationRepository informationRepository;
//init....
public Single<List<FullInformation>> getFullInformation() {
return informationRepository.getInformationList()
.subscribeOn(Schedulers.io())//I usually write subscribeOn() in the repository, here - for clarity
.flatMapObservable(Observable::fromIterable)
.flatMapSingle(this::getFullInformation)
.collect(ArrayList::new, List::add);
}
private Single<FullInformation> getFullInformation(Information information) {
return informationRepository.getExtendedInformation(information)
.map(extendedInformation -> new FullInformation(information, extendedInformation))
.subscribeOn(Schedulers.io());//execute requests in parallel
}
InformationRepository - 只是界面。它的实现对我们来说并不有趣。
public interface InformationRepository {
Single<List<Information>> getInformationList();
Single<ExtendedInformation> getExtendedInformation(Information information);
}
FullInformation - 结果的容器。
public class FullInformation {
private Information information;
private ExtendedInformation extendedInformation;
public FullInformation(Information information, ExtendedInformation extendedInformation) {
this.information = information;
this.extendedInformation = extendedInformation;
}
}
0
2017-12-14 11:35
尝试使用 Observable.zip()
运营商。它将等待两个Api呼叫完成后再继续流。然后你可以通过调用插入一些逻辑 flatMap()
之后。
http://reactivex.io/documentation/operators/zip.html
0
2017-12-19 20:11