Reputation: 121077
I have some data in a database, and I want to work with it in Spark, using sparklyr.
I can use a DBI-based package to import the data from the database into R:
dbconn <- dbConnect(<some connection args>)
data_in_r <- dbReadTable(dbconn, "a table")
and then copy the data from R to Spark using:
sconn <- spark_connect(<some connection args>)
data_ptr <- copy_to(sconn, data_in_r)
Copying twice is slow for big datasets.
How can I copy data directly from the database into Spark?
sparklyr has several spark_read_*() functions for import, but nothing database-related. sdf_import() looks like a possibility, but it isn't clear how to use it in this context.
Upvotes: 4
Views: 2858
Reputation: 330203
Sparklyr >= 0.6.0
You can use spark_read_jdbc().
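A minimal sketch of what such a call might look like, reusing the PostgreSQL URL, credentials and driver class from the low-level example in this answer. The helper names jdbc_options() and read_table_via_jdbc() are hypothetical, the host, database and table names are placeholders, and the JDBC driver still has to be available to Spark:

```r
# Hypothetical helper: collect JDBC options in a named list, dropping unset ones.
jdbc_options <- function(url, dbtable, driver, user = NULL, password = NULL) {
  opts <- list(url = url, dbtable = dbtable, driver = driver,
               user = user, password = password)
  opts[!vapply(opts, is.null, logical(1))]
}

# Hypothetical wrapper: read the table over JDBC and register it in Spark
# under `name`. spark_read_jdbc() returns a tbl that dplyr can work with.
read_table_via_jdbc <- function(sc, name, opts) {
  sparklyr::spark_read_jdbc(sc, name = name, options = opts)
}

opts <- jdbc_options(
  url      = "jdbc:postgresql://host/database",  # placeholder URL
  dbtable  = "table",                            # placeholder table name
  driver   = "org.postgresql.Driver",
  user     = "scott",
  password = "tiger"
)

# Usage (needs a running Spark connection and a reachable database):
# sc <- sparklyr::spark_connect(master = "local")
# foo_tbl <- read_table_via_jdbc(sc, "foo", opts)
```

The data goes from the database straight into Spark, so nothing is materialised in the R session.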
Sparklyr < 0.6.0
I hope there is a more elegant solution out there, but here is a minimal example using the low-level API:
Make sure that Spark has access to the required JDBC driver, for example by adding its coordinates to spark.jars.packages. For example, with PostgreSQL (adjust for the current version) you could add:
spark.jars.packages org.postgresql:postgresql:9.4.1212
to SPARK_HOME/conf/spark-defaults.conf.
Load the data and register it as a temporary view:
library(sparklyr)
library(dplyr)  # for %>% and tbl()

# sc is an existing connection returned by spark_connect()
name <- "foo"

spark_session(sc) %>%
  invoke("read") %>%
  # JDBC URL and table name
  invoke("option", "url", "jdbc:postgresql://host/database") %>%
  invoke("option", "dbtable", "table") %>%
  # Add optional credentials
  invoke("option", "user", "scott") %>%
  invoke("option", "password", "tiger") %>%
  # Driver class, here for PostgreSQL
  invoke("option", "driver", "org.postgresql.Driver") %>%
  # Read and register as a temporary view
  invoke("format", "jdbc") %>%
  invoke("load") %>%
  # Spark 2.x, registerTempTable in 1.x
  invoke("createOrReplaceTempView", name)
You can pass multiple options at once using an environment:
invoke("options", as.environment(list(
  user = "scott", password = "tiger", url = "jdbc:..."
)))
Load the temporary view with dplyr:
dplyr::tbl(sc, name)
Be sure to read about further JDBC options, with a focus on partitionColumn, *Bound (lowerBound and upperBound), and numPartitions.
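As a rough illustration of those partitioning options, the sketch below adds them to an existing option list. With them set, Spark issues numPartitions parallel queries, each covering a stride of partitionColumn between lowerBound and upperBound; the bounds only define the strides and do not filter rows. The helper partitioned_jdbc_options(), the column name id and the bound values are assumptions made for the example:

```r
# Hypothetical helper: append Spark's JDBC partitioning options to `opts`.
# Values are converted to plain strings so that large bounds are not
# written in scientific notation (e.g. "1e+06").
partitioned_jdbc_options <- function(opts, column, lower, upper, n) {
  stopifnot(lower < upper, n >= 1)
  c(opts, list(
    partitionColumn = column,
    lowerBound      = format(lower, scientific = FALSE),
    upperBound      = format(upper, scientific = FALSE),
    numPartitions   = format(n, scientific = FALSE)
  ))
}

base_opts <- list(
  url     = "jdbc:postgresql://host/database",  # placeholder URL
  dbtable = "table",                            # placeholder table name
  driver  = "org.postgresql.Driver"
)

# Assumes the table has a numeric column `id` with values roughly in 1..1000000.
part_opts <- partitioned_jdbc_options(base_opts, "id", 1, 1000000, 8)
```

The resulting list can be passed as options to spark_read_jdbc(), or through invoke("options", as.environment(part_opts)) in the low-level version.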
For additional details see for example How to use JDBC source to write and read data in (Py)Spark? and How to improve performance for slow Spark jobs using DataFrame and JDBC connection?
Upvotes: 6