Vikrant Sonawane

Reputation: 207

How to use mongo-spark connector in python

I am new to Python. I am trying to create a Spark DataFrame from MongoDB collections. For that I have selected the mongo-spark connector: https://github.com/mongodb/mongo-spark

I don't know how to use this jar/git repo in my standalone Python script. I wish to know how I can clone the repository so that I can use it in a standalone PySpark script on Windows.

Upvotes: 1

Views: 5279

Answers (2)

user1464878

Reputation:

Check the code below and let me know if it helps.

hadoop2.7$ mongod --version

db version v3.6.8

hadoop2.7$ bin/pyspark --packages org.mongodb.spark:mongo-spark-connector_2.11:2.2.0

https://github.com/mak705/Spark_NoSql_Connector/blob/master/Mongo_Spark.ipynb
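
Once the pyspark shell starts with that package, reading a collection into a DataFrame should look roughly like the sketch below. This assumes a local mongod and uses placeholder database/collection names; the built-in spark session of the shell is used.

# Read a MongoDB collection into a Spark DataFrame via the connector.
# <database> and <collection> are placeholders for your own names.
df = spark.read.format("com.mongodb.spark.sql.DefaultSource") \
    .option("uri", "mongodb://127.0.0.1/<database>.<collection>") \
    .load()

# Inspect the schema inferred from the documents.
df.printSchema()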

Upvotes: 0

Geewers

Reputation: 206

For anyone still struggling with this: what worked for me in the end was the following configuration (setting up or configuring your mongo-spark-connector):

MongoDB version 3.4.14; Spark version 2.2.1; Scala version 2.11.8

Jars:

mongo-spark-connector_2.11-2.2.1.jar, mongodb-driver-core-3.4.2.jar, mongo-java-driver-3.4.2.jar, bson-3.4.2.jar

Using the correct Spark and Scala versions with the matching mongo-spark-connector jar version is obviously key here, along with the correct versions of the mongodb-driver-core, bson and mongo-java-driver jars.

I had to inspect the pom.xml on Maven Central for the version of mongo-spark-connector I needed, see which version of mongo-java-driver it depended on, and then download the corresponding mongodb-driver-core and bson jars.
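
Alternatively, Spark can resolve those transitive driver jars from Maven for you via the spark.jars.packages config, which also gives you a standalone PySpark script as asked in the question. A minimal sketch along those lines, assuming the connector coordinates above and placeholder connection details; the config must be set before the session is created:

from pyspark.sql import SparkSession

# Let Spark/Ivy pull the connector and its transitive driver jars from Maven Central.
# <username>, <password>, <server>, <database> and <collection> are placeholders.
spark = (SparkSession.builder
         .appName("mongo-spark-example")
         .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.11:2.2.1")
         .config("spark.mongodb.input.uri", "mongodb://<username>:<password>@<server>/<database>.<collection>")
         .getOrCreate())

# With spark.mongodb.input.uri set on the session, no explicit uri option is needed here.
df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()
df.show()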

Finally, test with Scala:

spark-shell --conf "spark.mongodb.input.uri=mongodb://<username>:<password>@<server>/<database>.<collection>?readPreference=primaryPreferred" --conf "spark.mongodb.output.uri=mongodb://<username>:<password>@<server>/<database>.<collection>" --jars <path to jar>/mongo-spark-connector_2.11-2.2.1.jar,<path to jar>/mongodb-driver-core-3.4.2.jar,<path to jar>/bson-3.4.2.jar,<path to jar>/mongo-java-driver-3.4.2.jar

scala> import com.mongodb.spark._
import com.mongodb.spark._

scala> val rdd = MongoSpark.load(sc)
scala> println(rdd.count)

Test PySpark (example using an aggregation pipeline):

pyspark --jars <path to jar>/mongo-spark-connector_2.11-2.2.1.jar,<path to jar>/mongodb-driver-core-3.4.2.jar,<path to jar>/bson-3.4.2.jar,<path to jar>/mongo-java-driver-3.4.2.jar

In [1]: stage1="{'$match':{'_id':ObjectId('<SOME UID>')}}"
In [2]: df=spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri","mongodb://<username>:<password>@<server>/<database>.<collection>").option("pipeline",stage1).load()
In [3]: df.printSchema()
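
Writing a DataFrame back out goes through the same source, just on the write side. A short sketch, with the same placeholder connection string and append as one possible save mode:

# Write the DataFrame back to a MongoDB collection through the connector.
df.write.format("com.mongodb.spark.sql.DefaultSource") \
    .mode("append") \
    .option("uri", "mongodb://<username>:<password>@<server>/<database>.<collection>") \
    .save()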

Upvotes: 4
