问题 RxJava组合请求序列


问题

我有两个Apis。 Api 1给了我一个项目列表,Api 2给了我更多关于我从Api获得的每个项目的详细信息。到目前为止,我解决它的方式导致了糟糕的性能。

问题

在Retrofit和RxJava的帮助下,快速,快速地解决了这个问题。

我的方法

在片刻我的解决方案看起来像这样:

第1步:执行改造 Single<ArrayList<Information>> 来自Api 1。

第2步:我遍历这些项目并向Api 2发出请求。

第3步:改造返回按顺序执行 Single<ExtendedInformation> 对于 每一个项目

步骤4:在完成Api 2的所有调用完成后,我为包含信息和扩展信息的所有项创建一个新对象。

我的代码

 public void addExtendedInformations(final Information[] informations) {
        final ArrayList<InformationDetail> informationDetailArrayList = new ArrayList<>();
        final JSONRequestRatingHelper.RatingRequestListener ratingRequestListener = new JSONRequestRatingHelper.RatingRequestListener() {
            @Override
            public void onDownloadFinished(Information baseInformation, ExtendedInformation extendedInformation) {
                informationDetailArrayList.add(new InformationDetail(baseInformation, extendedInformation));
                if (informationDetailArrayList.size() >= informations.length){
                    listener.onAllExtendedInformationLoadedAndCombined(informationDetailArrayList);
                }
            }
        };

        for (Information information : informations) {
            getExtendedInformation(ratingRequestListener, information);
        }
    }

    public void getRatingsByTitle(final JSONRequestRatingHelper.RatingRequestListener ratingRequestListener, final Information information) {
        Single<ExtendedInformation> repos = service.findForTitle(information.title);
        disposable.add(repos.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribeWith(new DisposableSingleObserver<ExtendedInformation>() {
            @Override
            public void onSuccess(ExtendedInformation extendedInformation) {
                    ratingRequestListener.onDownloadFinished(information, extendedInformation);
            }

            @Override
            public void onError(Throwable e) {
                ExtendedInformation extendedInformation = new ExtendedInformation();
                ratingRequestListener.onDownloadFinished(extendedInformation, information);
            }
        }));
    }

    public interface RatingRequestListener {

        void onDownloadFinished(Information information, ExtendedInformation extendedInformation);

    }

6696
2017-12-04 20:39


起源

为什么你的方法需要 synchronized ? - Brice
@Brice有一段时间他们需要同步,我忘了删除它。谢谢 :) - Mayr Technologies
你有点把自己置于这种状态。问题:1。它是否真的需要逐行查询? (批量查询通常效率更高)。你真的必须等到所有结果到来才能开始吗?您是否可以选择在抵达时立即显示结果? - M. Prokhorov


答案:


TL;博士 使用 concatMapEager 要么 flatMap 并异步或在调度程序上执行子调用。


很长的故事

我不是Android开发人员,所以我的问题将仅限于纯RxJava(版本1和版本2)。

如果我得到正确的图片,所需的流程是:

some query param 
  \--> Execute query on API_1 -> list of items
          |-> Execute query for item 1 on API_2 -> extended info of item1
          |-> Execute query for item 2 on API_2 -> extended info of item1
          |-> Execute query for item 3 on API_2 -> extended info of item1
          ...
          \-> Execute query for item n on API_2 -> extended info of item1
  \----------------------------------------------------------------------/
      |
      \--> stream (or list) of extended item info for the query param

假设Retrofit为客户生成了

interface Api1 {
    @GET("/api1") Observable<List<Item>> items(@Query("param") String param);
}

interface Api2 {
    @GET("/api2/{item_id}") Observable<ItemExtended> extendedInfo(@Path("item_id") String item_id);
}

如果项目的顺序不重要,则可以使用 flatMap 只要:

api1.items(queryParam)
    .flatMap(itemList -> Observable.fromIterable(itemList)))
    .flatMap(item -> api2.extendedInfo(item.id()))
    .subscribe(...)

除非 改造生成器配置有

  • 使用异步适配器(调用将在okhttp内部执行程序中排队)。我个人认为这不是一个好主意,因为你无法控制这个执行者。

    .addCallAdapterFactory(RxJava2CallAdapterFactory.createAsync()
    
  • 或者使用基于调度程序的适配器(将在RxJava调度程序上调度调用)。这是我的首选方案,因为您明确选择使用哪个调度程序,它很可能是IO调度程序,但您可以自由尝试不同的调度程序。

    .addCallAdapterFactory(RxJava2CallAdapterFactory.createWithScheduler(Schedulers.io()))
    

原因是 flatMap 将订阅由创建的每个observable api2.extendedInfo(...) 并将它们合并到生成的observable中。因此,结果将按接收顺序显示。

如果 改造客户 不是 设置为异步或设置为在调度程序上运行,可以设置一个:

api1.items(queryParam)
    .flatMap(itemList -> Observable.fromIterable(itemList)))
    .flatMap(item -> api2.extendedInfo(item.id()).subscribeOn(Schedulers.io()))
    .subscribe(...)

