问题 使用Spark列出Hadoop HDFS目录中的所有文件?


我想循环遍历Hadoop目录中的所有文本文件,并计算“错误”一词的所有出现次数。有没有办法做一个 hadoop fs -ls /users/ubuntu/ 使用Apache Spark Scala API列出目录中的所有文件?

从给定的 第一个例子,spark上下文似乎只能通过以下方式单独访问文件:

val file = spark.textFile("hdfs://target_load_file.txt")

在我的问题中,我不知道预先在HDFS文件夹中有多少文件名。看着 spark context docs 但找不到这种功能。


11188
2018-04-28 22:31


起源



答案:


您可以使用通配符:

val errorCount = sc.textFile("hdfs://some-directory/*")
                   .flatMap(_.split(" ")).filter(_ == "error").count

14
2018-04-30 12:48



如果我想报告发生错误的文件名,该怎么办? - Santiago Cepas
使用 sc.wholeTextFiles。看到 stackoverflow.com/questions/29521665/... 对于那个问题。 - Daniel Darabos


答案:


您可以使用通配符:

val errorCount = sc.textFile("hdfs://some-directory/*")
                   .flatMap(_.split(" ")).filter(_ == "error").count

14
2018-04-30 12:48



如果我想报告发生错误的文件名,该怎么办? - Santiago Cepas
使用 sc.wholeTextFiles。看到 stackoverflow.com/questions/29521665/... 对于那个问题。 - Daniel Darabos


import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import scala.collection.mutable.Stack


val fs = FileSystem.get( sc.hadoopConfiguration )
var dirs = Stack[String]()
val files = scala.collection.mutable.ListBuffer.empty[String]
val fs = FileSystem.get(sc.hadoopConfiguration)

dirs.push("/user/username/")

while(!dirs.isEmpty){
    val status = fs.listStatus(new Path(dirs.pop()))
    status.foreach(x=> if(x.isDirectory) dirs.push(x.getPath.toString) else 
    files+= x.getPath.toString)
}
files.foreach(println)

1
2018-05-17 18:39