问题 如何在Kafka Direct Stream中使用Spark Structured Streaming?


我碰到 Spark的结构化流媒体,它有一个连续消耗S3存储桶并将处理结果写入MySQL数据库的示例。

// Read data continuously from an S3 location
val inputDF = spark.readStream.json("s3://logs")

// Do operations using the standard DataFrame API and write to MySQL
inputDF.groupBy($"action", window($"time", "1 hour")).count()
       .writeStream.format("jdbc")
       .start("jdbc:mysql//...")

如何使用它 Spark Kafka Streaming

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

有没有办法结合这两个例子而不使用 stream.foreachRDD(rdd => {})


9384
2017-09-01 15:39


起源



答案:


有没有办法结合这两个例子而不使用    stream.foreachRDD(rdd => {})

还没。 Spark 2.0.0没有Kafka sink支持结构化流。这是一个应该出现的功能 根据Tathagata Das的Spark 2.1.0,Spark Streaming的创建者之一。

这是相关的JIRA问题

编辑:(6.12.2016)

Kafka 0.10用于结构化流媒体的集成现在 在Spark 2.0.2中支持expiramentaly

val ds1 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

ds1
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

11
2017-09-01 17:48



有没有办法跟踪进度功能?例如。一个Jira故事,一个功能请求等? - SergeyB
@ike_love是的,你可以找到它 这里 - Yuval Itzchakov
有关(Kafka)Alpha版本的信息,请参阅以下文档: spark.apache.org/docs/latest/... - Murtaza Kanchwala


答案:


有没有办法结合这两个例子而不使用    stream.foreachRDD(rdd => {})

还没。 Spark 2.0.0没有Kafka sink支持结构化流。这是一个应该出现的功能 根据Tathagata Das的Spark 2.1.0,Spark Streaming的创建者之一。

这是相关的JIRA问题

编辑:(6.12.2016)

Kafka 0.10用于结构化流媒体的集成现在 在Spark 2.0.2中支持expiramentaly

val ds1 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

ds1
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

11
2017-09-01 17:48



有没有办法跟踪进度功能?例如。一个Jira故事,一个功能请求等? - SergeyB
@ike_love是的,你可以找到它 这里 - Yuval Itzchakov
有关(Kafka)Alpha版本的信息,请参阅以下文档: spark.apache.org/docs/latest/... - Murtaza Kanchwala


我有一个类似的问题,他从Kafka来源读书并写信给Cassandra水槽。在这里创建了一个简单的项目 kafka2spark2cassandra,分享,以防它对任何人都有帮助。


3
2018-01-05 20:25



@Sokia - 您的项目运作良好,干净且独立。谢谢。 - SergeyB