这个结构几乎与前面提到的一个execpts相同 本地 每个调度程序 api2.extendedInfo 应该运行。

可以调整 maxConcurrency 的参数 flatMap 控制您想要同时执行的请求数。虽然我对此要谨慎,但您不希望同时运行所有查询。通常是默认值 maxConcurrency 够好了(128)。

现在,如果原始查询的顺序很重要concatMap 通常是运营商做同样的事情 flatMap 顺序但顺序,如果代码需要等待执行所有子查询,则结果很慢。然而,解决方案又向前迈进了一步 concatMapEager,这个将按顺序订阅observable,并根据需要缓冲结果。

假设改造客户端是异步的或在特定的调度程序上运行:

api1.items(queryParam)
    .flatMap(itemList -> Observable.fromIterable(itemList)))
    .concatMapEager(item -> api2.extendedInfo(item.id()))
    .subscribe(...)

或者如果必须在本地设置调度程序:

api1.items(queryParam)
    .flatMap(itemList -> Observable.fromIterable(itemList)))
    .concatMapEager(item -> api2.extendedInfo(item.id()).subscribeOn(Schedulers.io()))
    .subscribe(...)

也可以在此运算符中调整并发性。


此外,如果Api正在返回 Flowable,有可能使用 .parallel 目前在RxJava 2.1.7中仍处于测试阶段。但是后来结果不合适而且我不知道一种方式(但是?)订购它们而不进行排序。

api.items(queryParam) // Flowable<Item>
   .parallel(10)
   .runOn(Schedulers.io())
   .map(item -> api2.extendedInfo(item.id()))
   .sequential();     // Flowable<ItemExtended>

12
2017-12-06 14:05



item.id() 可以从 item 这是在。中找到的任何项目 itemList (这个清单是 平面映射 通过 .flatMap(itemList -> Observable.fromIterable(itemList)) 为了 管 列表中的每个元素作为可观察通量中的单个元素)。 - Brice
如果你需要 积累 该 extendedInfo 进入列表,有可能使用 .toList() 运营商。如果使用RxJava 2 toList 回报 Single<List<...>> 得到一个 Observable<List<...>> 它可以链接 .toList().toObservable() - Brice
是的,有必要 平面地图 api1调用返回的列表。或者通过 .flatMap(itemList -> Observable.fromIterable(itemList))或者 .flatMapIterable(itemList -> Arrays.asList(itemList.items)) (可以简化为 .flatMapIterable(itemList -> itemList) 如果 itemList 器物 Iterable 很可能)。 - Brice
这是一个很好的答案,我只是想感谢您花时间详细解释所有这些问题。调度适配器工厂的调度程序解释非常有用。 - Matthew Bahr
@MatthewBahr谢谢:) - Brice


flatMap 运营商旨在满足这些类型的工作流程。

我将通过一个简单的五步示例来概述广泛的笔画。希望您可以轻松地在代码中重建相同的原则:

@Test fun flatMapExample() {
    // (1) constructing a fake stream that emits a list of values
    Observable.just(listOf(1, 2, 3, 4, 5))
            // (2) convert our List emission into a stream of its constituent values 
            .flatMap { numbers -> Observable.fromIterable(numbers) }
            // (3) subsequently convert each individual value emission into an Observable of some 
            //     newly calculated type
            .flatMap { number ->
                when(number) {
                       1 -> Observable.just("A1")
                       2 -> Observable.just("B2")
                       3 -> Observable.just("C3")
                       4 -> Observable.just("D4")
                       5 -> Observable.just("E5")
                    else -> throw RuntimeException("Unexpected value for number [$number]")
                }
            }
            // (4) collect all the final emissions into a list
            .toList()
            .subscribeBy(
                    onSuccess = {
                        // (5) handle all the combined results (in list form) here
                        println("## onNext($it)")
                    },
                    onError = { error ->
                        println("## onError(${error.message})")
                    }
            )
}

(顺便说一下,如果排放的顺序很重要,请看看使用情况 concatMap 代替)。

我希望有所帮助。


1
2017-12-04 21:05



