Dave Kincaid

Reputation: 4160

How to repartition a data frame in sparklyr

This is proving difficult to find for some reason. I can easily find the repartition function in PySpark and in SparkR, but no such function seems to exist in sparklyr.

Does anyone know how to repartition a Spark dataframe in sparklyr?

Upvotes: 2

Views: 2780

Answers (2)

kevinykuo

Reputation: 4762

sparklyr now provides sdf_repartition(), e.g.:

iris_tbl %>%
  sdf_repartition(5L, columns = c("Species", "Petal_Width")) %>%
  spark_dataframe() %>%
  invoke("queryExecution") %>%
  invoke("optimizedPlan") 
# <jobj[139]>
#   class org.apache.spark.sql.catalyst.plans.logical.RepartitionByExpression
# RepartitionByExpression [Species#14, Petal_Width#13], 5
#                          +- InMemoryRelation [Sepal_Length#10, Sepal_Width#11, Petal_Length#12, Petal_Width#13, Species#14], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `iris`
#                                               +- *FileScan csv [Sepal_Length#10,Sepal_Width#11,Petal_Length#12,Petal_Width#13,Species#14] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/var/folders/ry/_l__tbl57d940bk2kgj8q2nj3s_d9b/T/Rtmpjgtnl6/spark_serializ..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Sepal_Length:double,Sepal_Width:double,Petal_Length:double,Petal_Width:double,Species:string>
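
If you want to confirm the result, recent sparklyr releases also provide sdf_num_partitions(), which returns the number of partitions of a Spark DataFrame. A minimal check along those lines (assuming the same iris_tbl as above):

# Verify the partition count after repartitioning;
# this should return 5 for the call above
iris_tbl %>%
  sdf_repartition(5L, columns = c("Species", "Petal_Width")) %>%
  sdf_num_partitions()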

Upvotes: 6

zero323

Reputation: 330063

You can try something like this:

library(sparklyr)
library(dplyr)
library(stringi)

#' @param df tbl_spark
#' @param numPartitions numeric number of partitions
#' @param ... character column names
repartition <- function(df, numPartitions, ...) {
  # Create output name
  alias <- paste(deparse(substitute(df)), stri_rand_strings(1, 10), sep="_")

  # Convert to Spark DataFrame
  sdf <- df %>% spark_dataframe()

  # Convert names to Columns
  exprs <- lapply(
    list(...),
    function(x) invoke(sdf, "apply", x)
  )

  sdf %>% 
    invoke("repartition", as.integer(numPartitions), exprs) %>%
    # Use "registerTempTable" with Spark 1.x
    invoke("createOrReplaceTempView", alias)

  # Return a dplyr reference to the registered view
  # (assumes the Spark connection `sc` exists in the calling environment)
  tbl(sc, alias)
}

Example usage:

df <- copy_to(sc, iris)

repartition(df, 3, "Species") %>% optimizedPlan

## <jobj[182]>
##   class org.apache.spark.sql.catalyst.plans.logical.RepartitionByExpression
##   RepartitionByExpression [Species#775], 3
## +- InMemoryRelation [Sepal_Length#771, Sepal_Width#772, Petal_Length#773, Petal_Width#774, Species#775], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `iris`
##    :  +- *Scan csv [Sepal_Length#771,Sepal_Width#772,Petal_Length#773,Petal_Width#774,Species#775] Format: CSV, InputPaths: file:/tmp/Rtmpp150bt/spark_serialize_9fd0bf1861994b0d294634211269ec9e591b014b83a5683f179dd18e7e70..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Sepal_Length:double,Sepal_Width:double,Petal_Length:double,Petal_Width:double,Species:string>

repartition(df, 7) %>% optimizedPlan
## <jobj[69]>
##   class org.apache.spark.sql.catalyst.plans.logical.RepartitionByExpression
##   RepartitionByExpression 7
## +- InMemoryRelation [Sepal_Length#19, Sepal_Width#20, Petal_Length#21, Petal_Width#22, Species#23], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `iris`
##    :  +- *Scan csv [Sepal_Length#19,Sepal_Width#20,Petal_Length#21,Petal_Width#22,Species#23] Format: CSV, InputPaths: file:/tmp/RtmpSw6aPg/spark_serialize_9fd0bf1861994b0d294634211269ec9e591b014b83a5683f179dd18e7e70..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Sepal_Length:double,Sepal_Width:double,Petal_Length:double,Petal_Width:double,Species:string>

The optimizedPlan helper is defined in Sparklyr: how to center a Spark table based on column?
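
For convenience, a minimal sketch of that helper, matching the invoke chain shown in the first answer:

# Extract the optimized logical plan of a tbl_spark
optimizedPlan <- function(df) {
  df %>%
    spark_dataframe() %>%
    invoke("queryExecution") %>%
    invoke("optimizedPlan")
}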

Upvotes: 2
