Reputation: 971
I am attempting to implement a pipeline for reading data from an RDBMS source, partitioning the read on a datetime field, and storing that partitioned data in parquet.
The pipeline is intended to be run weekly, with each run simply appending any new rows which have been added to the RDBMS source to the partitioned parquet data.
Currently, the way I'm handling this problem is by:
Storing the previous time of ingest.
Reading from the RDBMS and applying a filter on the datetime column for entries after the previous time of ingest.
Appending this data to the partitioned parquet file.
While this works, I am not sure it is the most idiomatic way of handling what is likely a very common use case. Also, unless I want to allow row duplication, some additional massaging of the already-written data is necessary.
An example of this pipeline would be:
// rdbms is an object which stores various connection information for an RDBMS.
// dateCol is the column name of the datetime column.
// dateFormat is the pattern used to parse dateCol; lastRun is the previous time of ingest.
// path is the parquet file path.
import org.apache.spark.sql.functions._

val yearCol = "year"
val monthCol = "month"
val dayCol = "day"
val refreshDF = spark.read
  .format("jdbc")
  .option("url", rdbms.connectionString + "/" + rdbms.database)
  .option("dbtable", rdbms.table)
  .option("user", rdbms.userName)
  .option("password", rdbms.password)
  .option("driver", rdbms.driverClass)
  .option("readOnly", true)
  .load()
val ts = unix_timestamp(col(dateCol), dateFormat).cast("timestamp")
val unixDateCol = dateCol + "_unix"
val datedDF = refreshDF.withColumn(unixDateCol, ts)
// Keep only rows added after the previous time of ingest.
val filteredDF = datedDF.filter(col(unixDateCol).gt(lastRun))
val ymdDF = filteredDF
  .withColumn(yearCol, year(col(unixDateCol)))
  .withColumn(monthCol, month(col(unixDateCol)))
  .withColumn(dayCol, dayofmonth(col(unixDateCol)))
ymdDF.write.mode("append").partitionBy(yearCol, monthCol, dayCol).parquet(path)
Is there a better way to do this? I'd like to avoid reading the entire table and computing a difference for performance reasons.
(Edit: added the partitioning, although since this pass doesn't de-duplicate the latest read, the partitioning isn't actually leveraged yet.)
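For context, lastRun above is just an epoch-seconds value persisted between runs. A minimal sketch of how that checkpoint might be stored and loaded, where checkpointPath and the helper names are illustrative rather than part of the actual pipeline:
// Sketch only: keep the previous ingest time as epoch seconds in a small checkpoint file.
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths, StandardOpenOption}

def loadLastRun(checkpointPath: String, default: Long): Long = {
  val p = Paths.get(checkpointPath)
  if (Files.exists(p)) new String(Files.readAllBytes(p), StandardCharsets.UTF_8).trim.toLong
  else default
}

def saveLastRun(checkpointPath: String, epochSeconds: Long): Unit = {
  Files.write(
    Paths.get(checkpointPath),
    epochSeconds.toString.getBytes(StandardCharsets.UTF_8),
    StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING
  )
}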
Upvotes: 1
Views: 1915
Reputation: 2938
Instead of reading all the data from the DB each time, you can push timestamp filters down to the database via the 'predicates' parameter, so that only the date range you are interested in is returned. This is much faster for large tables, especially when the timestamp column is indexed and/or the table is partitioned on the DB side. Here is the relevant method:
/**
 * Construct a `DataFrame` representing the database table accessible via JDBC URL
 * url named table using connection properties. The `predicates` parameter gives a list of
 * expressions suitable for inclusion in WHERE clauses; each one defines one partition
 * of the `DataFrame`.
 *
 * Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash
 * your external database systems.
 *
 * @param url JDBC database url of the form `jdbc:subprotocol:subname`
 * @param table Name of the table in the external database.
 * @param predicates Condition in the where clause for each partition.
 * @param connectionProperties JDBC database connection arguments, a list of arbitrary string
 *                             tag/value. Normally at least a "user" and "password" property
 *                             should be included. "fetchsize" can be used to control the
 *                             number of rows per fetch.
 * @since 1.4.0
 */
def jdbc(
    url: String,
    table: String,
    predicates: Array[String],
    connectionProperties: Properties): DataFrame
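Applied to your pipeline, the call might look roughly like the sketch below. The date literals are placeholders showing how the unread range could be split into one predicate (and therefore one partition) per day, and the connection fields are taken from the question's rdbms object:
import java.util.Properties

// JDBC connection properties; at minimum user and password.
val props = new Properties()
props.setProperty("user", rdbms.userName)
props.setProperty("password", rdbms.password)
props.setProperty("driver", rdbms.driverClass)

// One predicate per partition; each is pushed into the WHERE clause on the DB side,
// so only rows newer than the previous ingest ever leave the database. The literal
// dates are placeholders for the days between lastRun and now.
val predicates = Array(
  s"$dateCol >= '2017-06-01 00:00:00' AND $dateCol < '2017-06-02 00:00:00'",
  s"$dateCol >= '2017-06-02 00:00:00' AND $dateCol < '2017-06-03 00:00:00'"
)

val refreshDF = spark.read.jdbc(
  rdbms.connectionString + "/" + rdbms.database,
  rdbms.table,
  predicates,
  props
)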
As for ensuring the data is not duplicated: you can query the count of records per day from the parquet data for, say, the last couple of weeks, and take the oldest date for which there are 0 records as the "previous time of ingest". This eliminates the chance of that date being out of sync with the parquet data.
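A rough sketch of that check, assuming the year/month/day partition columns written by the pipeline in the question and an arbitrary two-week lookback window:
import org.apache.spark.sql.functions._

// Days for which rows have already been written, restricted to the last two weeks.
val writtenDays = spark.read.parquet(path)
  .select(to_date(concat_ws("-",
    col("year").cast("string"),
    lpad(col("month").cast("string"), 2, "0"),
    lpad(col("day").cast("string"), 2, "0"))).as("dt"))
  .filter(col("dt") >= date_sub(current_date(), 14))
  .distinct()

// Candidate days: every calendar day in the same window.
val candidateDays = spark.range(0, 15)
  .select(expr("date_sub(current_date(), cast(id as int))").as("dt"))

// Oldest recent day with no written rows; treat it as the "previous time of ingest".
// (Returns null if every day in the window already has data.)
val previousIngest = candidateDays
  .join(writtenDays, Seq("dt"), "left_anti")
  .agg(min(col("dt")).as("oldest_missing"))
  .first()
  .getDate(0)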
Upvotes: 1