Vijay Innamuri

Reputation: 4372

How to set the Spark streaming receiver frequency?

My requirement is to process hourly stock-market data, i.e. fetch the data from the source once per streaming interval and process it via a DStream.

I have implemented a custom receiver to scrape/monitor the website by implementing the onStart() and onStop() methods, and it works.

Challenges encountered:

Options I tried:

  1. Making the receiver thread sleep for a few seconds (equal to the streaming interval). In this case the data is no longer the latest when it is processed (see the receiver code below).


import java.util.Date

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class CustomReceiver(interval: Int)
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart() {
    // Start a background thread that scrapes the website until the receiver is stopped.
    new Thread("Website Scraper") {
      override def run() { receive() }
    }.start()
  }

  def onStop() {
    // Nothing to do here: receive() checks isStopped() and exits on its own.
  }

  /** Scrape the website and store records until the receiver is stopped. */
  private def receive() {
    println("Entering receive: " + new Date())
    try {
      while (!isStopped) {
        val scriptsLTP = StockMarket.getLiveStockData()
        for ((script, ltp) <- scriptsLTP) {
          store(script + "," + ltp)
        }
        println("sent data")
        println("going to sleep: " + new Date())
        Thread.sleep(3600 * 1000) // sleep one hour (the `interval` parameter could be used here instead)
        println("awaken from sleep: " + new Date())
      }
      println("Stopped receiving")
      restart("Trying to connect again")
    } catch {
      case t: Throwable =>
        restart("Error receiving data", t)
    }
    println("Exiting receive: " + new Date())
  }
}
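
A minimal sketch of how such a receiver can be registered with a StreamingContext (the hour-long batch interval, the app name, and the simple print step are illustrative assumptions, not part of my actual job):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StockStreamingApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("StockMarketStreaming")
    // Batch interval chosen to match the hourly scraping cadence (assumed).
    val ssc = new StreamingContext(conf, Seconds(3600))

    // Register the custom receiver; each record is expected to be "script,ltp".
    val quotes = ssc.receiverStream(new CustomReceiver(3600))

    // Example processing: split each record and print a few per batch,
    // assuming ltp is a numeric string.
    quotes.map(_.split(","))
          .map { case Array(script, ltp) => (script, ltp.toDouble) }
          .print()

    ssc.start()
    ssc.awaitTermination()
  }
}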

How can I keep the Spark Streaming receiver in sync with the DStream processing?

Upvotes: 2

Views: 549

Answers (1)

maasg

Reputation: 37435

This use case doesn't seem like a good fit for Spark Streaming. The interval is long enough to treat this as a regular batch job instead; that way, the cluster resources can be used more effectively.

I would rewrite it as a Spark job: parallelize the target tickers, use mapPartitions to turn the executors into distributed web scrapers, and then process the results as intended.
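
A rough sketch of that batch-job approach; the ticker list, the partition count, the output path, and a per-ticker getLiveStockData(ticker) variant of the scraping call are all assumptions for illustration:

import org.apache.spark.{SparkConf, SparkContext}

object HourlyStockJob {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("HourlyStockScrape"))

    // Hypothetical list of tickers, spread across the executors.
    val tickers = Seq("AAPL", "GOOG", "MSFT", "AMZN")
    val tickerRDD = sc.parallelize(tickers, numSlices = 4)

    // Each partition acts as a distributed scraper, fetching quotes for its tickers.
    val quotes = tickerRDD.mapPartitions { iter =>
      iter.map { ticker =>
        // Assumed per-ticker variant of the scraping helper used in the question.
        val ltp = StockMarket.getLiveStockData(ticker)
        s"$ticker,$ltp"
      }
    }

    // Process or persist as intended; a timestamped output path is just an example.
    quotes.saveAsTextFile(s"hdfs:///stock-quotes/${System.currentTimeMillis()}")

    sc.stop()
  }
}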

Then schedule the Spark job to run every hour, at the exact times wanted, with cron or a more advanced alternative such as Chronos.

Upvotes: 0
