问题 Spark:RDD到List


我有一个RDD结构

RDD[(String, String)]

我想创建2个列表(rdd的每个维度一个)。

我尝试使用rdd.foreach()并填充两个ListBuffers然后将它们转换为Lists,但我猜每个节点都创建自己的ListBuffer,因为在迭代之后BufferLists为空。我该怎么做 ?

编辑:我的方法

val labeled = data_labeled.map { line =>
  val parts = line.split(',')
  (parts(5), parts(7))
}.cache()

var testList : ListBuffer[String] = new ListBuffer()

labeled.foreach(line =>
  testList += line._1
)
  val labeledList = testList.toList
  println("rdd: " + labeled.count)
  println("bufferList: " + testList.size)
  println("list: " + labeledList.size)

结果是:

rdd: 31990654
bufferList: 0
list: 0

6661
2017-11-30 16:01


起源

请使用您尝试过的代码以及一些输入数据样本和预期输出进行更新!你的问题对我来说不是很清楚。 - eliasah


答案:


如果你真的想创造两个 清单  - 意思是,您希望将所有分布式数据收集到驱动程序应用程序中(冒着缓慢或缓慢的风险) OutOfMemoryError) - 您可以使用 collect 然后使用简单 map 对结果的操作:

val list: List[(String, String)] = rdd.collect().toList
val col1: List[String] = list.map(_._1)
val col2: List[String] = list.map(_._2)

或者 - 如果您想将RDD“拆分”为两个 RDDS  - 它没有收集数据非常相似:

rdd.cache() // to make sure calculation of rdd is not repeated twice
val rdd1: RDD[String] = rdd.map(_._1)
val rdd2: RDD[String] = rdd.map(_._2)

第三种方法是首先映射到这两个RDD,然后收集它们中的每一个,但它与第一​​个选项没有太大差别,并且存在相同的风险和限制。


13
2017-11-30 16:16



@Yuriy为什么广播变量(这是只读的)与此相关?你能描述一下吗? - avr
@avr ListBuffer是可变的 += 改变内部状态,而不是创建新的引用。但是你的问题是好的,并且对于不可变的语句(其中引用更改为任何操作)需要用某些东西包装它(Serializable)。 List的简单示例: val testList = sc.broadcast(new Serializable { var list = List.empty[String] }),并在变异内部状态之后。 - Yuriy
@Yuriy我认为avr是对的,你误解了他/她的问题 - 这不是可变与不可变集合的问题 - 广播变量是 只读 从某种意义上说,如果在执行程序中更改了它们的值,驱动程序代码将不会看到此更改(Spark如何聚合所有执行程序所做的更改?)。这在本地模式下工作的事实看起来大多像一个bug,它在集群实际分布的地方不起作用。 - Tzach Zohar
你没错,我错过了这一点。删除了我的更改,对不起噪音。 - Yuriy


作为Tzach Zohar答案的替代品,您可以使用 unzip 在名单上:

scala> val myRDD = sc.parallelize(Seq(("a", "b"), ("c", "d")))
myRDD: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[0] at parallelize at <console>:27

scala> val (l1, l2) = myRDD.collect.toList.unzip
l1: List[String] = List(a, c)
l2: List[String] = List(b, d)

要么 keys 和 values 在...上 RDDS:

scala> val (rdd1, rdd2) = (myRDD.keys, myRDD.values)
rdd1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at keys at <console>:33
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at values at <console>:33

scala> rdd1.foreach{println}
a
c

scala> rdd2.foreach{println}
d
b

2
2017-11-30 19:37