Reputation: 145
I want to read and process the documents from a MongoDB collection and write the updated documents to a new collection in MongoDB.
I am using the code below to load the collection using MongoSpark.
SparkSession spark = SparkSession.builder()
        .master("local")
        .appName("MongoSparkConnectorIntro")
        .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection1")
        .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection2")
        .getOrCreate();
JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
JavaMongoRDD<Document> rdd = MongoSpark.load(jsc);
Once the docs are loaded into an instance of JavaMongoRDD, I want to process them (select and update the docs) and then finally write them to another collection.
I am not sure of the right way to process/apply transformations on the 'rdd' instance and finally write the updated documents to the target MongoDB collection.
MongoSpark.save(rdd);
Can someone help me with how I can use the MongoSpark/Spark API to process data loaded from a MongoDB collection before writing it to the target collection?
I am using the mongo-spark-connector_2.11 and spark-core_2.11 for this.
Upvotes: 1
Views: 832
Reputation: 3186
Loading data as RDDs
You can pass a JavaSparkContext or a SQLContext to MongoSpark#load for easy reading from MongoDB into a JavaRDD.
// Loading and analyzing data from MongoDB
JavaMongoRDD<Document> rdd = MongoSpark.load(jsc);
System.out.println(rdd.count());
System.out.println(rdd.first().toJson());
Saving RDD data
When saving RDD data into MongoDB, it must be of a type that can be converted into a Bson document. You may have to add a map step to transform the data into a Document (or a BsonDocument, or a DBObject).
JavaRDD<Document> documents = jsc.parallelize(asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)).map(
        new Function<Integer, Document>() {
            @Override
            public Document call(final Integer i) throws Exception {
                return Document.parse("{test: " + i + "}");
            }
        });
MongoSpark.save(documents);
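For your case, the same pattern works end to end: load the source collection, transform the documents with map/filter, and save the result. The sketch below is only a minimal outline; the filter condition and the "processed" field are placeholders for whatever select/update logic you actually need, and it assumes Java 8 lambdas on the JavaRDD API.
// Load from the collection configured in spark.mongodb.input.uri (test.myCollection1)
JavaMongoRDD<Document> sourceRdd = MongoSpark.load(jsc);
// Select the documents you care about and apply your updates.
JavaRDD<Document> updatedDocs = sourceRdd
        .filter(doc -> doc.containsKey("someField"))   // hypothetical selection criterion
        .map(doc -> {
            doc.put("processed", true);                // hypothetical update
            return doc;
        });
// Write to the collection configured in spark.mongodb.output.uri (test.myCollection2)
MongoSpark.save(updatedDocs);
MongoSpark.save picks up the output collection from spark.mongodb.output.uri; if you need to write to a different collection per job, you can also pass a WriteConfig as a second argument.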
Upvotes: 1