I'm trying to read documents from MongoDB into Databricks using Spark Structured Streaming.
I'm using to_json() to convert the whole document to a string. With this approach, schema evolution isn't working properly: new fields added to the MongoDB collection aren't being picked up by the to_json() call (I suspect because the schema is inferred once, when the stream starts).
Is there any other way to insert a MongoDB document as text into Databricks?
Thanks in advance.
This is my code:
from pyspark.sql.functions import to_json, struct, col, current_timestamp, parse_json

query = spark.readStream\
    .format("mongodb")\
    .option("spark.mongodb.connection.uri", connectionString)\
    .option("spark.mongodb.database", database)\
    .option("spark.mongodb.collection", collection)\
    .option("mode", "PERMISSIVE")\
    .option("spark.mongodb.outputExtendedJson", "true")\
    .option("spark.mongodb.change.stream.publish.full.document.only", "true")\
    .option("multiline", "true")\
    .option("inferSchema", "true")\
    .option("mergeSchema", "true")\
    .load()

def batch(micro_batch, batch_id):
    # Serialize all columns of the micro-batch into one JSON string,
    # parse it into a VARIANT column, and stamp the insert time.
    result_df = micro_batch\
        .withColumn("json_data", to_json(struct([micro_batch[c] for c in micro_batch.columns])))\
        .withColumn("json_variant", parse_json(col("json_data")))\
        .withColumn("Date_Inserted", current_timestamp())\
        .select("Date_Inserted", "_id", "json_data", "json_variant")
    if result_df.head(1):  # skip empty micro-batches
        result_df.write.mode("append").option("mergeSchema", "true").format("delta").saveAsTable(destination_table)

query.writeStream\
    .foreachBatch(batch)\
    .option("checkpointLocation", "/Volumes/default/adls_steam/Raw/mongo/v32")\
    .start()\
    .awaitTermination()
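For reference, this is the kind of alternative I'm imagining, though I haven't been able to verify it: give the connector an explicit schema that declares the change event's fullDocument field as a plain string (which I believe requires change.stream.publish.full.document.only to be false, so the stream returns the raw change event). With an explicit schema there is no inference, and if the connector renders a BSON document read into a StringType column as extended JSON text, new fields would ride along inside the string automatically. The schema and behaviour here are assumptions, not something I've confirmed:

from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import parse_json, current_timestamp, col

# Assumed change-event fields; fullDocument declared as a string so the
# whole document would arrive as one extended-JSON string (unverified).
event_schema = StructType([
    StructField("_id", StringType(), True),            # change-event id
    StructField("operationType", StringType(), True),
    StructField("fullDocument", StringType(), True),   # whole document as JSON text
])

raw_stream = spark.readStream\
    .format("mongodb")\
    .option("spark.mongodb.connection.uri", connectionString)\
    .option("spark.mongodb.database", database)\
    .option("spark.mongodb.collection", collection)\
    .option("spark.mongodb.change.stream.publish.full.document.only", "false")\
    .schema(event_schema)\
    .load()\
    .select(
        current_timestamp().alias("Date_Inserted"),
        col("fullDocument").alias("json_data"),
        parse_json(col("fullDocument")).alias("json_variant"),
    )

If something like this works, the foreachBatch step above wouldn't need to_json() at all, since the document never passes through an inferred schema.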