Reputation: 587
If I understand correctly, I can think of a Spark Dataset as a list of objects of type T. How can two datasets be joined so that the parent contains a list of its children, while each child in turn contains a list of its own children?
One approach would be to do a groupBy of the children based on the key and then collect_list, but collect_list returns only one column and I suppose there is a better way to do this.
The wanted result is basically a Dataset of type Customer (a list of customer objects?), but with the nested collections added. The end result would then be something like:
case class Customer(customer_id: Int, name: String, address: String, age: Int, invoices: List[Invoice])
case class Invoice(invoice_id: Int, customer_id: Int, invoice_num: String, date: Int, invoice_type: String, items: List[InvoiceItem])
And I would need to arrive at that result from the following inputs:
case class Customer(customer_id: Int, name: String, address: String, age: Int)
case class Invoice(invoice_id: Int, customer_id: Int, invoice_num: String, date: Int, invoice_type: String)
case class InvoiceItem(item_id: Int, invoice_id: Int, num_of_items: Int, price: Double, total: Double)
import spark.implicits._  // needed for toDF and as[...] outside spark-shell

val customers_df = Seq(
(11,"customer1", "address1", 10, "F")
,(12,"customer2", "address2", 20, "M")
,(13,"customer3", "address3", 30, "F")
).toDF("customer_id", "name", "address", "age", "sex")
val customers_ds = customers_df.as[Customer].as("c")
customers_ds.show
val invoices_df = Seq(
(21,11, "10101/1", 20181105, "manual")
,(22,11, "10101/2", 20181105, "manual")
,(23,11, "10101/3", 20181105, "manual")
,(24,12, "10101/4", 20181105, "generated")
,(25,12, "10101/5", 20181105, "pos")
).toDF("invoice_id", "customer_id", "invoice_num", "date", "invoice_type")
val invoices_ds = invoices_df.as[Invoice].as("i")
invoices_ds.show
val invoice_items_df = Seq(
(31, 21, 5, 10.0, 50.0)
,(32, 21, 3, 15.0, 45.0)
,(33, 22, 6, 11.0, 66.0)
,(34, 22, 7, 2.0, 14.0)
,(35, 23, 1, 100.0, 100.0)
,(36, 24, 4, 4.0, 16.0)
).toDF("item_id", "invoice_id", "num_of_items", "price", "total")
val invoice_items_ds = invoice_items_df.as[InvoiceItem].as("ii")
invoice_items_ds.show
In table form the data looks like this:
+-----------+---------+--------+---+---+
|customer_id| name| address|age|sex|
+-----------+---------+--------+---+---+
| 11|customer1|address1| 10| F|
| 12|customer2|address2| 20| M|
| 13|customer3|address3| 30| F|
+-----------+---------+--------+---+---+
+----------+-----------+-----------+--------+------------+
|invoice_id|customer_id|invoice_num| date|invoice_type|
+----------+-----------+-----------+--------+------------+
| 21| 11| 10101/1|20181105| manual|
| 22| 11| 10101/2|20181105| manual|
| 23| 11| 10101/3|20181105| manual|
| 24| 12| 10101/4|20181105| generated|
| 25| 12| 10101/5|20181105| pos|
+----------+-----------+-----------+--------+------------+
+-------+----------+------------+-----+-----+
|item_id|invoice_id|num_of_items|price|total|
+-------+----------+------------+-----+-----+
| 31| 21| 5| 10.0| 50.0|
| 32| 21| 3| 15.0| 45.0|
| 33| 22| 6| 11.0| 66.0|
| 34| 22| 7| 2.0| 14.0|
| 35| 23| 1|100.0|100.0|
| 36| 24| 4| 4.0| 16.0|
+-------+----------+------------+-----+-----+
Upvotes: 2
Views: 412
Reputation: 13548
It seems you are trying to read normalized data into a tree of Scala objects. You can certainly do this with Spark, but Spark may not be the optimal tool for it. If the data is small enough to fit in memory, which I assume is the case from your question, object-relational mapping (ORM) libraries may be better suited for the job.
If you still want to use Spark, you are on the right path with groupBy and collect_list. What you are missing is the struct() function.
import org.apache.spark.sql.functions.{collect_list, struct}
import spark.implicits._

// Simplified schema: a customer and its invoices, linked by customer_id
case class Customer(id: Int)
case class Invoice(id: Int, customer_id: Int)

val customers = spark.createDataset(Seq(Customer(1))).as("customers")
val invoices = spark.createDataset(Seq(Invoice(1, 1), Invoice(2, 1)))

// The target type: a customer plus an optional list of its invoices
case class CombinedCustomer(id: Int, invoices: Option[Seq[Invoice]])

customers
  .join(
    invoices
      .groupBy('customer_id)
      .agg(collect_list(struct('*)).as("invoices"))  // pack each invoice row into a struct, collect per customer
      .withColumnRenamed("customer_id", "id"),
    Seq("id"), "left_outer")
  .as[CombinedCustomer]
  .show
struct('*) builds a StructType column from the entire row. You can also pick specific columns, e.g., struct('x.as("colA"), 'colB).
This produces
+---+----------------+
| id| invoices|
+---+----------------+
| 1|[[1, 1], [2, 1]]|
+---+----------------+
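Applying the same pattern twice gives the nested structure from your question. The following is only a sketch, assuming the customers_ds, invoices_ds and invoice_items_ds datasets and the InvoiceItem case class from the question are in scope; CustomerWithInvoices and InvoiceWithItems are names introduced here for illustration, and spark is the active SparkSession as in spark-shell.
import org.apache.spark.sql.functions.{collect_list, struct}
import spark.implicits._

// Illustrative target types mirroring the question's desired output
case class InvoiceWithItems(invoice_id: Int, customer_id: Int, invoice_num: String,
                            date: Int, invoice_type: String, items: Option[Seq[InvoiceItem]])
case class CustomerWithInvoices(customer_id: Int, name: String, address: String, age: Int,
                                invoices: Option[Seq[InvoiceWithItems]])

// Step 1: collect each invoice's items into an array of structs and attach it to the invoice
val invoicesWithItems = invoices_ds
  .join(
    invoice_items_ds
      .groupBy('invoice_id)
      .agg(collect_list(struct('*)).as("items")),
    Seq("invoice_id"), "left_outer")

// Step 2: collect each customer's enriched invoices the same way
val customersWithInvoices = customers_ds
  .join(
    invoicesWithItems
      .groupBy('customer_id)
      .agg(collect_list(struct('*)).as("invoices")),
    Seq("customer_id"), "left_outer")
  .as[CustomerWithInvoices]

customersWithInvoices.show(false)
Customers without invoices, and invoices without items, come through as None thanks to the Option fields and the left outer joins.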
Now, in the case where the customer data is expected not to fit in memory, i.e., using a simple collect is not an option, there are a number of different strategies you can take.
The simplest, and one you should consider instead of collecting to the driver, requires that independent processing of each customer's data is acceptable. In that case, try using map and distribute the per-customer processing logic to the workers.
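A minimal sketch of that idea, building on the snippet above (customers, invoices, CombinedCustomer and the imports are assumed to be in scope); counting invoices is only a placeholder for real per-customer logic:
// Same join as above, kept as a value so it can be mapped over
val combinedCustomers = customers
  .join(
    invoices
      .groupBy('customer_id)
      .agg(collect_list(struct('*)).as("invoices"))
      .withColumnRenamed("customer_id", "id"),
    Seq("id"), "left_outer")
  .as[CombinedCustomer]

// The function given to map runs on the workers, once per customer
val invoiceCounts = combinedCustomers.map(c => (c.id, c.invoices.map(_.size).getOrElse(0)))
invoiceCounts.show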
If independent processing by customer is not acceptable, the general strategy is as follows (see the sketch after the list):
1. Aggregate the data into structured rows as needed using the above approach.
2. Repartition the data to ensure that everything you need for processing is in a single partition.
3. (Optionally) sortWithinPartitions to ensure that the data within a partition is ordered as you need it.
4. Use mapPartitions.
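A rough sketch of that strategy, reusing combinedCustomers from the map sketch above; the partition count and the per-partition logic are placeholders:
val processed = combinedCustomers
  .repartition(8, 'id)          // route all rows for a given id to the same partition
  .sortWithinPartitions('id)    // optional: order rows inside each partition
  .mapPartitions { customersInPartition =>
    // customersInPartition is an Iterator[CombinedCustomer] local to one partition
    customersInPartition.map(c => (c.id, c.invoices.map(_.size).getOrElse(0)))
  }
processed.show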
Upvotes: 1
Reputation: 1026
You can use Spark SQL and have one dataset each for customers, invoices and items. Then you can simply use joins and aggregate functions between these datasets to get the desired output.
Spark SQL has a negligible performance difference between the SQL style and the programmatic (DataFrame/Dataset) API.
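A sketch of what that could look like, assuming the three datasets from the question are registered as temp views; the view names and the NULL handling are illustrative, not a definitive implementation:
customers_ds.createOrReplaceTempView("customers")
invoices_ds.createOrReplaceTempView("invoices")
invoice_items_ds.createOrReplaceTempView("invoice_items")

// Nest items into invoices, then invoices into customers.
// Note: customers without invoices (and invoices without items) end up with a
// single all-NULL struct in their list, which a real query would filter out.
val nested = spark.sql("""
  SELECT c.customer_id, c.name, c.address, c.age,
         collect_list(struct(i.invoice_id, i.invoice_num, i.date, i.invoice_type, i.items)) AS invoices
  FROM customers c
  LEFT JOIN (
    SELECT inv.invoice_id, inv.customer_id, inv.invoice_num, inv.date, inv.invoice_type,
           collect_list(struct(ii.item_id, ii.num_of_items, ii.price, ii.total)) AS items
    FROM invoices inv
    LEFT JOIN invoice_items ii ON ii.invoice_id = inv.invoice_id
    GROUP BY inv.invoice_id, inv.customer_id, inv.invoice_num, inv.date, inv.invoice_type
  ) i ON i.customer_id = c.customer_id
  GROUP BY c.customer_id, c.name, c.address, c.age
""")
nested.show(false)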
Upvotes: 0