我有一个RDD结构
RDD[(String, String)]
我想创建2个列表(rdd的每个维度一个)。
我尝试使用rdd.foreach()并填充两个ListBuffers然后将它们转换为Lists,但我猜每个节点都创建自己的ListBuffer,因为在迭代之后BufferLists为空。我该怎么做 ?
编辑:我的方法
val labeled = data_labeled.map { line =>
val parts = line.split(',')
(parts(5), parts(7))
}.cache()
var testList : ListBuffer[String] = new ListBuffer()
labeled.foreach(line =>
testList += line._1
)
val labeledList = testList.toList
println("rdd: " + labeled.count)
println("bufferList: " + testList.size)
println("list: " + labeledList.size)
结果是:
rdd: 31990654
bufferList: 0
list: 0
如果你真的想创造两个 清单 - 意思是,您希望将所有分布式数据收集到驱动程序应用程序中(冒着缓慢或缓慢的风险) OutOfMemoryError
) - 您可以使用 collect
然后使用简单 map
对结果的操作:
val list: List[(String, String)] = rdd.collect().toList
val col1: List[String] = list.map(_._1)
val col2: List[String] = list.map(_._2)
或者 - 如果您想将RDD“拆分”为两个 RDDS - 它没有收集数据非常相似:
rdd.cache() // to make sure calculation of rdd is not repeated twice
val rdd1: RDD[String] = rdd.map(_._1)
val rdd2: RDD[String] = rdd.map(_._2)
第三种方法是首先映射到这两个RDD,然后收集它们中的每一个,但它与第一个选项没有太大差别,并且存在相同的风险和限制。
作为Tzach Zohar答案的替代品,您可以使用 unzip
在名单上:
scala> val myRDD = sc.parallelize(Seq(("a", "b"), ("c", "d")))
myRDD: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[0] at parallelize at <console>:27
scala> val (l1, l2) = myRDD.collect.toList.unzip
l1: List[String] = List(a, c)
l2: List[String] = List(b, d)
要么 keys
和 values
在...上 RDD
S:
scala> val (rdd1, rdd2) = (myRDD.keys, myRDD.values)
rdd1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at keys at <console>:33
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at values at <console>:33
scala> rdd1.foreach{println}
a
c
scala> rdd2.foreach{println}
d
b