PATHURI B

Read mongodb document as a string/text using databricks readstream/writestream

I'm trying to read documents from MongoDB into Databricks using Spark Structured Streaming.

I'm using to_json() to convert the whole document to a string. With this approach, schema evolution isn't working properly: struct() is built from the columns Spark inferred when the stream started, so new fields added to the MongoDB collection afterwards aren't picked up by the to_json() function, as the small example below illustrates.
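To make the failure mode concrete, here's a minimal, self-contained example (my own illustration, not tied to the Mongo connector): to_json(struct(...)) can only serialize columns that already exist in the DataFrame's schema, and for a stream that schema is frozen when the stream starts.

from pyspark.sql.functions import struct, to_json

# The schema is fixed when the DataFrame is created, just like a stream's
# schema is fixed when readStream starts.
df = spark.createDataFrame([("1", "Alice")], ["_id", "name"])

df.select(to_json(struct([df[c] for c in df.columns])).alias("json_data")).show(truncate=False)
# {"_id":"1","name":"Alice"}
# A field added to the source later is absent from df.columns, so it can never
# show up in json_data.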

Is there any other way to ingest a MongoDB document as text into Databricks?

Thanks in advance.

This is my code:

from pyspark.sql.functions import col, current_timestamp, parse_json, struct, to_json

# Note: mode/multiline/inferSchema are JSON reader options and mergeSchema is a
# Delta/Parquet option, so the mongodb source most likely ignores them.
query = spark.readStream \
    .format("mongodb") \
    .option("spark.mongodb.connection.uri", connectionString) \
    .option("spark.mongodb.database", database) \
    .option("spark.mongodb.collection", collection) \
    .option("mode", "PERMISSIVE") \
    .option("spark.mongodb.outputExtendedJson", "true") \
    .option("spark.mongodb.change.stream.publish.full.document.only", "true") \
    .option("multiline", "true") \
    .option("inferSchema", "true") \
    .option("mergeSchema", "true") \
    .load()

def batch(micro_batch, batch_id):
    # micro_batch.columns is fixed by the schema Spark inferred when the stream
    # started, so fields added to the MongoDB collection later never appear here.
    all_cols = micro_batch.columns

    # parse_json needs a runtime with VARIANT support (DBR 15.3+ on Databricks)
    result_df = micro_batch \
        .withColumn("json_data", to_json(struct([micro_batch[c] for c in all_cols]))) \
        .withColumn("json_variant", parse_json(col("json_data"))) \
        .withColumn("Date_Inserted", current_timestamp()) \
        .select("Date_Inserted", "_id", "json_data", "json_variant")

    if not result_df.isEmpty():
        result_df.write.mode("append").option("mergeSchema", "true").format("delta").saveAsTable(destination_table)


query.writeStream \
    .foreachBatch(batch) \
    .option("checkpointLocation", "/Volumes/default/adls_steam/Raw/mongo/v32") \
    .start() \
    .awaitTermination()
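One workaround I'm considering (a sketch, not something I've verified): instead of letting the connector infer the document schema, declare an explicit schema in which the change event's fullDocument is a single string column. My understanding is that the MongoDB Spark connector renders a BSON document as an extended-JSON string when the declared type is StringType, so fields added to the collection later simply ride along inside the text. The schema, the full.document.only setting, and the checkpoint path below are all assumptions on my part:

from pyspark.sql.functions import current_timestamp
from pyspark.sql.types import StringType, StructField, StructType

# Assumption: with full.document.only=false the change event exposes a
# fullDocument field, and declaring it as StringType makes the connector
# serialize the whole BSON document to one extended-JSON string.
text_schema = StructType([StructField("fullDocument", StringType(), True)])

raw = spark.readStream \
    .format("mongodb") \
    .option("spark.mongodb.connection.uri", connectionString) \
    .option("spark.mongodb.database", database) \
    .option("spark.mongodb.change.stream.publish.full.document.only", "false") \
    .schema(text_schema) \
    .load()

def text_batch(micro_batch, batch_id):
    # No struct()/to_json() needed: the document is already one JSON string,
    # so schema drift in MongoDB never touches the Delta table's schema.
    out = micro_batch \
        .withColumn("Date_Inserted", current_timestamp()) \
        .selectExpr("Date_Inserted", "fullDocument AS json_data")
    if not out.isEmpty():
        out.write.mode("append").format("delta").saveAsTable(destination_table)

raw.writeStream \
    .foreachBatch(text_batch) \
    .option("checkpointLocation", "/Volumes/default/adls_steam/Raw/mongo/v32_text") \
    .start() \
    .awaitTermination()

The trade-off is that downstream consumers have to parse json_data themselves (e.g. with parse_json into a VARIANT, or from_json with a schema), but the Delta table's own schema never has to evolve.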

Upvotes: 1

Answers (0)
