avikm

Reputation: 802

How to write a Spark structured stream into a MongoDB collection?

A Spark DataFrame can be written into a MongoDB collection. Refer to https://docs.mongodb.com/spark-connector/master/python/write-to-mongodb/

But when I tried to write a Spark structured stream into a MongoDB collection, it did not work.

Can you please suggest a better option to achieve this than using pymongo code in a UDF?

Upvotes: 2

Views: 2248

Answers (2)

Pardeep

Reputation: 1005

Sharing an alternative solution where the configuration is handled once at the very beginning rather than inside the save method (to separate configs from logic).

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.streaming import StreamingQuery

# foreachBatch invokes the handler with two arguments: the batch
# DataFrame and the epoch id, so the function must accept both.
def save(message: DataFrame, batch_id: int):
    message.write \
        .format("mongo") \
        .mode("append") \
        .option("database", "db_name") \
        .option("collection", "collection_name") \
        .save()

spark: SparkSession = SparkSession \
    .builder \
    .appName("MyApp") \
    .config("spark.mongodb.input.uri", "mongodb://localhost:27017") \
    .config("spark.mongodb.output.uri", "mongodb://localhost:27017") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
    .master("local") \
    .getOrCreate()

df: DataFrame = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

query: StreamingQuery = df\
    .writeStream \
    .foreachBatch(save) \
    .start()

query.awaitTermination()
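The same separation of configuration from per-batch logic can be expressed by binding the options into the handler up front, so the streaming engine only ever sees a two-argument callable. A minimal plain-Python sketch of that design choice (no Spark session needed; `save_batch` and `handler` are illustrative names, not part of the connector API):

```python
from functools import partial

# Hypothetical helper: database/collection are bound once, up front,
# keeping the per-batch logic free of configuration details.
def save_batch(database, collection, batch_df, batch_id):
    # In the real job this would be:
    # batch_df.write.format("mongo").mode("append") \
    #     .option("database", database).option("collection", collection).save()
    return f"batch {batch_id} -> {database}.{collection} ({len(batch_df)} rows)"

# Bind the config at the very beginning; foreachBatch-style code
# then calls handler(batch_df, batch_id) with no knowledge of it.
handler = partial(save_batch, "db_name", "collection_name")

print(handler(["row1", "row2"], 0))
```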

Upvotes: 2

avikm

Reputation: 802

It is resolved using the foreachBatch sink. Please find below working sample code.

def write_mongo_row(df, epoch_id):
    mongoURL = "mongodb://XX.XX.XX.XX:27017/test.collection"
    df.write.format("mongo").mode("append").option("uri", mongoURL).save()

query = csvDF.writeStream.foreachBatch(write_mongo_row).start()
query.awaitTermination()

I got the idea from: How to use foreach or foreachBatch in PySpark to write to database?

Upvotes: 2
