Reputation: 617
How do I write a PySpark DataFrame to a DynamoDB table? I did not find much info on this. My requirement is to write a PySpark DataFrame to a DynamoDB table; overall, I need to read from and write to DynamoDB from my PySpark code.
Thanks in advance.
Upvotes: 5
Views: 18054
Reputation: 398
We save the PySpark output as Parquet on S3, then use the awswrangler layer in a Lambda function to read the Parquet data into a pandas DataFrame, and awswrangler.dynamodb.put_df to write the whole DataFrame to the DynamoDB table. It scales pretty well with Lambda concurrency and an S3 event trigger.
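A minimal sketch of such a Lambda handler, assuming the function is triggered by S3 ObjectCreated notifications for the Parquet files Spark writes, and that the target table name comes from a hypothetical TABLE_NAME environment variable. awswrangler is imported inside the handler (it is expected to come from the Lambda layer), so the event-parsing helper can be run without any AWS dependencies.

```python
import os
from urllib.parse import unquote_plus


def parquet_paths_from_event(event):
    """Turn an S3 event notification into s3:// URIs, skipping non-Parquet keys."""
    paths = []
    for record in event.get("Records", []):
        bucket = record["s3"]["bucket"]["name"]
        # S3 URL-encodes object keys in event payloads (a space arrives as '+').
        key = unquote_plus(record["s3"]["object"]["key"])
        # Spark also writes marker files such as _SUCCESS; ignore them.
        if key.endswith(".parquet"):
            paths.append(f"s3://{bucket}/{key}")
    return paths


def handler(event, context):
    # Imported lazily: awswrangler is assumed to be provided by a Lambda layer.
    import awswrangler as wr

    table_name = os.environ["TABLE_NAME"]  # hypothetical configuration variable
    paths = parquet_paths_from_event(event)
    if not paths:
        return {"written": 0}
    # Read all new Parquet files into one pandas DataFrame.
    df = wr.s3.read_parquet(path=paths)
    # Write each row as an item; the columns must include the table's key attributes.
    wr.dynamodb.put_df(df=df, table_name=table_name)
    return {"written": len(df)}
```

Keeping the parsing in a separate pure function makes it easy to unit-test the trigger handling without touching S3 or DynamoDB.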
Upvotes: 0
Reputation: 322
This AWS blog post explains how to create a unique key, partition the data, and write S3 data (CSV) to a DynamoDB table using AWS Glue:
How realtor.com® maximized data upload from Amazon S3 into Amazon DynamoDB
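The blog's full job is not reproduced here. As a rough sketch of just the DynamoDB write, assuming the code runs inside an AWS Glue PySpark job (where the awsglue library is available), a plain PySpark DataFrame can be converted to a DynamicFrame and written with Glue's DynamoDB sink. The helper names are hypothetical; the two connection option keys are Glue's DynamoDB sink options.

```python
def dynamodb_sink_options(table_name, write_percent=0.5):
    """Connection options for AWS Glue's DynamoDB sink (hypothetical helper)."""
    # Glue accepts write percentages from 0.1 to 1.5 of the table's write capacity.
    if not 0.1 <= write_percent <= 1.5:
        raise ValueError("write_percent must be between 0.1 and 1.5")
    return {
        "dynamodb.output.tableName": table_name,
        "dynamodb.throughput.write.percent": str(write_percent),
    }


def write_dataframe_to_dynamodb(glue_context, df, table_name, write_percent=0.5):
    # Imported lazily: awsglue only exists inside a Glue job environment.
    from awsglue.dynamicframe import DynamicFrame

    # Convert the plain PySpark DataFrame into a Glue DynamicFrame.
    dyf = DynamicFrame.fromDF(df, glue_context, "to_dynamodb")
    glue_context.write_dynamic_frame_from_options(
        frame=dyf,
        connection_type="dynamodb",
        connection_options=dynamodb_sink_options(table_name, write_percent),
    )
```

In a Glue job, glue_context would typically be GlueContext(SparkContext.getOrCreate()). Raising write_percent speeds up the load at the cost of consuming more of the table's write capacity.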
Upvotes: 1
Reputation: 505
You can use spark-dynamodb.
From their repo:
from pyspark.sql import SparkSession
# Assumes the spark-dynamodb package is on the Spark classpath (e.g. via spark-submit --packages).
spark = SparkSession.builder.getOrCreate()
# Load a DataFrame from a Dynamo table. Only incurs the cost of a single scan for schema inference.
dynamoDf = spark.read.option("tableName", "SomeTableName") \
    .format("dynamodb") \
    .load()  # <-- DataFrame of Row objects with inferred schema.
# Scan the table for the first 100 items (the order is arbitrary) and print them.
dynamoDf.show(100)
# Write to some other table, overwriting existing items with the same keys.
dynamoDf.write.option("tableName", "SomeOtherTable") \
    .format("dynamodb") \
    .save()
Upvotes: 3
Reputation: 425
Ram, there's no built-in way to do that directly from PySpark. If you have pipeline software that can run Hive steps (for example on EMR), it can be done in a series of steps:
Create a temporary Hive table, for example:
CREATE TABLE TEMP(
column1 type,
column2 type...)
STORED AS ORC;
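Run your PySpark job and write your data to it:
from pyspark.sql import SparkSession
spark = SparkSession.builder.enableHiveSupport().getOrCreate()  # Hive support is needed to write Hive tables
dataframe.createOrReplaceTempView("df")  # dataframe: your existing PySpark DataFrame
spark.sql("INSERT OVERWRITE TABLE temp SELECT * FROM df")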
Create the DynamoDB connector table, an external Hive table mapped onto the existing DynamoDB table (here temp-to-dynamo):
CREATE EXTERNAL TABLE TEMPTODYNAMO(
column1 type,
column2 type...)
STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
TBLPROPERTIES ("dynamodb.table.name" = "temp-to-dynamo",
"dynamodb.column.mapping" = "column1:column1,column2:column2...");
Finally, overwrite that table with the contents of your temp table:
INSERT OVERWRITE TABLE TEMPTODYNAMO SELECT * FROM TEMP;
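With many columns, writing dynamodb.column.mapping by hand is error-prone. A small hypothetical helper can generate the connector-table DDL from a single column list; it follows the EMR documentation's form, which creates these tables as EXTERNAL. The generated statement could then be run as a pipeline step, for example with hive -e or hive -f.

```python
def dynamo_external_table_ddl(hive_table, dynamo_table, columns):
    """Build Hive DDL for an external table backed by a DynamoDB table.

    columns: list of (hive_column, hive_type, dynamo_attribute) tuples.
    """
    # Column definitions for the Hive side of the table.
    column_defs = ",\n  ".join(f"{name} {hive_type}" for name, hive_type, _ in columns)
    # dynamodb.column.mapping pairs each Hive column with a DynamoDB attribute.
    mapping = ",".join(f"{name}:{attr}" for name, _, attr in columns)
    return (
        f"CREATE EXTERNAL TABLE {hive_table} (\n  {column_defs})\n"
        "STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'\n"
        f'TBLPROPERTIES ("dynamodb.table.name" = "{dynamo_table}",\n'
        f'"dynamodb.column.mapping" = "{mapping}");'
    )
```

Generating both the column list and the mapping from one source keeps them from drifting apart when the DataFrame schema changes.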
More info here: https://docs.aws.amazon.com/emr/latest/ReleaseGuide/EMR_Hive_Commands.html
Upvotes: 5