问题 SQL over Spark Streaming


这是通过Spark Streaming运行简单SQL查询的代码。

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.Duration

object StreamingSQL {

  case class Persons(name: String, age: Int)

  def main(args: Array[String]) {

    val sparkConf = new SparkConf().setMaster("local").setAppName("HdfsWordCount")
    val sc = new SparkContext(sparkConf)
    // Create the context
    val ssc = new StreamingContext(sc, Seconds(2))

    val lines = ssc.textFileStream("C:/Users/pravesh.jain/Desktop/people/")
    lines.foreachRDD(rdd=>rdd.foreach(println))

    val sqc = new SQLContext(sc);
    import sqc.createSchemaRDD

    // Create the FileInputDStream on the directory and use the
    // stream to count words in new files created

    lines.foreachRDD(rdd=>{
      rdd.map(_.split(",")).map(p => Persons(p(0), p(1).trim.toInt)).registerAsTable("data")
      val teenagers = sqc.sql("SELECT name FROM data WHERE age >= 13 AND age <= 19")
      teenagers.foreach(println)
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

如您所见,要通过流式传输SQL运行,必须在foreachRDD方法中进行查询。 我想对从两个不同的流接收的数据运行SQL连接。有什么方法可以做到吗?


11035
2017-08-25 11:26


起源



答案:


好吧,我想总结一下我们在答案讨论后得到的解决方法 斯皮罗。他建议首先创建一个空表,然后将RDD插入其中。唯一的问题是 Spark不允许插入表中。这是可以做的:

首先,创建一个RDD,它具有与您期望的流相同的模式:

import sqlContext.createSchemaRDD
val d1=sc.parallelize(Array(("a",10),("b",3))).map(e=>Rec(e._1,e._2))

然后保存为 实木复合地文件

d1.saveAsParquetFile("/home/p1.parquet")

现在,加载镶木地板文件并使用。将其注册为表 registerAsTable() 方法。

val parquetFile = sqlContext.parquetFile("/home/p1.parquet")
parquetFile.registerAsTable("data")

现在,当您收到您的信息流时,只需应用一个 foreachRDD() 在你的流上,并使用。继续在上面创建的表中插入单个RDD 插入() 方法

dStream.foreachRDD(rdd=>{
rdd.insertInto("data")
})

这个insertInto()工作正常,允许将数据收集到表中。现在,您可以对任意数量的流执行相同操作,然后运行查询。


8
2017-09-03 06:36





编写代码的方式,每次运行SQL查询时,最终都会产生一系列小的SchemaRDD。诀窍是将这些中的每一个保存到累积RDD或累积表。

第一,表的方法,使用 insertInto

对于每个流,首先创建一个您注册为表的emty RDD,获取一个空表。对于你的例子,假设你称之为“allTeenagers”。

然后,对于每个查询,使用SchemaRDD insertInto 将结果添加到该表的方法:

teenagers.insertInto("allTeenagers")

如果对两个流执行此操作,创建两个单独的累积表,则可以使用普通的旧SQL查询来连接它们。

(注意:我实际上并没有能够让他上班,而且一点点搜索让我怀疑其他人是否有,但我很确定我已经理解了设计意图 insertInto,所以我认为这个解决方案值得记录。)

第二unionAll 方法(还有一个 union 方法,但这使得正确的类型更难):

这涉及创建一个初始RDD - 再次让我们称之为 allTeenagers

// create initial SchemaRDD even if it's empty, so the types work out right
var allTeenagers = sqc.sql("SELECT ...")

然后,每次:

val teenagers = sqc.sql("SELECT ...")
allTeenagers = allTeenagers.unionAll(teenagers)

也许不用说,你需要列匹配。


5
2017-08-25 22:53



谢谢回复。我尝试过类似的东西 var p1 = Person("Hari",22); val rdd1 = sc.parallelize(Array(p1)); rdd1.registerAsTable("data"); var p2 = Person("sagar", 22); var rdd2 = sc.parallelize(Array(p2)); rdd2.insertInto("data"); 并得到错误“java.lang.AssertionError:断言失败:没有计划InsertIntoTable Map(),false”似乎我使用insertInto()错误的方式? - Pravesh Jain
@Pravesh:我有同样的问题。我很确定它应该有效但有些搜索让我想知道是否有人使用它。我很好奇你回答你在Spark列表上发布的问题的回复。我用第二个解决方案更新了我的答案,我非常肯定会根据这个解决方案 unionAll,我很惊讶没有人建议过。后者的一个简单例子对我来说很好。 - Spiro Michaylov
感谢您提出宝贵的建议。如果你发现新的东西,请让我更新。也会这样做。 - Pravesh Jain
@Pravesh:你有没有在我的回答中排除第二个解决方案,(unionAll)出于某种原因? - Spiro Michaylov
@Pravesh:我并不是建议您将来自不同流的数据收集到一个RDD中,而是收集通过以下方式收集的RDD片段: foreachRDD 从EACH流到累加器表/ RDD,产生两个表或两个RDD,然后你可以加入(每个包含到目前为止相应流中的所有数据) - Spiro Michaylov


答案:


好吧,我想总结一下我们在答案讨论后得到的解决方法 斯皮罗。他建议首先创建一个空表,然后将RDD插入其中。唯一的问题是 Spark不允许插入表中。这是可以做的:

首先,创建一个RDD,它具有与您期望的流相同的模式:

import sqlContext.createSchemaRDD
val d1=sc.parallelize(Array(("a",10),("b",3))).map(e=>Rec(e._1,e._2))

然后保存为 实木复合地文件

d1.saveAsParquetFile("/home/p1.parquet")

现在,加载镶木地板文件并使用。将其注册为表 registerAsTable() 方法。

val parquetFile = sqlContext.parquetFile("/home/p1.parquet")
parquetFile.registerAsTable("data")

现在,当您收到您的信息流时,只需应用一个 foreachRDD() 在你的流上,并使用。继续在上面创建的表中插入单个RDD 插入() 方法

dStream.foreachRDD(rdd=>{
rdd.insertInto("data")
})

这个insertInto()工作正常,允许将数据收集到表中。现在,您可以对任意数量的流执行相同操作,然后运行查询。


8
2017-09-03 06:36





编写代码的方式,每次运行SQL查询时,最终都会产生一系列小的SchemaRDD。诀窍是将这些中的每一个保存到累积RDD或累积表。

第一,表的方法,使用 insertInto

对于每个流,首先创建一个您注册为表的emty RDD,获取一个空表。对于你的例子,假设你称之为“allTeenagers”。

然后,对于每个查询,使用SchemaRDD insertInto 将结果添加到该表的方法:

teenagers.insertInto("allTeenagers")

如果对两个流执行此操作,创建两个单独的累积表,则可以使用普通的旧SQL查询来连接它们。

(注意:我实际上并没有能够让他上班,而且一点点搜索让我怀疑其他人是否有,但我很确定我已经理解了设计意图 insertInto,所以我认为这个解决方案值得记录。)

第二unionAll 方法(还有一个 union 方法,但这使得正确的类型更难):

这涉及创建一个初始RDD - 再次让我们称之为 allTeenagers

// create initial SchemaRDD even if it's empty, so the types work out right
var allTeenagers = sqc.sql("SELECT ...")

然后,每次:

val teenagers = sqc.sql("SELECT ...")
allTeenagers = allTeenagers.unionAll(teenagers)

也许不用说,你需要列匹配。


5
2017-08-25 22:53



谢谢回复。我尝试过类似的东西 var p1 = Person("Hari",22); val rdd1 = sc.parallelize(Array(p1)); rdd1.registerAsTable("data"); var p2 = Person("sagar", 22); var rdd2 = sc.parallelize(Array(p2)); rdd2.insertInto("data"); 并得到错误“java.lang.AssertionError:断言失败:没有计划InsertIntoTable Map(),false”似乎我使用insertInto()错误的方式? - Pravesh Jain
@Pravesh:我有同样的问题。我很确定它应该有效但有些搜索让我想知道是否有人使用它。我很好奇你回答你在Spark列表上发布的问题的回复。我用第二个解决方案更新了我的答案,我非常肯定会根据这个解决方案 unionAll,我很惊讶没有人建议过。后者的一个简单例子对我来说很好。 - Spiro Michaylov
感谢您提出宝贵的建议。如果你发现新的东西,请让我更新。也会这样做。 - Pravesh Jain
@Pravesh:你有没有在我的回答中排除第二个解决方案,(unionAll)出于某种原因? - Spiro Michaylov
@Pravesh:我并不是建议您将来自不同流的数据收集到一个RDD中,而是收集通过以下方式收集的RDD片段: foreachRDD 从EACH流到累加器表/ RDD,产生两个表或两个RDD,然后你可以加入(每个包含到目前为止相应流中的所有数据) - Spiro Michaylov