Jbaur

Reputation: 145

Process MongoDB docs loaded using the MongoSpark Connector

I want to read and process the docs from a MongoDB collection and write the updated docs back to a new collection in MongoDB.

I am using the code below to load the collection using MongoSpark.

    SparkSession spark = SparkSession.builder()
            .master("local")
            .appName("MongoSparkConnectorIntro")
            .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection1")
            .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection2")
            .getOrCreate();

    JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

    JavaMongoRDD<Document> rdd = MongoSpark.load(jsc);

Once the docs are loaded into an instance of JavaMongoRDD, I want to process them (select and update the docs) and finally write them to another collection.

I am not sure of the right way to apply the required transformations to the 'rdd' instance and then write the updated documents to the target MongoDB collection.

MongoSpark.save(rdd);

Can someone help me with how to use the MongoSpark/Spark API to process data loaded from a MongoDB collection before writing it to the target collection?

I am using the mongo-spark-connector_2.11 and spark-core_2.11 for this.

Upvotes: 1

Views: 832

Answers (1)

Maximilien Belinga

Reputation: 3186

Loading data as RDDs

You can pass a JavaSparkContext or a SQLContext to MongoSpark#load to read from MongoDB into a JavaRDD:

// Loading and analyzing data from MongoDB
JavaMongoRDD<Document> rdd = MongoSpark.load(jsc);
System.out.println(rdd.count());
System.out.println(rdd.first().toJson());
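
Since you want to select documents before processing them, note that you can also push an aggregation pipeline down to MongoDB with JavaMongoRDD#withPipeline, so that only matching documents are loaded into Spark. A minimal sketch (the $match condition is just an illustrative assumption about your data):

// Filter on the MongoDB side before the data reaches Spark
JavaMongoRDD<Document> aggregatedRdd = rdd.withPipeline(
        Collections.singletonList(
                Document.parse("{ $match: { test : { $gt : 5 } } }")));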

Saving RDD data

When saving RDD data to MongoDB, the RDD elements must be of a type that can be converted into a BSON document. You may have to add a map step to transform the data into a Document (or a BsonDocument, or a DBObject):

JavaRDD<Document> documents = jsc
        .parallelize(asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
        .map(new Function<Integer, Document>() {
            @Override
            public Document call(final Integer i) throws Exception {
                return Document.parse("{test: " + i + "}");
            }
        });

MongoSpark.save(documents);
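
For your end-to-end case, here is a minimal sketch: load the collection, transform the documents, and save the result. With your configuration, MongoSpark.save writes to spark.mongodb.output.uri (test.myCollection2 in your question); the "status" and "processed" field names below are placeholders, not fields your documents are known to contain:

JavaMongoRDD<Document> rdd = MongoSpark.load(jsc);

// Select the documents of interest and update them (placeholder logic)
JavaRDD<Document> updated = rdd
        .filter(doc -> doc.containsKey("status"))
        .map(doc -> {
            doc.put("processed", true);
            return doc;
        });

// Writes to the collection configured via spark.mongodb.output.uri
MongoSpark.save(updated);

If you need to write to a collection other than the configured output URI, you can pass an explicit WriteConfig (from com.mongodb.spark.config.WriteConfig); "myCollection3" here is just an example name:

Map<String, String> writeOverrides = new HashMap<>();
writeOverrides.put("collection", "myCollection3");
WriteConfig writeConfig = WriteConfig.create(jsc).withOptions(writeOverrides);
MongoSpark.save(updated, writeConfig);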

Upvotes: 1
