问题 如何使用spark查询mongo?


我正在使用spark和mongo。我可以使用以下代码连接到mongo:

val sc = new SparkContext("local", "Hello from scala")

val config = new Configuration()
config.set("mongo.input.uri", "mongodb://127.0.0.1:27017/dbName.collectionName")
val mongoRDD = sc.newAPIHadoopRDD(config, classOf[com.mongodb.hadoop.MongoInputFormat], classOf[Object], classOf[BSONObject])

上面的代码给了我收集的所有文件。

现在我想在查询上应用一些条件。

为此我用过

config.set("mongo.input.query","{customerId: 'some mongo id'}")

这一次只涉及一个条件。如果'usage'> 30,我想添加一个条件

1)如何使用spark和mongo为mongo查询添加多个条件(包括大于和小于)?

另外,我想使用scala迭代查询结果的每个文档?

2)如何使用scala迭代结果?


10985
2017-12-17 10:23


起源

这里有一些侧面标志:Mongo的hadoop格式有资源处理问题,可以保持连接打开。当我们将它与Spark混合时,这是一个爆炸性的组合。 避免 - maasg
@maasg还有其他选项可以连接mongo和spark吗? - Vishwas


答案:


嗨,你可以试试这个:

有一个项目将MongoDB与Spark集成在一起

https://github.com/Stratio/deep-spark/tree/develop

1)做一个git克隆

2)进入深火花,然后进入深亲

3)mvn安装

4)用这个选项打开spark-shell:

./spark-shell --jars YOUR_PATH / deep-core-0.7.0-SNAPSHOT.jar,YOUR_PATH / deep-commons-0.7.0-SNAPSHOT.jar,YOUR_PATH / deep-mongodb-0.7.0-SNAPSHOT.jar, YOUR_PATH /蒙戈的Java驱动程序 - 2.12.4-sources.jar

记得用真实路径覆盖“YOUR_PATH”

5)在spark shell中执行一个简单的例子:

import com.stratio.deep.mongodb.config.MongoDeepJobConfig
import com.stratio.deep.mongodb.extractor.MongoNativeDBObjectExtractor
import com.stratio.deep.core.context.DeepSparkContext
import com.mongodb.DBObject
import org.apache.spark.rdd.RDD
import com.mongodb.QueryBuilder
import com.mongodb.BasicDBObject

val host = "localhost:27017"


val database = "test"

val inputCollection = "input";

val deepContext: DeepSparkContext = new DeepSparkContext(sc)

val inputConfigEntity: MongoDeepJobConfig[DBObject] = new MongoDeepJobConfig[DBObject](classOf[DBObject])


val query: QueryBuilder  = QueryBuilder.start();

query.and("number").greaterThan(27).lessThan(30);


inputConfigEntity.host(host).database(database).collection(inputCollection).filterQuery(query).setExtractorImplClass(classOf[MongoNativeDBObjectExtractor])


val inputRDDEntity: RDD[DBObject] = deepContext.createRDD(inputConfigEntity)

最好的是你可以使用QueryBuilder对象来进行查询

你也可以这样传递一个DBObject:

{ "number" : { "$gt" : 27 , "$lt" : 30}}

如果要迭代,可以使用yourRDD.collect()方法。您也可以使用您的RDD.foreach,但您必须提供一个功能。

还有另一种方法可以将罐子添加到火花中。您可以修改spark-env.sh并将此行放在最后:

CONFDIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
for jar in $(ls $CONFDIR/../lib/*.jar); do
  SPARK_CLASSPATH=$SPARK_CLASSPATH:${jar}
done

在lib文件夹中你放了你的库,就是这样。

免责声明:我目前正在研究Stratio


10
2017-12-17 16:28



该项目已被弃用,不再有效。应该删除这个答案。 - rjurney


1)为了向查询添加条件,只需将它们添加到'mongo.input.query'提供的字典中:

config.set("mongo.input.query","{customerId: 'some mongo id', usage: {'$gt': 30}")

要更好地了解查询的工作方式,请参阅:

http://docs.mongodb.org/manual/tutorial/query-documents/

http://docs.mongodb.org/getting-started/python/query/

2)为了迭代结果你可能想看看引发RDD方法'收集',在这个链接中的更多信息,只需查找collect方法:

http://spark.apache.org/docs/latest/api/scala/#org.apache.spark.rdd.RDD


2
2017-08-14 13:07





是的,你可以使用mongodb-spark lib,我在这个Stack Overflow线程中发布了一些例子:

如何通过Spark查询MongoDB以进行地理空间查询


0
2018-01-29 21:32