问题 spark - 在地图内过滤


我正在尝试过滤内部地图功能。基本上我在经典map-reduce中这样做的方式是mapper在过滤条件满足时不会向上下文写任何东西。我怎样才能实现类似的火花?我似乎无法从map函数返回null,因为它在shuffle步骤中失败。我可以使用过滤器功能,但是在我可以在地图期间执行相同的任务时,它似乎是不必要的数据集迭代。我也可以尝试使用虚拟键输出null,但这是一个糟糕的解决方法。


5118
2018-03-03 22:53


起源

你可以添加说明问题的示例代码吗? - maasg


答案:


选择很少:

rdd.flatMaprdd.flatMap 会压扁一个 Traversable 收集到RDD。要选择元素,您通常会返回一个元素 Option 由于转型。

rdd.flatMap(elem => if (filter(elem)) Some(f(elem)) else None)

rdd.collect(pf: PartialFunction) 允许您提供可以过滤和转换原始RDD中的元素的部分函数。您可以使用此方法的所有模式匹配功能。

rdd.collect{case t if (cond(t)) => f(t)}
rdd.collect{case t:GivenType => f(t)}

正如Dean Wampler在评论中提到的, rdd.map(f(_)).filter(cond(_)) 可能与上面提到的其他更“简洁”的选项一样好,甚至更快。

哪里 f 是一个转换(或地图)功能。


14
2018-03-03 23:31



如果你使用 ...filter().map(),它们将在每个分区的相同任务中执行,类似于在MapReduce中链接“映射器”。这甚至可能比单个更快 flatMap 要么 collect,取决于分配了多少个临时对象,然后快速收集垃圾。 - Dean Wampler
@DeanWampler我知道流水线,但知道这一点很好 filter().map() 可能会快于 flatmap 要么 collect......我们换了很多 map().filter() 通过 collect() b / c读取效果更好,但需要检查性能。谢谢。 - maasg
谢谢。就目前而言,“地图链”的方式并不妨碍我。当我查看perf时,我将在后面研究Java 8中的等效flatmap建议 - nir
@maasg flatmap对我来说对代码简单性和性能都很有用。我也删除了spark sql图层,并使用flatmap函数进行过滤和映射。 - nir