问题 为什么在流数据集上使用缓存失败并显示“AnalysisException:必须使用writeStream.start()执行带有流源的查询”?


SparkSession
  .builder
  .master("local[*]")
  .config("spark.sql.warehouse.dir", "C:/tmp/spark")
  .config("spark.sql.streaming.checkpointLocation", "C:/tmp/spark/spark-checkpoint")
  .appName("my-test")
  .getOrCreate
  .readStream
  .schema(schema)
  .json("src/test/data")
  .cache
  .writeStream
  .start
  .awaitTermination

在spark 2.1.0中执行此示例时出现错误。 没有 .cache 选项按预期工作但有 .cache 我得到的选项:

线程“main”中的异常org.apache.spark.sql.AnalysisException:必须使用writeStream.start();;执行带有流源的查询   FileSource [SRC /测试/数据]       在org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker $ .org $ apache $ spark $ sql $ catalyst $ analysis $ UnsupportedOperationChecker $$ throwError(UnsupportedOperationChecker.scala:196)       在org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker $$ anonfun $ checkForBatch $ 1.apply(UnsupportedOperationChecker.scala:35)       在org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker $$ anonfun $ checkForBatch $ 1.apply(UnsupportedOperationChecker.scala:33)       在org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:128)       在org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker $ .checkForBatch(UnsupportedOperationChecker.scala:33)       在org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:58)       在org.apache.spark.sql.execution.QueryExecution.withCachedData $ lzycompute(QueryExecution.scala:69)       在org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:67)       在org.apache.spark.sql.execution.QueryExecution.optimizedPlan $ lzycompute(QueryExecution.scala:73)       在org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:73)       在org.apache.spark.sql.execution.QueryExecution.sparkPlan $ lzycompute(QueryExecution.scala:79)       在org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:75)       在org.apache.spark.sql.execution.QueryExecution.executedPlan $ lzycompute(QueryExecution.scala:84)       在org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:84)       在org.apache.spark.sql.execution.CacheManager $$ anonfun $ cacheQuery $ 1.apply(CacheManager.scala:102)       在org.apache.spark.sql.execution.CacheManager.writeLock(CacheManager.scala:65)       在org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:89)       在org.apache.spark.sql.Dataset.persist(Dataset.scala:2479)       在org.apache.spark.sql.Dataset.cache(Dataset.scala:2489)       在org.me.App $ .main(App.scala:23)       在org.me.App.main(App.scala)

任何想法?


8922
2018-02-06 07:07


起源

对不起,但我不认为只是不使用缓存是解决方案。 - Martin Brišiak
马丁,随时参与评论 SPARK-20927 关于在流式计算上缓存的需要 - mathieu


答案:


你的(非常有趣的)案例归结为以下行(你可以执行 spark-shell):

scala> :type spark
org.apache.spark.sql.SparkSession

scala> spark.readStream.text("files").cache
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
FileSource[files]
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:34)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:34)
  at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:63)
  at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:74)
  at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:72)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:78)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:78)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:84)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:80)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:89)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:89)
  at org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery$1.apply(CacheManager.scala:104)
  at org.apache.spark.sql.execution.CacheManager.writeLock(CacheManager.scala:68)
  at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:92)
  at org.apache.spark.sql.Dataset.persist(Dataset.scala:2603)
  at org.apache.spark.sql.Dataset.cache(Dataset.scala:2613)
  ... 48 elided

原因很简单,解释一下(对Spark SQL没有任何不满 explain 意)。

spark.readStream.text("files") 创造了一个所谓的 流数据集

scala> val files = spark.readStream.text("files")
files: org.apache.spark.sql.DataFrame = [value: string]

scala> files.isStreaming
res2: Boolean = true

流数据集是Spark SQL的基础 结构化流媒体

正如您在结构化流媒体中所读到的那样 快速示例

然后使用启动流式计算 start()

引用DataStreamWriter的scaladoc 开始

start():StreamingQuery 开始执行流式查询,当新数据到达时,将不断将结果输出到给定路径。

所以,你必须使用 start (要么 foreach)开始执行流式查询。你已经知道了。

但是这里有 不支持的操作 在结构化流中:

此外,有一些数据集方法不适用于流数据集。它们是立即运行查询并返回结果的操作,这在流式数据集上没有意义。

如果您尝试这些操作中的任何一个,您将看到一个AnalysisException,例如“流数据框架/数据集不支持操作XYZ”。

看起来很熟悉, 不是吗?

cache 是  在不受支持的操作列表中,但这是因为它被忽略了(我报告 SPARK-20927 要解决这个问题)。

cache 应该在列表中  在查询在Spark SQL的CacheManager中注册之前执行查询。

让我们深入探讨Spark SQL的深度......屏住呼吸...

cache    persist 而 persist  请求当前的CacheManager缓存查询

sparkSession.sharedState.cacheManager.cacheQuery(this)

在缓存查询时 CacheManager    执行它

sparkSession.sessionState.executePlan(planToCache).executedPlan

我们 知道 是不允许的,因为它是 start (要么 foreach)这样做。

问题解决了!


14
2018-05-30 19:21



我认为这是一个bug,所以我报告的更快 issues.apache.org/jira/browse/SPARK-20865 ,我只需要确认我的强硬。谢谢。 - Martin Brišiak
链接到master并不重要,因为目标代码可以更改。我认为这是你链接中的附加内容 - crak
@crak正确。我不应该使用master作为链接。你觉得什么会更好?看到过去特定版本的链接,但今天无法弄清楚如何在github上做到这一点。介意提供一些帮助?我很感激。 - Jacek Laskowski
我也想知道,但因为你的链接在代码更改时没有价值,我建议定位一个特定的提交。你的帖子是在T时间写的,所以也许它在未来的火花版本中没有关系。我并不认为你的帖子在特定日期是真的。 - crak
@mathieu好的。你是对的。它可以得到支持,而不是设计。请注意,Spark 2.3将推出另一个流引擎(可能会改变重新缓存的内容)。 - Jacek Laskowski