esbej

Reputation: 27

How to combine 2 different dataframes together?

I have 2 DataFrames:

Users (~29.000.000 entries)

|-- userId: string (nullable = true)

Impressions (~1000 entries)

|-- modules: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- content: array (nullable = true)
|    |    |    |-- element: string (containsNull = true)
|    |    |-- id: string (nullable = true)

I want to walk through all the Users and attach one Impression from these ~1000 entries to each User. The Impressions would be assigned in order, so after every ~1000 Users the assignment wraps around and the same ~1000 Impressions are assigned again to the next ~1000 Users. At the end I want a DataFrame with the combined data. Either adding the Impressions columns to the Users DataFrame or creating a new DataFrame would be fine as the result.

Do you have any ideas about what would be a good solution here?

Upvotes: 0

Views: 234

Answers (2)

Chondrops

Reputation: 758

What I would do is use the old trick of adding a monotonically increasing ID to both dataframes, then create a new column on your LARGER dataframe (Users) that contains each row's ID modulo the size of the smaller dataframe.

This new column then provides a rolling matching key against the items in the Impressions dataframe.

This is a minimal (tested) example to give you the idea. The same code works unchanged with your ~1000 impressions to join against:

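import org.apache.spark.sql.functions.monotonically_increasing_id
import spark.implicits._   // already in scope in spark-shell

val users = Seq("user1", "user2", "user3", "user4", "user5", "user6", "user7", "user8", "user9").toDF("users")
val impressions = Seq("a", "b", "c").toDF("impressions").withColumn("id", monotonically_increasing_id())

val cnt = impressions.count

// Key each user by (id mod cnt) and match it against the impression id
val result = users.withColumn("id", monotonically_increasing_id())
                  .withColumn("mod", $"id" mod cnt)
                  .join(impressions, $"mod" === impressions("id"))
                  .drop("mod")

result.show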


+-----+---+-----------+---+
|users| id|impressions| id|
+-----+---+-----------+---+
|user1|  0|          a|  0|
|user2|  1|          b|  1|
|user3|  2|          c|  2|
|user4|  3|          a|  0|
|user5|  4|          b|  1|
|user6|  5|          c|  2|
|user7|  6|          a|  0|
|user8|  7|          b|  1|
|user9|  8|          c|  2|
+-----+---+-----------+---+
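One caveat on the approach above: per the Spark documentation, monotonically_increasing_id is guaranteed to be increasing and unique but not consecutive (the partition ID goes into the upper bits). On a multi-partition DataFrame the impression IDs may therefore not be exactly 0..n-1, and some users would find no match. Below is a minimal sketch that avoids this by building a gap-free index with RDD.zipWithIndex instead. The object and helper names (RoundRobinJoin, withConsecutiveIndex, assignRoundRobin) and the sample data are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions.{broadcast, col}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

object RoundRobinJoin {

  // Appends a 0-based, gap-free index column using RDD.zipWithIndex.
  def withConsecutiveIndex(df: DataFrame, name: String): DataFrame = {
    val schema = StructType(df.schema.fields :+ StructField(name, LongType, nullable = false))
    val rows = df.rdd.zipWithIndex().map { case (row, idx) => Row.fromSeq(row.toSeq :+ idx) }
    df.sparkSession.createDataFrame(rows, schema)
  }

  // User number i receives impression number (i mod n), where n is the impression count.
  def assignRoundRobin(users: DataFrame, impressions: DataFrame): DataFrame = {
    // Cached so that count() and the join see the same indexed rows.
    val indexedImpressions = withConsecutiveIndex(impressions, "impIdx").cache()
    val n = indexedImpressions.count()
    withConsecutiveIndex(users, "userIdx")
      .withColumn("impIdx", col("userIdx") % n)
      .join(broadcast(indexedImpressions), "impIdx") // equi-join on the shared key column
      .drop("impIdx")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").appName("round-robin").getOrCreate()
    import spark.implicits._

    // Hypothetical sample data standing in for Users and Impressions.
    val users = (1 to 9).map(i => s"user$i").toDF("users")
    val impressions = Seq("a", "b", "c").toDF("impressions")

    assignRoundRobin(users, impressions).orderBy("userIdx").show()
    spark.stop()
  }
}
```

Note that zipWithIndex triggers an extra Spark job to compute partition sizes when the RDD has more than one partition. Broadcasting the ~1000 impressions is intended to keep the 29M-row side from being shuffled for the join.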

Upvotes: 1

Martin Senne

Reputation: 6059

Sketch of idea:

  • Add monotonically increasing id to both dataframes Users and Impressions via

    val indexedUsersDF = usersDf.withColumn("index", monotonically_increasing_id())
    val indexedImpressionsDF = impressionsDf.withColumn("index", monotonically_increasing_id())
    

    (see "spark dataframe: how to add a index Column")

  • Determine the number of rows in Impressions via count and store it (count returns a Long), e.g.

    val numberOfImpressions = ... 
    
  • Apply a UDF to the index column of indexedUsersDF that computes the modulo in a separate column (e.g. moduloIndex); the built-in % operator on Column would work as well

    val moduloIndexedUsersDF = indexedUsersDF.select(...)
    
  • Join moduloIndexedUsersDF and indexedImpressionsDF on

    moduloIndexedUsersDF("moduloIndex") === indexedImpressionsDF("index")
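The steps above can be put together roughly as follows. This is a sketch under assumptions: the object name ModuloJoinSketch and the sample columns are hypothetical, and coalescing Impressions to a single partition before indexing is an addition (not part of the steps above) so that its IDs come out as exactly 0..n-1. The users keep their possibly gapped monotonic IDs; the modulo still maps every user to a valid impression, but the cycle restarts at an offset at each partition boundary rather than being a strict round-robin.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{broadcast, monotonically_increasing_id, udf}

object ModuloJoinSketch {

  def assign(usersDf: DataFrame, impressionsDf: DataFrame): DataFrame = {
    // Step 1: add an index to both DataFrames. Impressions are coalesced to one
    // partition first, so their IDs are exactly 0, 1, ..., n-1.
    val indexedUsersDF = usersDf.withColumn("index", monotonically_increasing_id())
    val indexedImpressionsDF = impressionsDf.coalesce(1)
      .withColumn("index", monotonically_increasing_id())
      .cache() // so count() and the join see the same IDs

    // Step 2: count the impressions; count() returns a Long.
    val numberOfImpressions: Long = indexedImpressionsDF.count()

    // Step 3: UDF mapping each user index into 0 .. numberOfImpressions - 1.
    val modulo = udf((i: Long) => i % numberOfImpressions)
    val moduloIndexedUsersDF =
      indexedUsersDF.withColumn("moduloIndex", modulo(indexedUsersDF("index")))

    // Step 4: join on moduloIndex === index, broadcasting the small side,
    // then drop the helper columns so only the users' "index" column remains.
    moduloIndexedUsersDF
      .join(broadcast(indexedImpressionsDF),
        moduloIndexedUsersDF("moduloIndex") === indexedImpressionsDF("index"))
      .drop(indexedImpressionsDF("index"))
      .drop("moduloIndex")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").appName("modulo-join").getOrCreate()
    import spark.implicits._

    // Hypothetical stand-ins: the real Impressions rows carry a "modules" array.
    val users = (1 to 9).map(i => s"user$i").toDF("userId")
    val impressions = Seq("a", "b", "c").toDF("impression")

    assign(users, impressions).show()
    spark.stop()
  }
}
```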
    

Upvotes: 0
