问题 有没有办法在地图中跳过/抛出/忽略Spark中的记录?


我们有一个非常标准的Spark作业,它从s3读取日志文件,然后对它们进行一些处理。非常基本的Spark东西......

val logs = sc.textFile(somePathTos3)
val mappedRows = logs.map(log => OurRowObject.parseLog(log.split("\t")))
val validRows = mappedRows.filter(log => log._1._1 != "ERROR")
...and continue processing

哪里 OurRowObject.parseLine 获取原始日志行并将其映射到某个(键,值)对(例如 ( (1,2,3,4), (5,6,7) ) 然后我们可以进行处理。现在,如果 parseLine 遇到“问题”日志(格式错误,空白等...)它将返回一些标记值(例如 ( ("ERROR", ...), (...) ) 过滤步骤然后过滤掉。

现在,我一直试图找到的方法是在地图中简单地不包括问题行...某种方式告诉火花“嘿这是一个空/畸形的行,跳过它并且不要' t包括一对“,而不是那个额外的过滤步骤。

我还没有找到办法做到这一点,并发现这个功能不存在(AFAICanFind)非常有趣。

谢谢


1577
2017-11-05 20:02


起源

而不是在 map,你可以补充一下 .option("mode", "DROPMALFORMED") 要么 .option("mode","FAILFAST") 在期间使用有用的异常来中断作业 textFile 读。 有关更多详细信息,请参阅文档 - ecoe


答案:


您可以使解析器返回Option [Value]而不是Value。这样你就可以使用flatMap将行映射到行并删除那些无效的行。

粗略的是这样的:

def parseLog(line:String):Option[Array[String]] = {
    val splitted = log.split("\t")
    if (validate(splitted)) Some(splitted) else None
}

val validRows = logs.flatMap(OurRowObject.parseLog(_))

9
2017-11-05 21:01



有趣的想法,我会试一试。谢谢! - K Raphael
虽然我真的很喜欢这个解决方案,但似乎你应该能够使用one参数重载来实现类似的效果 collect (代替 map 要么 flatMap)和a PartialFunction。我无法获得比工作更简单的例子,但你可能会发现这也是一个有趣的探索可能性。这个 Option[Value] 方法似乎更容易使用。 (我很乐意分享我的代码,但不会发布,因为它不太有效。) - Spiro Michaylov
@SpiroMichaylov那是对的, collect(PF[A,B]) 也应该努力做这种过滤。一件事 collect 是Spark,我不喜欢那样 collect 没有参数是将所有数据传递给驱动程序 collect(PF) 是一种转变,让人们感到困惑。你能把它作为答案添加吗?这是一个非常有效的选择。 - maasg
@maasg我完全和你谈论收集混乱的两个重载。就设计而言,它是Spark API的弱角之一。我也找到了定义的机制 PartialFunction在Scala中有点限制并且难以使用。我会看看我是否可以修复我的解决方案并发布它:我遇到了组合部分函数的可序列化问题。 - Spiro Michaylov
@maasg发布在这里,但它让我有点难过。 - Spiro Michaylov


一种方法是使用单参数重载 collect (代替 map 要么 flatMap)和a PartialFunction。如果您需要的部分功能并非完全无关紧要,这有点棘手。事实上你可能不会因为你需要解析  验证,我将在下面建模  部分函数(虽然第一个恰好是为所有输入定义的)。

// this doesn't really need to be a partial function but we'll 
// want to compose it with one and end up with a partial function
val split: PartialFunction[String, Array[String]] = {
  case log => log.split("\t")
}

// this really needs to be a partial function
val validate: PartialFunction[Array[String], Array[String]] = {
  case lines if lines.length > 2 => lines
}

val splitAndValidate = split andThen validate

val logs = sc.parallelize(Seq("a\tb", "u\tv\tw", "a", "x\ty\tz"), 4)

// only accept the logs with more than two entries
val validRows = logs.collect(splitAndValidate)

这是非常好的Scala,但它不起作用,因为 splitAndValidate 是不可序列化的,我们正在使用Spark。 (注意 split 和 validate 是可序列化的:问题在于组成!)所以,我们需要制作一个 PartialFunction 那   序列化:

class LogValidator extends PartialFunction[String, Array[String]] with Serializable {

  private val validate: PartialFunction[Array[String], Array[String]] = {
    case lines if lines.length > 2 => lines
  }

  override def apply(log: String) : Array[String] = {
    validate(log.split("\t"))
  }

  override def isDefinedAt(log: String) : Boolean = {
    validate.isDefinedAt(log.split("\t"))
  }

}

然后我们可以打电话

val validRows = logs.collect(new LogValidator())

5
2017-11-08 21:03