我碰到 Spark的结构化流媒体,它有一个连续消耗S3存储桶并将处理结果写入MySQL数据库的示例。
// Read data continuously from an S3 location
val inputDF = spark.readStream.json("s3://logs")
// Do operations using the standard DataFrame API and write to MySQL
inputDF.groupBy($"action", window($"time", "1 hour")).count()
.writeStream.format("jdbc")
.start("jdbc:mysql//...")
如何使用它 Spark Kafka Streaming?
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
有没有办法结合这两个例子而不使用 stream.foreachRDD(rdd => {})
?
有没有办法结合这两个例子而不使用
stream.foreachRDD(rdd => {})
?
还没。 Spark 2.0.0没有Kafka sink支持结构化流。这是一个应该出现的功能 根据Tathagata Das的Spark 2.1.0,Spark Streaming的创建者之一。
这是相关的JIRA问题。
编辑:(6.12.2016)
Kafka 0.10用于结构化流媒体的集成现在 在Spark 2.0.2中支持expiramentaly:
val ds1 = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
ds1
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
有没有办法结合这两个例子而不使用
stream.foreachRDD(rdd => {})
?
还没。 Spark 2.0.0没有Kafka sink支持结构化流。这是一个应该出现的功能 根据Tathagata Das的Spark 2.1.0,Spark Streaming的创建者之一。
这是相关的JIRA问题。
编辑:(6.12.2016)
Kafka 0.10用于结构化流媒体的集成现在 在Spark 2.0.2中支持expiramentaly:
val ds1 = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
ds1
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
我有一个类似的问题,他从Kafka来源读书并写信给Cassandra水槽。在这里创建了一个简单的项目 kafka2spark2cassandra,分享,以防它对任何人都有帮助。