Joe

Reputation: 601

PySpark dataframe or parquet file to DynamoDB

I want to put a PySpark dataframe or a parquet file into a DynamoDB table.

The PySpark dataframe I have has 30 million rows and 20 columns.

Solution 1: using boto3, pandas and Batch writing (Amazon DynamoDB)

With this approach I read the parquet file into pandas and then put the rows into the DynamoDB table one by one, but it is taking far too long; it is very, very slow.

import boto3
import pandas as pd

# Read the parquet file into a pandas dataframe
pandas_dataframe = pd.read_parquet('path/to/file.parquet')

dynamodb = boto3.resource('dynamodb', region_name='your-region')

table = dynamodb.Table('DynamoDB_table_name')

# batch_writer buffers the items and sends them as 25-item BatchWriteItem calls
with table.batch_writer() as batch:
    for index, row in pandas_dataframe.iterrows():
        batch.put_item(
            Item={
                'column_name_DynamoDB_table': int(row['column_name_in_pandas_dataframe']),
                ...
            }
        )
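
For comparison, one way to avoid the pandas round-trip is to write straight from the Spark executors, so each partition batches its own items in parallel instead of a single process looping over 30 million rows. A minimal sketch of that idea, assuming boto3 is installed on the workers; the region, table name, and column names are placeholders:

import boto3

def write_partition(rows):
    # Each partition gets its own client and batch writer; Spark runs
    # these calls in parallel across the executors.
    dynamodb = boto3.resource('dynamodb', region_name='your-region')
    table = dynamodb.Table('DynamoDB_table_name')
    with table.batch_writer() as batch:
        for row in rows:  # rows is an iterator of pyspark Row objects
            batch.put_item(
                Item={
                    # Row objects support dict-style access by column name
                    'column_name_DynamoDB_table': int(row['column_name_in_pyspark_dataframe']),
                }
            )

# More partitions means more concurrent writers; tune this against the
# table's provisioned write capacity.
pyspark_dataframe.repartition(32).foreachPartition(write_partition)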

Solution 2: using boto3, PySpark and SQL (how-to-write-pyspark-dataframe-to-dynamodb-table)

Here I keep getting a ParseException error in step 3 described in the solution. I checked the Amazon documentation and the code looks correct (EMR_Hive_Commands.html). Maybe it is not SQL code and that is my error, but if it's not, I don't know which language it is.

-- Step 1
DROP TABLE IF EXISTS TEMP;
CREATE TABLE TEMP(
        column_name_DynamoDB_table type,
        ... )
STORED AS ORC;

-- Step 2.1
pyspark_dataframe.createOrReplaceTempView("df")


-- Step 2.2
INSERT INTO TEMP
    SELECT *
    FROM df;


-- Step 3
CREATE TABLE TEMPTODYNAMO(
        column_name_DynamoDB_table type,
        ... )
STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
TBLPROPERTIES ( "dynamodb.table.name" ="temp-to-dynamo" , 
                "dynamodb.column.mapping" = "col1:column_name_DynamoDB_table,...");

the error I keep getting:

Error in SQL statement: ParseException: 
Operation not allowed: STORED BY(line 22, pos 0)

== SQL ==
CREATE TABLE TEMPTODYNAMO(
        column_name_DynamoDB_table type,
        ...  )
STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
^^^
TBLPROPERTIES ( "dynamodb.table.name" ="temp-to-dynamo" , 
                "dynamodb.column.mapping" = "col1:column_name_DynamoDB_table,...")

Solution 3: using boto3, PySpark and com.audienceproject (Spark+DynamoDB)

I did not understand what to put in the code; the Python code shown on the page is:

# Python
# Load a DataFrame from a Dynamo table. Only incurs the cost of a single scan for schema inference.
dynamoDf = spark.read.option("tableName", "SomeTableName") \
                     .format("dynamodb") \
                     .load() # <-- DataFrame of Row objects with inferred schema.

# Scan the table for the first 100 items (the order is arbitrary) and print them.
dynamoDf.show(100)

# write to some other table overwriting existing item with same keys
dynamoDf.write.option("tableName", "SomeOtherTable") \
              .format("dynamodb") \
              .save()

But I really did not get where to put the name of my DynamoDB table and my PySpark dataframe.

Update: I tried

pyspark_dataframe.write.option("tableName", "name_DynamoDB_table") \
                       .format("dynamodb") \
                       .save()

And got this error:

AnalysisException: TableProvider implementation dynamodb cannot be written with ErrorIfExists mode, please use Append or Overwrite modes instead

Regards

Upvotes: 1

Views: 3083

Answers (1)

Ravi Naidu

Reputation: 128

I tried Solution #3 with the code snippet below and got it working.

The code change was to add mode('append'), which is what the AnalysisException above asks for (Append or Overwrite instead of the default ErrorIfExists).

dynamoDf.write.mode('append').option("tableName","db_dev_users_v2") \
              .option("region",region) \
              .format("dynamodb") \
              .save()
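
One assumption worth checking against the project's README: the connector jar has to be on the Spark classpath. A sketch of pulling it in via Maven coordinates when building the session (the artifact version here is an example; pick the one matching your Spark/Scala version):

from pyspark.sql import SparkSession

# Sketch: fetch the audienceproject connector at session start. The
# coordinates below are an example; confirm them in the project's README.
spark = SparkSession.builder \
    .config("spark.jars.packages", "com.audienceproject:spark-dynamodb_2.12:1.1.2") \
    .getOrCreate()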

Upvotes: 1
