Reputation: 802
A Spark DataFrame can be written to a MongoDB collection. Refer to https://docs.mongodb.com/spark-connector/master/python/write-to-mongodb/
But when I tried to write a Spark structured stream to a MongoDB collection, it did not work.
Can you please suggest a better option to achieve this than using pymongo code in a UDF?
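For reference, the batch write along the lines of the linked docs works fine. A minimal sketch, assuming a local MongoDB and the 3.x connector on the classpath (the URI, database, and collection names are placeholders):

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("BatchWriteExample") \
    .config("spark.mongodb.output.uri", "mongodb://localhost:27017/test.collection") \
    .getOrCreate()

# A small throwaway DataFrame; any DataFrame would do
df = spark.createDataFrame([("a", 1), ("b", 2)], ["key", "value"])
df.write.format("mongo").mode("append").save()

The same format("mongo") sink does not support writeStream in the 3.x connector, which is why the direct streaming write fails.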
Upvotes: 2
Views: 2248
Reputation: 1005
Sharing an alternative solution where the configuration is taken care of at the very beginning rather than inside the save method (to separate configuration from logic).
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.streaming import StreamingQuery

# foreachBatch invokes this with the micro-batch DataFrame and its batch id,
# so the function must accept both arguments
def save(message: DataFrame, batch_id: int):
    message.write \
        .format("mongo") \
        .mode("append") \
        .option("database", "db_name") \
        .option("collection", "collection_name") \
        .save()

spark: SparkSession = SparkSession \
    .builder \
    .appName("MyApp") \
    .config("spark.mongodb.input.uri", "mongodb://localhost:27017") \
    .config("spark.mongodb.output.uri", "mongodb://localhost:27017") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
    .master("local") \
    .getOrCreate()

df: DataFrame = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

query: StreamingQuery = df \
    .writeStream \
    .foreachBatch(save) \
    .start()

query.awaitTermination()
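If you prefer to keep even the database and collection in the session config, the 3.x connector also reads them from spark.mongodb.output.uri, so save can shrink to a bare write. A sketch under that assumption:

# Assumes the session was built with
# .config("spark.mongodb.output.uri", "mongodb://localhost:27017/db_name.collection_name")
def save(message: DataFrame, batch_id: int):
    message.write.format("mongo").mode("append").save()

To try the pipeline locally, start a listener on port 9999 first (for example, nc -lk 9999) and type a few lines; each micro-batch will then be appended to the collection.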
Upvotes: 2
Reputation: 802
It is resolved using the foreachBatch sink. Please find below working sample code.
def write_mongo_row(df, epoch_id):
    # write each micro-batch to MongoDB via the batch connector
    mongoURL = "mongodb://XX.XX.XX.XX:27017/test.collection"
    df.write.format("mongo").mode("append").option("uri", mongoURL).save()

query = csvDF.writeStream.foreachBatch(write_mongo_row).start()
query.awaitTermination()
Got the idea from How to use foreach or foreachBatch in PySpark to write to database?
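One addition for anything beyond a quick test: give the query a checkpoint location so it can recover after a restart. A sketch (the path is a placeholder):

query = csvDF.writeStream \
    .option("checkpointLocation", "/tmp/mongo_checkpoint") \
    .foreachBatch(write_mongo_row) \
    .start()
query.awaitTermination()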
Upvotes: 2