GothamGirl

Reputation: 299

Optimal way to save spark sql dataframe to S3 using information stored in them

I have dataframes with data like:

    channel  eventId1                  eventId2                eventTs        eventTs2       serialNumber  someCode
    Web-DTB  akefTEdZhXt8EqzLKXNt1Wjg  akTEdZhXt8EqzLKXNt1Wjg  1545502751154  1545502766731  4             rfs
    Web-DTB  3ycLHHrbEkBJ.piYNyI7u55w  3ycLHHEkBJ.piYNyI7u55w  1545502766247  1545502767800  4             njs
    Web-DTB  3ycL4rHHEkBJ.piYNyI7u55w  3ycLHHEkBJ.piYNyI7u55w  1545502766247  1545502767800  4             null

I need to save this data to S3 path looking like :

  s3://test/data/ABC/hb/eventTs/[eventTs]/uploadTime_[eventTs2]/*.json.gz

How can I proceed with this, given that I need to extract data from the partitions to build the S3 path? (The S3 path is a function of eventTs and eventTs2, which are columns in the dataframes.)

df.write.partitionBy("eventTs","eventTs2").format("json").save("s3://test/data/ABC/hb????")

I guess I could iterate over each row in the dataframe, extract the path, and save to S3, but I don't want to do that.

Is there any way to group the dataframes by eventTs and eventTs2 and then save them to the full S3 path? Is there something more optimal?

Upvotes: 1

Views: 1200

Answers (1)

Sudev Ambadi

Reputation: 655

Spark supports partitions similar to what we have in Hive. If the number of distinct values of eventTs and eventTs2 is small, partitioning is a good way to solve this.

Check the Scaladoc for more information on partitionBy.

Example usage:

val someDF = Seq(
    (1, "bat", "marvel"),
    (2, "mouse", "disney"),
    (3, "horse", "animal"),
    (1, "batman", "marvel"),
    (2, "tom", "disney"))
  .toDF("id", "name", "place")
someDF.write.partitionBy("id", "name").orc("/tmp/somedf")

If you write the dataframe with partitionBy on "id" and "name", the following directory structure will be created:

/tmp/somedf/id=1/name=bat
/tmp/somedf/id=1/name=batman

/tmp/somedf/id=2/name=mouse
/tmp/somedf/id=2/name=tom

/tmp/somedf/id=3/name=horse

The first and second partition columns become nested directories: all rows where id is equal to 1 and name is bat are saved under /tmp/somedf/id=1/name=bat. The order of the columns in partitionBy determines the order of the directories.

In your case, the partitions will be on eventTs and eventTs2.

val someDF = Seq(
    ("Web-DTB", "akefTEdZhXt8EqzLKXNt1Wjg", "akTEdZhXt8EqzLKXNt1Wjg", "1545502751154", "1545502766731", 4, "rfs"),
    ("Web-DTB", "3ycLHHrbEkBJ.piYNyI7u55w", "3ycLHHEkBJ.piYNyI7u55w", "1545502766247", "1545502767800", 4, "njs"),
    ("Web-DTB", "3ycL4rHHEkBJ.piYNyI7u55w", "3ycLHHEkBJ.piYNyI7u55w", "1545502766247", "1545502767800", 4, "null"))
  .toDF("channel", "eventId1", "eventId2", "eventTs", "eventTs2", "serialNumber", "someCode")
someDF.write.partitionBy("eventTs", "eventTs2").orc("/tmp/someDF")

This creates the following directory structure:

/tmp/someDF/eventTs=1545502766247/eventTs2=1545502767800
/tmp/someDF/eventTs=1545502751154/eventTs2=1545502766731
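Since you want gzipped JSON rather than ORC, the same partitioned write can target your S3 prefix directly. A sketch (assuming the bucket and prefix from your question; note Spark will name the partition directories eventTs=.../eventTs2=..., not the uploadTime_[eventTs2] layout you described, so an exact match would require a separate rename step):

```scala
someDF.write
  .partitionBy("eventTs", "eventTs2")
  .option("compression", "gzip")   // produces part files ending in .json.gz
  .json("s3://test/data/ABC/hb")
```

This writes paths like s3://test/data/ABC/hb/eventTs=1545502766247/eventTs2=1545502767800/part-*.json.gz in a single job, without iterating over rows.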

Upvotes: 3
