sds

Reputation: 60014

Spark DataFrames: where is partitionBy?

A common Spark processing flow we have is something like this:

Loading:

rdd = sqlContext.parquetFile("mydata/")
rdd = rdd.map(lambda row: (row.id,(some stuff)))
rdd = rdd.filter(....)
rdd = rdd.partitionBy(rdd.getNumPartitions())

Processing by id (this is why we do the partitionBy above!)

rdd.reduceByKey(....)
rdd.join(...)

However, Spark 1.3 changed sqlContext.parquetFile to return a DataFrame instead of an RDD, and a DataFrame no longer has the partitionBy, getNumPartitions, or reduceByKey methods.

What do we do now with partitionBy?

We can replace the loading code with something like

rdd = sqlContext.parquetFile("mydata/").rdd
rdd = rdd.map(lambda row: (row.id,(some stuff)))
rdd = rdd.filter(....)
rdd = rdd.partitionBy(rdd.getNumPartitions())
df = rdd.map(lambda ...: Row(...)).toDF(???)

and use groupBy instead of reduceByKey.
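For example, a rough sketch of what the DataFrame-side replacement for reduceByKey might look like (the value column and the sum aggregation are just placeholders for our real logic):

from pyspark.sql import functions as F

df = sqlContext.parquetFile("mydata/")
df = df.filter(df.value > 0)  # stand-in for the rdd.filter(....) step

# groupBy + agg plays the role that reduceByKey played on the RDD
summed = df.groupBy("id").agg(F.sum("value").alias("total"))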

Is this the right way?

PS. Yes, I understand that partitionBy is not necessary for groupBy et al. However, without a prior partitionBy, each join, groupBy &c may have to do cross-node operations. I am looking for a way to guarantee that all operations requiring grouping by my key will run local.
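(For comparison, this is what the guarantee looks like in the plain RDD API: give both sides the same partitioner up front. The toy data and partition count below are only for illustration.)

from pyspark import SparkContext

sc = SparkContext()

# Toy keyed RDDs; in practice these come from the parquet loads above.
left = sc.parallelize([(1, "a"), (2, "b")]).partitionBy(8).cache()
right = sc.parallelize([(1, "x"), (2, "y")]).partitionBy(8).cache()

# Both RDDs share the same partition count and default hash partitioner,
# so matching keys sit in the same partitions and the join can avoid
# re-shuffling both sides.
joined = left.join(right)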

Upvotes: 3

Views: 1216

Answers (2)

sds

Reputation: 60014

It appears that, since version 1.6, repartition(self, numPartitions, *cols) does what I need:

.. versionchanged:: 1.6

Added optional arguments to specify the partitioning columns. Also made numPartitions optional if partitioning columns are specified.
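A minimal PySpark sketch of how I expect to use it (the path, partition count, and column name are placeholders):

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext()
sqlContext = SQLContext(sc)

df = sqlContext.read.parquet("mydata/")

# Hash-partition the DataFrame by the key used in later joins/aggregations;
# 200 is an arbitrary partition count.
df = df.repartition(200, "id")

# Subsequent operations keyed on "id" can reuse this partitioning
# instead of shuffling the data again.
counts = df.groupBy("id").count()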

Upvotes: 1

yjshen

Reputation: 6693

Since a DataFrame gives us an abstraction of tables and columns on top of RDDs, the most convenient way to manipulate a DataFrame is to use these abstractions together with the table-manipulation methods the DataFrame API provides.

On a DataFrame, we could:

  • transform the table schema with select() / udf() / as()
  • filter rows out with filter() or where()
  • run an aggregation through groupBy() and agg()
  • do other analytic work using sample() / join() / unionAll()
  • persist your results using saveAsTable() / saveAsParquetFile() / insertIntoJDBC()

Please refer to Spark SQL and DataFrame Guide for more details.

Therefore, a common job looks like:

import org.apache.spark.sql.functions.{avg, max}

val people = sqlContext.parquetFile("...")
val department = sqlContext.parquetFile("...")

// filter, join on the key columns, then aggregate per department and gender
people.filter("age > 30")
  .join(department, people("deptId") === department("id"))
  .groupBy(department("name"), people("gender"))
  .agg(avg(people("salary")), max(people("age")))

And for your specific requirements, this could look like:

val t = sqlContext.parquetFile("...")
t.filter(...).select(...).groupBy(...).agg(...)

Upvotes: 0
