Reputation: 299
I have dataframes with data like:
channel  eventId1                  eventId2                eventTs        eventTs2       serialNumber  someCode
Web-DTB  akefTEdZhXt8EqzLKXNt1Wjg  akTEdZhXt8EqzLKXNt1Wjg  1545502751154  1545502766731  4             rfs
Web-DTB  3ycLHHrbEkBJ.piYNyI7u55w  3ycLHHEkBJ.piYNyI7u55w  1545502766247  1545502767800  4             njs
Web-DTB  3ycL4rHHEkBJ.piYNyI7u55w  3ycLHHEkBJ.piYNyI7u55w  1545502766247  1545502767800  4             null
I need to save this data to an S3 path that looks like:
s3://test/data/ABC/hb/eventTs/[eventTs]/uploadTime_[eventTs2]/*.json.gz
How can I proceed with this? I need to extract data from the partitions and write it to an S3 path that is a function of the eventTs and eventTs2 values present in the dataframes:
df.write.partitionBy("eventTs","eventTs2").format("json").save("s3://test/data/ABC/hb????")
I guess I could iterate over each row in the dataframe, extract the path, and save to S3, but I do not want to do that.
Is there any way to group the dataframe by eventTs and eventTs2 and then save each group to the full S3 path, as in the sketch below? Is there something more optimal?
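For illustration, a minimal sketch of what I mean by grouping and saving (assuming df is the dataframe above and eventTs/eventTs2 are string columns; the write options are my guess at producing *.json.gz files):

import org.apache.spark.sql.functions.col

// Collect the distinct (eventTs, eventTs2) pairs on the driver, then
// write each group to its own path. This works, but it launches one
// write job per distinct pair.
val pairs = df.select("eventTs", "eventTs2").distinct().collect()
pairs.foreach { row =>
  val eventTs  = row.getString(0)
  val eventTs2 = row.getString(1)
  df.filter(col("eventTs") === eventTs && col("eventTs2") === eventTs2)
    .write
    .format("json")
    .option("compression", "gzip")
    .save(s"s3://test/data/ABC/hb/eventTs/$eventTs/uploadTime_$eventTs2/")
}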
Upvotes: 1
Views: 1200
Reputation: 655
Spark supports partitions similar to what we have in Hive. If the number of distinct values for eventTs and eventTs2 is small, partitioning will be a good way to solve this.
Check the Scala docs for more information on partitionBy.
Example usage:
import spark.implicits._ // assumes a SparkSession named spark is in scope

val someDF = Seq(
  (1, "bat", "marvel"),
  (2, "mouse", "disney"),
  (3, "horse", "animal"),
  (1, "batman", "marvel"),
  (2, "tom", "disney")
).toDF("id", "name", "place")

someDF.write.partitionBy("id", "name").orc("/tmp/somedf")
If you write the dataframe with partitionBy on "id" and "name", the following directory structure will be created:
/tmp/somedf/id=1/name=bat
/tmp/somedf/id=1/name=batman
/tmp/somedf/id=2/name=mouse
/tmp/somedf/id=2/name=tom
/tmp/somedf/id=3/name=horse
The first and second partition columns become directories: all rows where id equals 1 and name equals bat are saved under /tmp/somedf/id=1/name=bat. The order of the columns passed to partitionBy decides the nesting order of the directories, as the example below shows.
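For example, reversing the column order reverses the nesting (the output path here is hypothetical):

// partitionBy("name", "id") nests name above id, producing paths like
// /tmp/somedf-reversed/name=bat/id=1
someDF.write.partitionBy("name", "id").orc("/tmp/somedf-reversed")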
In your case, the partitions will be on eventTs and eventTs2.
val someDF = Seq(
  ("Web-DTB", "akefTEdZhXt8EqzLKXNt1Wjg", "akTEdZhXt8EqzLKXNt1Wjg", "1545502751154", "1545502766731", 4, "rfs"),
  ("Web-DTB", "3ycLHHrbEkBJ.piYNyI7u55w", "3ycLHHEkBJ.piYNyI7u55w", "1545502766247", "1545502767800", 4, "njs"),
  ("Web-DTB", "3ycL4rHHEkBJ.piYNyI7u55w", "3ycLHHEkBJ.piYNyI7u55w", "1545502766247", "1545502767800", 4, "null")
).toDF("channel", "eventId1", "eventId2", "eventTs", "eventTs2", "serialNumber", "someCode")

someDF.write.partitionBy("eventTs", "eventTs2").orc("/tmp/someDF")
This creates the following directory structure. Note that partitionBy always uses the key=value naming convention, so producing the exact uploadTime_[eventTs2] layout from the question would require renaming the directories after the write:
/tmp/someDF/eventTs=1545502766247/eventTs2=1545502767800
/tmp/someDF/eventTs=1545502751154/eventTs2=1545502766731
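As a usage note, filtering on the partition columns when reading the data back lets Spark prune the non-matching directories (a minimal sketch, assuming a SparkSession named spark):

import spark.implicits._

// Only the matching partition directories are scanned; the other
// eventTs/eventTs2 partitions are skipped entirely.
val slice = spark.read.orc("/tmp/someDF")
  .where($"eventTs" === "1545502766247" && $"eventTs2" === "1545502767800")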
Upvotes: 3