Reputation: 27
I have 2 DataFrames:
Users (~29.000.000 entries)
|-- userId: string (nullable = true)
Impressions (~1000 entries)
|-- modules: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- content: array (nullable = true)
| | | |-- element: string (containsNull = true)
| | |-- id: string (nullable = true)
I want to walk through all the Users and attach to each User 1 Impression from these ~1000 entries. So actually at each ~1000th User the Impression would be the same, then the loop on the Impressions would start from the beginning and assign the same ~1000 impressions for the next ~1000 users. At the end I want to have a DataFrame with the combined data. Also the Users dataframe could be reused by adding the columns of the Impressions or a newly created one would work also as a result.
You have any ideas, which would be a good solution here?
Upvotes: 0
Views: 234
Reputation: 758
What I would do is use the old trick of adding a monotically increasing ID to both dataframes, then create a new column on your LARGER dataframe (Users) which contains the modulo of each row's ID and the size of smaller dataframe.
This new column then provides a rolling matching key against the items in the Impressions dataframe.
This is a minimal example (tested) to give you the idea. Obviously this will work if you have 1000 impressions to join against:
var users = Seq("user1", "user2", "user3", "user4", "user5", "user6", "user7", "user8", "user9").toDF("users")
var impressions = Seq("a", "b", "c").toDF("impressions").withColumn("id", monotonically_increasing_id())
var cnt = impressions.count
users=users.withColumn("id", monotonically_increasing_id())
.withColumn("mod", $"id" mod cnt)
.join(impressions, $"mod"===impressions("id"))
.drop("mod")
users.show
+-----+---+-----------+---+
|users| id|impressions| id|
+-----+---+-----------+---+
|user1| 0| a| 0|
|user2| 1| b| 1|
|user3| 2| c| 2|
|user4| 3| a| 0|
|user5| 4| b| 1|
|user6| 5| c| 2|
|user7| 6| a| 0|
|user8| 7| b| 1|
|user9| 8| c| 2|
+-----+---+-----------+---+
Upvotes: 1
Reputation: 6059
Sketch of idea:
Add monotonically increasing id to both dataframes Users and Impressions via
val indexedUsersDF = usersDf.withColumn("index", monotonicallyIncreasingId)
val indexedImpressionsDF = impressionsDf.withColumn("index", monotonicallyIncreasingId)
Determine number of rows in Impressions via count
and store as int, e.g.
val numberOfImpressions = ...
Apply UDF to index-column in indexedUsersDF
that computes the modulo in a seperate column (e.g. moduloIndex)
val moduloIndexedUsersDF = indexedUsersDF.select(...)
Join moduloIndexedUsersDF
and indexedImperessionsDF
on
moduloIndexedUsersDF("moduloIndex")===indexedImpressions("index")
Upvotes: 0