RK.
RK.

Reputation: 617

How to write PySpark dataframe to DynamoDB table?

How to write PySpark dataframe to DynamoDB table? Did not find much info on this. As per my requirement, i have to write PySpark dataframe to Dynamo db table. Overall i need to read/write to dynamo from my PySpark code.

Thanks in advance.

Upvotes: 5

Views: 18054

Answers (4)

NNM
NNM

Reputation: 398

we are saving pyspark output to parquet on S3, then using awswrangler layer in lambda to read the parquet data to pandas frame and wrangler.dynamodb.put_df to write the whole dataframe to the dynamoDB table. Pretty decent scaling with lambda concurrency and s3 event trigger

Upvotes: 0

Kunal
Kunal

Reputation: 322

This AWS blog explains how to create a unique key, partition and write S3 data (csv) to DynamoDB table using AWS Glue.

How realtor.com® maximized data upload from Amazon S3 into Amazon DynamoDB

Upvotes: 1

eltbus
eltbus

Reputation: 505

You can use spark-dynamodb.

From their repo:

# Load a DataFrame from a Dynamo table. Only incurs the cost of a single scan for schema inference.
dynamoDf = spark.read.option("tableName", "SomeTableName") \
                     .format("dynamodb") \
                     .load() # <-- DataFrame of Row objects with inferred schema.

# Scan the table for the first 100 items (the order is arbitrary) and print them.
dynamoDf.show(100)

# write to some other table overwriting existing item with same keys
dynamoDf.write.option("tableName", "SomeOtherTable") \
              .format("dynamodb") \
              .save()

Upvotes: 3

Tim
Tim

Reputation: 425

Ram, there's no way to do that directly from pyspark. If you have pipeline software running it can be done in a series of steps. Here is how it can be done:

  1. Create a temporary hive table like

    CREATE TABLE TEMP( column1 type, column2 type...) STORED AS ORC;

  2. Run your pySpark job and write your data to it

    dataframe.createOrReplaceTempView("df") spark.sql("INSERT OVERWRITE TABLE temp SELECT * FROM df")

  3. Create the dynamo connector table

    CREATE TABLE TEMPTODYNAMO( column1 type, column2 type...) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "temp-to-dynamo", "dynamodb.column.mapping" = "column1:column1,column2:column2...";

  4. Overwrite that table with your temp table

    INSERT OVERWRITE TABLE TEMPTODYNAMO SELECT * FROM TEMP;

More info here: https://docs.aws.amazon.com/emr/latest/ReleaseGuide/EMR_Hive_Commands.html

Upvotes: 5

Related Questions