我们有一个非常标准的Spark作业,它从s3读取日志文件,然后对它们进行一些处理。非常基本的Spark东西......
val logs = sc.textFile(somePathTos3)
val mappedRows = logs.map(log => OurRowObject.parseLog(log.split("\t")))
val validRows = mappedRows.filter(log => log._1._1 != "ERROR")
...and continue processing
哪里 OurRowObject.parseLine
获取原始日志行并将其映射到某个(键,值)对(例如 ( (1,2,3,4), (5,6,7) )
然后我们可以进行处理。现在,如果 parseLine
遇到“问题”日志(格式错误,空白等...)它将返回一些标记值(例如 ( ("ERROR", ...), (...) )
过滤步骤然后过滤掉。
现在,我一直试图找到的方法是在地图中简单地不包括问题行...某种方式告诉火花“嘿这是一个空/畸形的行,跳过它并且不要' t包括一对“,而不是那个额外的过滤步骤。
我还没有找到办法做到这一点,并发现这个功能不存在(AFAICanFind)非常有趣。
谢谢
您可以使解析器返回Option [Value]而不是Value。这样你就可以使用flatMap将行映射到行并删除那些无效的行。
粗略的是这样的:
def parseLog(line:String):Option[Array[String]] = {
val splitted = log.split("\t")
if (validate(splitted)) Some(splitted) else None
}
val validRows = logs.flatMap(OurRowObject.parseLog(_))
一种方法是使用单参数重载 collect
(代替 map
要么 flatMap
)和a PartialFunction
。如果您需要的部分功能并非完全无关紧要,这有点棘手。事实上你可能不会因为你需要解析 和 验证,我将在下面建模 二 部分函数(虽然第一个恰好是为所有输入定义的)。
// this doesn't really need to be a partial function but we'll
// want to compose it with one and end up with a partial function
val split: PartialFunction[String, Array[String]] = {
case log => log.split("\t")
}
// this really needs to be a partial function
val validate: PartialFunction[Array[String], Array[String]] = {
case lines if lines.length > 2 => lines
}
val splitAndValidate = split andThen validate
val logs = sc.parallelize(Seq("a\tb", "u\tv\tw", "a", "x\ty\tz"), 4)
// only accept the logs with more than two entries
val validRows = logs.collect(splitAndValidate)
这是非常好的Scala,但它不起作用,因为 splitAndValidate
是不可序列化的,我们正在使用Spark。 (注意 split
和 validate
是可序列化的:问题在于组成!)所以,我们需要制作一个 PartialFunction
那 是 序列化:
class LogValidator extends PartialFunction[String, Array[String]] with Serializable {
private val validate: PartialFunction[Array[String], Array[String]] = {
case lines if lines.length > 2 => lines
}
override def apply(log: String) : Array[String] = {
validate(log.split("\t"))
}
override def isDefinedAt(log: String) : Boolean = {
validate.isDefinedAt(log.split("\t"))
}
}
然后我们可以打电话
val validRows = logs.collect(new LogValidator())