AJm

Reputation: 1023

How to Test Spark RDD

I am not sure whether we can test RDDs in Spark.

I came across an article saying that mocking an RDD is not a good idea. Is there any other way, or any best practice, for testing RDDs?

Upvotes: 6

Views: 4243

Answers (2)

himanshuIIITian

Reputation: 6085

There are two methods for testing Spark RDDs/applications. They are as follows:

For example:

Unit to Test:

import org.apache.spark.SparkContext 
import org.apache.spark.rdd.RDD 

class WordCount { 
  def get(url: String, sc: SparkContext): RDD[(String, Int)] = { 
    val lines = sc.textFile(url)
    lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
  } 
}

Now Method 1 to Test it is as follows:

import org.scalatest.{ BeforeAndAfterAll, FunSuite }
import org.apache.spark.SparkContext 
import org.apache.spark.SparkConf 

class WordCountTest extends FunSuite with BeforeAndAfterAll { 
  private var sparkConf: SparkConf = _ 
  private var sc: SparkContext = _ 

  override def beforeAll() { 
    sparkConf = new SparkConf().setAppName("unit-testing").setMaster("local") 
    sc = new SparkContext(sparkConf) 
  } 

  private val wordCount = new WordCount 

  test("get word count rdd") { 
    val result = wordCount.get("file.txt", sc) 
    assert(result.take(10).length === 10)
   } 

  override def afterAll() { 
    sc.stop() 
  } 
}

In Method 1 we are not mocking the RDD; we are just checking the behavior of our WordCount class. But here we have to manage the creation and destruction of the SparkContext ourselves. So, if you do not want to write extra code for that, you can use spark-testing-base, like this:

Method 2:

import org.scalatest.FunSuite 
import com.holdenkarau.spark.testing.SharedSparkContext 

class WordCountTest extends FunSuite with SharedSparkContext { 
  private val wordCount = new WordCount 

  test("get word count rdd") { 
    val result = wordCount.get("file.txt", sc)
    assert(result.take(10).length === 10) 
  } 
}

Or

import org.scalatest.FunSuite 
import com.holdenkarau.spark.testing.SharedSparkContext 
import com.holdenkarau.spark.testing.RDDComparisons 

class WordCountTest extends FunSuite with SharedSparkContext with RDDComparisons { 
  private val wordCount = new WordCount 

  test("get word count rdd with comparison") { 
    val expected = sc.textFile("file.txt")
                     .flatMap(_.split(" "))
                     .map((_, 1))
                     .reduceByKey(_ + _) 

    val result = wordCount.get("file.txt", sc)

    assert(compareRDD(expected, result).isEmpty)
   } 
}
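
To use spark-testing-base, it first has to be on the test classpath. A minimal build.sbt sketch, assuming sbt as the build tool (the version strings below are assumptions; the library is versioned as <sparkVersion>_<libraryVersion>, so match both numbers to your Spark release):

// build.sbt -- hedged sketch; align both version numbers with your Spark release
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"         % "2.2.0" % "provided",
  "com.holdenkarau"  %% "spark-testing-base" % "2.2.0_0.8.0" % "test"
)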

For more details on Spark RDD testing, refer to this: KnolX: Unit Testing of Spark Applications

Upvotes: 2

Vidya

Reputation: 30310

Thank you for putting this outstanding question out there. For some reason, when it comes to Spark, everyone gets so caught up in the analytics that they forget about the great software engineering practices that have emerged over the last 15 years or so. This is why we make it a point to discuss testing and continuous integration (among other things like DevOps) in our course.

A Quick Aside on Terminology

Before I go on, I have to express a minor disagreement with the KnolX presentation @himanshuIIITian cites. A true unit test means you have complete control over every component in the test. There can be no interaction with databases, REST calls, file systems, or even the system clock; everything has to be "doubled" (e.g. mocked, stubbed, etc.), as Gerard Meszaros puts it in xUnit Test Patterns. I know this seems like semantics, but it really matters. Failing to understand this is one major reason why you see intermittent test failures in continuous integration.

We Can Still Unit Test

So given this understanding, unit testing an RDD is impossible. However, there is still a place for unit testing when developing analytics.

(Note: I will be using Scala for the examples, but the concepts transcend languages and frameworks.)

Consider a simple operation:

rdd.map(foo).map(bar)

Here foo and bar are simple functions. They can be unit tested in the normal way, and they should be, with as many corner cases as you can muster. After all, why should they care where their inputs come from, whether a test fixture or an RDD?
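
For instance, here is a minimal sketch of that idea (foo and bar below are hypothetical stand-ins with assumed logic, not functions from the original question):

import org.scalatest.FunSuite

// Hypothetical pure functions standing in for foo and bar
object Transformations {
  def foo(line: String): String = line.trim.toLowerCase   // normalize a line
  def bar(line: String): Int    = line.split(" ").length  // count its words
}

class TransformationsTest extends FunSuite {
  test("foo normalizes whitespace and case") {
    assert(Transformations.foo("  Hello Spark ") === "hello spark")
  }

  test("bar counts the words in a line") {
    assert(Transformations.bar("to be or not to be") === 6)
  }
}

No SparkContext is needed here, which keeps these tests fast and fully deterministic.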

Don't Forget the Spark Shell

This isn't testing per se, but in these early stages you should also be experimenting in the Spark shell to figure out your transformations and, especially, the consequences of your approach. For example, you can examine physical and logical query plans, partitioning strategy and preservation, and the state of your data with many different functions like toDebugString, explain, glom, show, printSchema, and so on. I will let you explore those.

You can also set your master to local[2] in the Spark shell and in your tests to identify any problems that may only arise once you start to distribute work.
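
For example, a quick exploration session might look like this (a sketch only; file.txt stands in for your own data):

// Launched with: spark-shell --master local[2]
val rdd = sc.textFile("file.txt")
val counts = rdd.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

counts.toDebugString        // lineage and shuffle boundaries
counts.glom().collect()     // how records are laid out per partition
counts.partitions.length    // how many partitions the shuffle produced

// For DataFrames/Datasets the query planner can be inspected as well
// (toDF relies on the implicits the shell already imports):
val df = counts.toDF("word", "count")
df.printSchema()
df.explain(true)            // logical and physical plans
df.show(10)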

Integration Testing with Spark

Now for the fun stuff.

In order to integration test Spark after you feel confident in the quality of your helper functions and RDD/DataFrame transformation logic, it is critical to do a few things (regardless of build tool and test framework):

  • Increase JVM memory.
  • Enable forking but disable parallel execution (see the sbt sketch after this list).
  • Use your test framework to accumulate your Spark integration tests into suites, and initialize the SparkContext before all tests and stop it after all tests.
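
With sbt, the first two points typically come down to a few build settings; a hedged sketch (the exact memory values are assumptions to tune for your project):

// build.sbt -- hedged sketch of integration-test settings
fork in Test := true                                   // run tests in a separate, forked JVM
parallelExecution in Test := false                     // suites share heavyweight SparkContexts; keep them serial
javaOptions in Test ++= Seq("-Xms512M", "-Xmx2048M")   // give the forked JVM more memory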

There are several ways to do this last one. One is available from spark-testing-base, cited both by @Pushkr and by the KnolX presentation linked by @himanshuIIITian.

The Loan Pattern

Another approach is to use the Loan Pattern.

For example (using ScalaTest):

import org.apache.spark.{ SparkConf, SparkContext }
import org.scalatest.{ Matchers, WordSpec }

class MySpec extends WordSpec with Matchers with SparkContextSetup {
  "My analytics" should {
    "calculate the right thing" in withSparkContext { (sparkContext) =>
      val data = Seq(...)
      val rdd = sparkContext.parallelize(data)
      val total = rdd.map(...).filter(...).map(...).reduce(_ + _)

      total shouldBe 1000
    }
  }
}

trait SparkContextSetup {
  def withSparkContext(testMethod: (SparkContext) => Any) {
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("Spark test")
    val sparkContext = new SparkContext(conf)
    try {
      testMethod(sparkContext)
    }
    finally sparkContext.stop()
  }
} 

As you can see, the Loan Pattern makes use of higher-order functions to "loan" the SparkContext to the test and then to dispose of it after it's done.

Suffering-Oriented Programming (Thanks, Nathan)

It is totally a matter of preference, but I prefer to use the Loan Pattern and wire things up myself as long as I can before bringing in another framework. Aside from just trying to stay lightweight, frameworks sometimes add a lot of "magic" that makes test failures hard to reason about. So I take a Suffering-Oriented Programming approach, where I avoid adding a new framework until the pain of not having it is too much to bear. But again, this is up to you.

Now one place where spark-testing-base really shines is with the Hadoop-based helpers like HDFSClusterLike and YARNClusterLike. Mixing those traits in can really save you a lot of setup pain. Another place where it shines is with the ScalaCheck-like properties and generators. But again, I would personally hold off on using it until my analytics and my tests reach that level of sophistication.

Integration Testing with Spark Streaming

Finally, I would just like to present a snippet of what a Spark Streaming integration test setup with in-memory values would look like:

import scala.collection.mutable

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.streaming.dstream.InputDStream

val sparkContext: SparkContext = ...
val data: Seq[(String, String)] = Seq(("a", "1"), ("b", "2"), ("c", "3"))
val rdd: RDD[(String, String)] = sparkContext.parallelize(data)
val strings: mutable.Queue[RDD[(String, String)]] = mutable.Queue.empty[RDD[(String, String)]]
val streamingContext = new StreamingContext(sparkContext, Seconds(1))
val dStream: InputDStream[(String, String)] = streamingContext.queueStream(strings)
strings += rdd

This is simpler than it looks. It really just turns a sequence of data into a queue to feed to the DStream. Most of it is really just boilerplate setup that works with the Spark APIs.
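
From there, a test would register an output operation, start the streaming context, and assert on what comes out. A hedged sketch of that remaining wiring (collecting results into a driver-side concurrent queue is my assumption, not part of the original snippet):

// Hedged sketch; results are collected on the driver inside foreachRDD
val results = new java.util.concurrent.ConcurrentLinkedQueue[(String, String)]()

dStream.foreachRDD { batch =>
  batch.collect().foreach(pair => results.add(pair))
}

streamingContext.start()
streamingContext.awaitTerminationOrTimeout(3000)   // let a few one-second batches run
streamingContext.stop(stopSparkContext = false)

assert(results.size == 3)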

This might be my longest post ever, so I will leave it here. I hope others chime in with other ideas to help improve the quality of our analytics with the same agile software engineering practices that have improved all other application development.

And with apologies for the shameless plug, you can check out our course Analytics with Apache Spark, where we address a lot of these ideas and more. We hope to have an online version soon.

Upvotes: 7
