Reputation: 394
I want to unit test code that reads a DataFrame from an RDBMS using sparkSession.read.jdbc(...). But I didn't find a way to mock DataFrameReader so that it returns a dummy DataFrame for the test.
Code example:
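import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

// postgresUrl, postgresDatabase, postgresUsername, postgresPassword, postgresDriver
// and postgresFilesTableName are configuration values defined elsewhere
object ConfigurationLoader {
  def readTable(tableName: String)(implicit spark: SparkSession): DataFrame = {
    spark.read
      .format("jdbc")
      .option("url", s"$postgresUrl/$postgresDatabase")
      .option("dbtable", tableName)
      .option("user", postgresUsername)
      .option("password", postgresPassword)
      .option("driver", postgresDriver)
      .load()
  }

  def loadUsingFilter(fileTypes: String*)(implicit spark: SparkSession): DataFrame = {
    readTable(postgresFilesTableName)
      .where(col("column").isin(fileTypes: _*))
  }
}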
Second problem: to mock a Scala object, it looks like I need a different approach to create such a service.
Upvotes: 5
Views: 10905
Reputation: 1353
You can write unit tests for code that uses DataFrameWriter, DataFrameReader, DataStreamReader and DataStreamWriter by mocking these classes.
Below is a sample test case using this approach.
Maven-based dependencies:
<dependency>
  <groupId>org.scalatestplus</groupId>
  <artifactId>mockito-3-4_2.11</artifactId>
  <version>3.2.3.0</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.mockito</groupId>
  <artifactId>mockito-inline</artifactId>
  <version>2.13.0</version>
  <scope>test</scope>
</dependency>
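For an sbt build, the same two test dependencies could be declared roughly as below. This is a direct translation of the Maven coordinates above, not something from the original answer; the single `%` keeps the explicit `_2.11` suffix, so adjust it if your Scala version differs.

```scala
// build.sbt: the Maven test dependencies above, translated to sbt
libraryDependencies ++= Seq(
  "org.scalatestplus" % "mockito-3-4_2.11" % "3.2.3.0" % Test,
  "org.mockito" % "mockito-inline" % "2.13.0" % Test
)
```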
Let's use an example of a Spark class where the source is Hive and the sink is JDBC:
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}

// SparkPipeline is not shown in this answer; it is assumed to declare read, transform and write
class DummySource extends SparkPipeline {
  /**
   * Method to read the source and create a DataFrame
   *
   * @param spark : SparkSession
   * @return : DataFrame
   */
  override def read(spark: SparkSession): DataFrame = {
    spark.read.table("Table_Name").filter("_2 > 1")
  }

  /**
   * Method to transform the DataFrame
   *
   * @param df : DataFrame
   * @return : DataFrame
   */
  override def transform(df: DataFrame): DataFrame = ???

  /**
   * Method to write/save the DataFrame to a target
   *
   * @param df : DataFrame
   */
  override def write(df: DataFrame): Unit =
    df.write.jdbc("url", "targetTableName", new Properties())
}
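The answer does not show the SparkPipeline trait. As a hedged sketch, assuming it simply declares the three steps, it might look like the trait below; the `run` method and the in-memory demo are hypothetical additions that show why splitting read, transform and write makes each step replaceable in a test.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical shape of SparkPipeline; the real trait is not shown in the answer
trait SparkPipeline {
  def read(spark: SparkSession): DataFrame
  def transform(df: DataFrame): DataFrame
  def write(df: DataFrame): Unit

  // Hypothetical template method chaining the three steps
  def run(spark: SparkSession): Unit = write(transform(read(spark)))
}

object SparkPipelineExample {
  // Runs an in-memory pipeline and returns how many rows reached write
  def demo(): Long = {
    val spark = SparkSession.builder().master("local[1]").appName("pipeline-sketch").getOrCreate()
    import spark.implicits._
    var written = -1L
    val pipeline = new SparkPipeline {
      // Columns are named _1 and _2, like the test data used below
      def read(s: SparkSession): DataFrame = Seq(("one", 1), ("two", 2)).toDF()
      def transform(df: DataFrame): DataFrame = df.filter("_2 > 1")
      // Instead of a JDBC sink, just record the row count
      def write(df: DataFrame): Unit = written = df.count()
    }
    pipeline.run(spark)
    spark.stop()
    written
  }
}
```

Because no step touches Hive or JDBC directly, each one can be swapped for an in-memory version like this.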
Mocking Read
test("Spark read table") {
  val dummySource = new DummySource()
  // A real local session is only needed to build the test DataFrame
  val sparkSession = SparkSession
    .builder()
    .master("local[*]")
    .appName("mocking spark test")
    .getOrCreate()
  val testData = Seq(("one", 1), ("two", 2))
  val df = sparkSession.createDataFrame(testData)
  df.show()

  // The mocked session hands the test DataFrame to the code under test
  val mockDataFrameReader = mock[DataFrameReader]
  val mockSpark = mock[SparkSession]
  when(mockSpark.read).thenReturn(mockDataFrameReader)
  when(mockDataFrameReader.table("Table_Name")).thenReturn(df)

  // Only ("two", 2) passes the "_2 > 1" filter in DummySource.read
  dummySource.read(mockSpark).count() should be(1)
}
Mocking Write
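test("Spark write") {
  val dummySource = new DummySource()
  val mockDf = mock[DataFrame]
  val mockDataFrameWriter = mock[DataFrameWriter[Row]]
  when(mockDf.write).thenReturn(mockDataFrameWriter)
  when(mockDataFrameWriter.mode(SaveMode.Append)).thenReturn(mockDataFrameWriter)
  // jdbc(...) returns Unit, so it is stubbed with doNothing() instead of when(...)
  doNothing().when(mockDataFrameWriter).jdbc("url", "targetTableName", new Properties())

  dummySource.write(df = mockDf)

  // Check that the write really went through the JDBC sink
  verify(mockDataFrameWriter).jdbc("url", "targetTableName", new Properties())
}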
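The two tests above are fragments that assume a surrounding ScalaTest suite. A minimal sketch of that scaffolding follows, assuming ScalaTest 3.2 (which the scalatestplus 3.2.3.0 artifact targets); the suite name and the sanity-check test are hypothetical. Sharing one lazily created session and stopping it in `afterAll` avoids paying Spark's startup cost in every test.

```scala
import org.apache.spark.sql.SparkSession
import org.scalatest.BeforeAndAfterAll
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers
import org.scalatestplus.mockito.MockitoSugar

// Hypothetical suite name; MockitoSugar provides the mock[T] helper used in the tests above
class MockingSparkSuite extends AnyFunSuite with Matchers with MockitoSugar with BeforeAndAfterAll {

  // One local session shared by all tests in this suite
  lazy val sparkSession: SparkSession = SparkSession.builder()
    .master("local[*]")
    .appName("mocking spark test")
    .getOrCreate()

  override protected def afterAll(): Unit = {
    sparkSession.stop()
    super.afterAll()
  }

  // Sanity check that the shared session can build test data
  test("local session is usable") {
    import sparkSession.implicits._
    Seq(("one", 1), ("two", 2)).toDF().count() shouldBe 2L
  }
}
```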
The streaming code is in the reference.
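The streaming variant is not reproduced here, but the same pattern should carry over to `spark.readStream`. The sketch below is an assumption, not the referenced code: `DummyStreamSource` and the `"rate"` format are illustrative choices. DataStreamReader is declared `final` in Spark 2.x/3.x, so mocking it needs the inline mock maker that mockito-inline provides.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.DataStreamReader
import org.mockito.Mockito.{mock, when}

// Hypothetical streaming source, analogous to DummySource.read
class DummyStreamSource {
  def read(spark: SparkSession): DataFrame =
    spark.readStream.format("rate").load()
}

object MockReadStreamExample {
  // Returns true when the code under test receives exactly the stubbed DataFrame
  def run(): Boolean = {
    val spark = SparkSession.builder().master("local[1]").appName("mock-readstream").getOrCreate()
    import spark.implicits._
    // A static DataFrame is enough: the mock only has to hand it back
    val df = Seq(("one", 1)).toDF()

    val mockSpark = mock(classOf[SparkSession])
    // Mocking a final class like DataStreamReader needs the mockito-inline mock maker
    val mockReader = mock(classOf[DataStreamReader])
    when(mockSpark.readStream).thenReturn(mockReader)
    when(mockReader.format("rate")).thenReturn(mockReader)
    when(mockReader.load()).thenReturn(df)

    val sameInstance = new DummyStreamSource().read(mockSpark) eq df
    spark.stop()
    sameInstance
  }
}
```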
Upvotes: 0
Reputation: 10406
In my opinion, unit tests are not meant to test database connections. That should be done in integration tests, which check that all the parts work together. Unit tests are meant to test your functional logic, not Spark's ability to read from a database.
This is why I would design your code slightly differently and test only that logic, without caring about the DB.
/** This, I don't test. I trust spark.read */
def readTable(tableName: String)(implicit spark: SparkSession): DataFrame = {
  spark.read
    .option(...)
    ...
    .load()
  // Nothing more
}

/** This I test, this is my logic. */
def transform(df: DataFrame, fileTypes: String*): DataFrame = {
  df
    .where(col("column").isin(fileTypes: _*))
}
Then I use the code this way in production.
val source = readTable("...")
val result = transform(source, filter)
And now transform, which contains my logic, is easy to test. In case you wonder how to create dummy DataFrames, one way I like is this:
// toDF needs import spark.implicits._ for an existing SparkSession named spark
val df = Seq((1, Some("a"), true), (2, Some("b"), false),
  (3, None, true)).toDF("x", "y", "z")
// and the test
val result = transform(df, filter)
result should be ...
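A complete, runnable version of that test might look like the sketch below. It assumes the filtered column is `y` of the dummy DataFrame (the answer's transform uses a column literally named `column`), and it runs a local SparkSession. The `None` row is dropped because `null IN ('a')` is not true in SQL.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object TransformExample {
  // Same logic as transform above, but filtering on the dummy column "y" (assumption)
  def transform(df: DataFrame, fileTypes: String*): DataFrame =
    df.where(col("y").isin(fileTypes: _*))

  // Returns the x values of the rows that survive the filter
  def run(): Seq[Int] = {
    val spark = SparkSession.builder().master("local[1]").appName("transform-test").getOrCreate()
    import spark.implicits._
    val df = Seq((1, Some("a"), true), (2, Some("b"), false),
      (3, None, true)).toDF("x", "y", "z")
    val result = transform(df, "a").select("x").as[Int].collect().toSeq
    spark.stop()
    result
  }
}
```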
Upvotes: 5
Reputation: 2033
If you want to test sparkSession.read.jdbc(...), you can play with an in-memory H2 database. I do it sometimes when I'm writing learning tests. You can find an example here: https://github.com/bartosz25/spark-scala-playground/blob/d3cad26ff236ae78884bdeb300f2e59a616dc479/src/test/scala/com/waitingforcode/sql/LoadingDataTest.scala
Note, however, that you may encounter some subtle differences from a "real" RDBMS.
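A minimal sketch of the H2 approach follows. It is not the code from the linked repository; it assumes the `com.h2database:h2` driver is on the test classpath, and the table name `files` and column `file_type` are illustrative. `DB_CLOSE_DELAY=-1` keeps the in-memory database alive after the setup connection closes, so Spark (running in the same JVM in local mode) can read it.

```scala
import java.sql.DriverManager
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object H2ReadExample {
  // Returns how many rows survive the filter after a real JDBC read from H2
  def run(): Long = {
    val url = "jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1"
    val conn = DriverManager.getConnection(url)
    val stmt = conn.createStatement()
    stmt.execute("DROP TABLE IF EXISTS files")
    stmt.execute("CREATE TABLE files (file_type VARCHAR(10))")
    stmt.execute("INSERT INTO files VALUES ('txt'), ('doc'), ('pdf')")
    conn.close()

    val spark = SparkSession.builder().master("local[1]").appName("h2-jdbc").getOrCreate()
    val df = spark.read
      .format("jdbc")
      .option("url", url)
      .option("dbtable", "files")
      .option("driver", "org.h2.Driver")
      .load()

    // H2 upper-cases unquoted identifiers; Spark resolves column names case-insensitively by default
    val count = df.where(col("file_type").isin("txt", "doc")).count()
    spark.stop()
    count
  }
}
```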
On the other hand, you can better separate the concerns of the code and create the DataFrame differently, for instance with the toDF(...) method. You can find an example here: https://github.com/bartosz25/spark-scala-playground/blob/77ea416d2493324ddd6f3f2be42122855596d238/src/test/scala/com/waitingforcode/sql/CorrelatedSubqueryTest.scala
Finally, IMO, if you have to mock DataFrameReader, it may mean that something is wrong with how the code is separated. For instance, you can put all your filters inside a Filters object and test each filter separately. The same goes for mapping or aggregation functions. Two years ago I wrote a blog post about testing Apache Spark: https://www.waitingforcode.com/apache-spark/testing-spark-applications/read It describes the RDD API, but the idea of separating concerns is the same.
Updated:
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object Filters {
  def isInFileTypes(inputDataFrame: DataFrame, fileTypes: Seq[String]): DataFrame = {
    inputDataFrame.where(col("column").isin(fileTypes: _*))
  }
}

object ConfigurationLoader {
  def readTable(tableName: String)(implicit spark: SparkSession): DataFrame = {
    val input = spark.read
      .format("jdbc")
      .option("url", s"$postgresUrl/$postgresDatabase")
      .option("dbtable", tableName)
      .option("user", postgresUsername)
      .option("password", postgresPassword)
      .option("driver", postgresDriver)
      .load()
    Filters.isInFileTypes(input, Seq("txt", "doc"))
  }
}
And with that you can test your filtering logic however you want :) If you have more filters and want to test them, you can also combine them in a single method, pass it any DataFrame you want, and voilà :)
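Combining filters in a single method could be sketched as follows. `hasPositiveSize`, `applyAll` and the `size` column are hypothetical, added only to show two filters chained together; each filter stays testable on its own, and the combined method can be fed any DataFrame built with `toDF`.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object Filters {
  def isInFileTypes(inputDataFrame: DataFrame, fileTypes: Seq[String]): DataFrame =
    inputDataFrame.where(col("column").isin(fileTypes: _*))

  // Hypothetical second filter, only to illustrate combining filters
  def hasPositiveSize(inputDataFrame: DataFrame): DataFrame =
    inputDataFrame.where(col("size") > 0)

  // All filters in one method; each one can still be tested separately
  def applyAll(inputDataFrame: DataFrame, fileTypes: Seq[String]): DataFrame =
    hasPositiveSize(isInFileTypes(inputDataFrame, fileTypes))
}

object FiltersExample {
  // Returns (rows after isInFileTypes, rows after applyAll)
  def run(): (Long, Long) = {
    val spark = SparkSession.builder().master("local[1]").appName("filters-test").getOrCreate()
    import spark.implicits._
    val input = Seq(("txt", 10), ("doc", 0), ("pdf", 5)).toDF("column", "size")
    val counts = (
      Filters.isInFileTypes(input, Seq("txt", "doc")).count(),
      Filters.applyAll(input, Seq("txt", "doc")).count())
    spark.stop()
    counts
  }
}
```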
You shouldn't test .load() unless you have very good reasons to do so. It's Apache Spark's internal logic, and it's already tested.
Update, answering this comment:
So now I am able to test filters, but how do I make sure that readTable really uses the proper filter? (Sorry for the thoroughness, it is just a question of full coverage.) You probably have some simple approach to mock a Scala object (that is actually my second problem). – dytyniak
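object MyApp {
  def main(args: Array[String]): Unit = {
    implicit val spark: SparkSession = SparkSession.builder().getOrCreate()
    val inputDataFrame = readTable(postgreSQLConnection)
    val outputDataFrame = ProcessingLogic.generateOutputDataFrame(inputDataFrame)
    // ... then write outputDataFrame to the sink
  }
}

object ProcessingLogic {
  def generateOutputDataFrame(inputDataFrame: DataFrame): DataFrame = {
    // Here you apply all needed filters, transformations & co, for example:
    Filters.isInFileTypes(inputDataFrame, Seq("txt", "doc"))
  }
}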
As you can see, there is no need to mock an object here. It may seem redundant, but it's not: you can test every filter in isolation thanks to the Filters object, and all your processing logic combined thanks to the ProcessingLogic object (the name is only an example). And you can create your DataFrame in any valid way. The drawback is that you will need to define the schema explicitly or use case classes, since with your PostgreSQL source Apache Spark resolves the schema automatically (I explained this here: https://www.waitingforcode.com/apache-spark-sql/schema-projection/read).
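Both ways of providing the schema for a test input can be sketched as below. The `FileRow` case class and the `name` column are hypothetical stand-ins for the real PostgreSQL table, and `ProcessingLogic` here inlines the file-type filter so the block is self-contained.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical case class mirroring one row of the PostgreSQL table
case class FileRow(column: String, name: String)

object ProcessingLogic {
  def generateOutputDataFrame(inputDataFrame: DataFrame): DataFrame =
    inputDataFrame.where(col("column").isin("txt", "doc"))
}

object SchemaExample {
  // Returns the output row counts for (explicit schema input, case class input)
  def run(): (Long, Long) = {
    val spark = SparkSession.builder().master("local[1]").appName("schema-test").getOrCreate()
    import spark.implicits._

    // Option 1: explicit schema, matching what Spark would resolve from the JDBC source
    val schema = StructType(Seq(
      StructField("column", StringType, nullable = true),
      StructField("name", StringType, nullable = true)))
    val rows = java.util.Arrays.asList(Row("txt", "a.txt"), Row("pdf", "b.pdf"))
    val fromSchema = spark.createDataFrame(rows, schema)

    // Option 2: case class, schema derived by the encoder
    val fromCaseClass = Seq(FileRow("doc", "c.doc"), FileRow("pdf", "d.pdf")).toDF()

    val counts = (
      ProcessingLogic.generateOutputDataFrame(fromSchema).count(),
      ProcessingLogic.generateOutputDataFrame(fromCaseClass).count())
    spark.stop()
    counts
  }
}
```

The case class keeps the test data concise, while the explicit StructType lets you match nullability and types exactly as the JDBC source reports them.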
Upvotes: 2