A common Spark processing flow we have is something like this:
Loading:
rdd = sqlContext.parquetFile("mydata/")
rdd = rdd.map(lambda row: (row.id,(some stuff)))
rdd = rdd.filter(....)
rdd = rdd.partitionBy(rdd.getNumPartitions())
Processing by id (this is why we do the partitionBy above!):
rdd.reduceByKey(....)
rdd.join(...)
However, Spark 1.3 changed sqlContext.parquetFile to return a DataFrame instead of an RDD, and a DataFrame no longer has the partitionBy, getNumPartitions, and reduceByKey methods.
What do we do now with partitionBy?
We can replace the loading code with something like
rdd = sqlContext.parquetFile("mydata/").rdd
rdd = rdd.map(lambda row: (row.id,(some stuff)))
rdd = rdd.filter(....)
rdd = rdd.partitionBy(rdd.getNumPartitions())
df = rdd.map(lambda ...: Row(...)).toDF(???)
and use groupBy instead of reduceByKey.
Is this the right way?
PS. Yes, I understand that partitionBy is not necessary for groupBy et al. However, without a prior partitionBy, each join, groupBy &c may have to do cross-node operations. I am looking for a way to guarantee that all operations requiring grouping by my key will run locally.
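For illustration, here is a toy version of the behaviour I am relying on (hypothetical ids and values):
# Toy data: after partitionBy the pair RDD carries a partitioner, and a
# reduceByKey over the same number of partitions can reuse it, so the
# reduce runs partition-local instead of reshuffling.
pairs = sc.parallelize([(1, 10), (2, 20), (1, 30)]).partitionBy(4)
summed = pairs.reduceByKey(lambda a, b: a + b, 4)
print(summed.partitioner == pairs.partitioner)  # True in recent PySpark: same hash partitioner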
Upvotes: 3
Views: 1216
It appears that, since version 1.6, repartition(self, numPartitions, *cols) does what I need. From the docstring:
.. versionchanged:: 1.6
   Added optional arguments to specify the partitioning columns. Also made
   numPartitions optional if partitioning columns are specified.
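For example, a sketch in PySpark (the 16 is arbitrary; "id" is the key column from the question):
# Spark 1.6 sketch; sqlContext.read.parquet is the DataFrameReader
# replacement for the deprecated sqlContext.parquetFile
df = sqlContext.read.parquet("mydata/")
df = df.repartition(16, "id")   # hash-partition rows by the id column
df = df.repartition("id")       # or: let Spark pick the number of partitions
# downstream groupBy("id") / joins on id can then reuse this partitioning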
Upvotes: 1
Since DataFrame provides us an abstraction of Table and Column over RDD, the most convenient way to manipulate a DataFrame is to use these abstractions along with the specific table-manipulation methods that DataFrame offers.
On a DataFrame, we could use:
select() \ udf() \ as()
filter() or where()
groupBy() and agg()
sample() \ join() \ union()
saveAsTable() \ saveAsParquet() \ insertIntoJDBC()
Please refer to the Spark SQL and DataFrame Guide for more details.
Therefore, a common job looks like:
import org.apache.spark.sql.functions.{avg, max}

val people = sqlContext.parquetFile("...")
val department = sqlContext.parquetFile("...")

people.filter("age > 30")
  .join(department, people("deptId") === department("id"))
  .groupBy(department("name"), "gender")
  .agg(avg(people("salary")), max(people("age")))
And for your specific requirements, this could look like:
val t = sqlContext.parquetFile("...")
t.filter(...).select(...).groupBy(...).agg(...)
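In PySpark, which the question uses, the same sketch might look like this (the "value" column and the filter predicate are placeholders):
from pyspark.sql import functions as F

t = sqlContext.parquetFile("mydata/")
result = (t.filter(t.id.isNotNull())         # hypothetical predicate
           .select("id", "value")            # "value" is a placeholder column
           .groupBy("id")
           .agg(F.sum("value").alias("total")))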
Upvotes: 0