感谢您的回答。但是我不知道如何将它应用到我的例子中。 - Mayr Technologies
这并不完全回答这个问题,例如:列表没有被冻结,这是查询的结果,所以kotlin when 语句不能使用,甚至不使用kotlin的会计项目,加上它没有说明为什么这样或如何更快。加 flatMap 默认情况下不是并发的,所以它并没有真正加快速度。 - Brice
@Brice示例代码实现了无意义,然而,简单的逻辑以展示更广泛的观点。 when 我试图证明的并不是真的很突出。你是正确的,它没有表现出任何特别的速度提升,但更重要的是(在我看来)它确实展示了惯用的Rx转换流而不是打破它并恢复到某种形式的“回调地狱”作为原始解决方案似乎做到了。你的(写得很好)解决方案甚至承认可以获得微不足道的性能提升:“通常默认值足够好”。 - homerman
回调的事情确实要解决,但这不是问题的一部分。这部分并不意味着表现可以忽略不计。真正的诀窍是以异步方式执行子查询 flatMap 要么 concatMapEager。引用的部分是指 maxConcurrency 允许调整一次可以订阅多少个子Observable的参数 flatMap 要么 concatMapEager。 - Brice
@hsl我更新了我的答案来描述更多这一点。 - Brice


检查下面它是否正常工作。

假设你有多个网络调用,你需要make-cals来获取Github用户信息和Github用户事件。

并且您希望在更新UI之前等待每个返回。 RxJava可以在这里帮助你。 让我们首先定义我们的Retrofit对象来访问Github的API,然后为两个网络请求调用设置两个observable。

Retrofit repo = new Retrofit.Builder()
        .baseUrl("https://api.github.com")
        .addConverterFactory(GsonConverterFactory.create())
        .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
        .build();

Observable<JsonObject> userObservable = repo
        .create(GitHubUser.class)
        .getUser(loginName)
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread());

Observable<JsonArray> eventsObservable = repo
        .create(GitHubEvents.class)
        .listEvents(loginName)
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread());

用于它的接口如下:

public interface GitHubUser {
  @GET("users/{user}")
  Observable<JsonObject> getUser(@Path("user") String user);
}

public interface GitHubEvents {
  @GET("users/{user}/events")
  Observable<JsonArray> listEvents(@Path("user") String user);
}

我们使用RxJava之后 压缩 结合我们的两个Observable并在创建一个新的Observable之前等待它们完成的方法。

Observable<UserAndEvents> combined = Observable.zip(userObservable, eventsObservable, new Func2<JsonObject, JsonArray, UserAndEvents>() {
  @Override
  public UserAndEvents call(JsonObject jsonObject, JsonArray jsonElements) {
    return new UserAndEvents(jsonObject, jsonElements);
  }
});

最后,让我们在新的组合Observable上调用subscribe方法:

combined.subscribe(new Subscriber<UserAndEvents>() {
          ...
          @Override
          public void onNext(UserAndEvents o) {
            // You can access the results of the 
            // two observabes via the POJO now
          }
        });

没有更多的线程等待网络调用完成。 RxJava已经在zip()中为您完成了所有这些。 希望我的回答可以帮到你。


1
2017-12-18 04:59





我用RxJava2解决了类似的问题。并行执行Api 2请求会略微加快工作速度。

private InformationRepository informationRepository;

//init....

public Single<List<FullInformation>> getFullInformation() {
    return informationRepository.getInformationList()
            .subscribeOn(Schedulers.io())//I usually write subscribeOn() in the repository, here - for clarity
            .flatMapObservable(Observable::fromIterable)
            .flatMapSingle(this::getFullInformation)
            .collect(ArrayList::new, List::add);

}

private Single<FullInformation> getFullInformation(Information information) {
    return informationRepository.getExtendedInformation(information)
            .map(extendedInformation -> new FullInformation(information, extendedInformation))
            .subscribeOn(Schedulers.io());//execute requests in parallel
}

InformationRepository - 只是界面。它的实现对我们来说并不有趣。

public interface InformationRepository {

    Single<List<Information>> getInformationList();

    Single<ExtendedInformation> getExtendedInformation(Information information);
}

FullInformation - 结果的容器。

public class FullInformation {

    private Information information;
    private ExtendedInformation extendedInformation;

    public FullInformation(Information information, ExtendedInformation extendedInformation) {
        this.information = information;
        this.extendedInformation = extendedInformation;
    }
}

0
2017-12-14 11:35





尝试使用 Observable.zip() 运营商。它将等待两个Api呼叫完成后再继续流。然后你可以通过调用插入一些逻辑 flatMap() 之后。

http://reactivex.io/documentation/operators/zip.html


0
2017-12-19 20:11