My requirement is to process hourly stock market data, i.e., fetch the data from the source once per streaming interval and process it as a DStream.
I have implemented a custom receiver that scrapes/monitors the website by implementing the onStart() and onStop() methods, and it's working.
Challenges encountered:
Options I tried:
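import java.util.Date

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// interval: polling period in seconds (assumption), e.g. 3600 for hourly data
class CustomReceiver(interval: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart(): Unit = {
    // onStart() must not block, so polling runs on its own thread
    new Thread("Website Scraper") {
      override def run(): Unit = receive()
    }.start()
  }

  def onStop(): Unit = {
    // Nothing to clean up: the polling thread checks isStopped() and exits on its own
  }

  /** Scrape the website once per interval and store the results until the receiver is stopped */
  private def receive(): Unit = {
    println("Entering receive: " + new Date())
    try {
      while (!isStopped()) {
        val scriptsLTP = StockMarket.getLiveStockData()
        // One record per script, formatted as "script,ltp"
        for ((script, ltp) <- scriptsLTP) {
          store(script + "," + ltp)
        }
        println("sent data")
        println("going to sleep: " + new Date())
        Thread.sleep(interval * 1000L)
        println("awaken from sleep: " + new Date())
      }
      // The loop only exits once the receiver has been stopped, so no restart here
      println("Stopped receiving")
    } catch {
      case t: Throwable =>
        restart("Error receiving data", t)
    }
    println("Exiting receive: " + new Date())
  }
}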
How can I keep the Spark Streaming receiver in sync with the DStream processing?
This use case doesn't seem to be a good fit for Spark Streaming. With a one-hour interval, it is simpler to treat it as a regular batch job. That way, cluster resources are used only while the job runs, instead of a long-running receiver holding executor cores while it sits idle for most of each hour.
I would rewrite it as a Spark job: parallelize the list of target tickers, use mapPartitions
so that the executors act as distributed web scrapers, and then process the results as intended.
Then schedule the job to run every hour with cron,
or with a more advanced scheduler such as Chronos, at exactly the times you want.
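The batch approach can be sketched roughly as follows, assuming Spark 2.x or later on Scala 2.12+. `HourlyStockJob`, `scrapePartition` and `fetchLtp` are hypothetical names: `fetchLtp` stands in for the question's `StockMarket.getLiveStockData()`, reworked to look up a single ticker, and the tickers are assumed to arrive as command-line arguments.

```scala
import org.apache.spark.sql.SparkSession

// Extends Serializable defensively, since the mapPartitions closure calls a method of this object
object HourlyStockJob extends Serializable {

  /** Scrapes all tickers of one partition on an executor.
    * fetchLtp is a hypothetical per-ticker lookup (ticker -> last traded price).
    * Per-partition setup (e.g. one HTTP client per partition) would go here,
    * so it is created once per partition instead of once per ticker.
    */
  def scrapePartition(
      tickers: Iterator[String],
      fetchLtp: String => Option[Double]): Iterator[String] =
    // Same "script,ltp" format as the receiver; failed lookups are dropped
    tickers.flatMap(t => fetchLtp(t).map(ltp => s"$t,$ltp"))

  def main(args: Array[String]): Unit = {
    val tickers: Seq[String] = args.toSeq
    val spark = SparkSession.builder().appName("HourlyStockScrape").getOrCreate()
    try {
      // Hypothetical placeholder: replace with the real scraping code
      val fetchLtp: String => Option[Double] = _ => None

      // Spread the tickers across the cluster; each partition is scraped by one task
      val rows = spark.sparkContext
        .parallelize(tickers)
        .mapPartitions(part => scrapePartition(part, fetchLtp))
        .collect()

      // Process this hour's snapshot here (placeholder: print it)
      rows.foreach(println)
    } finally {
      spark.stop()
    }
  }
}
```

Each `spark-submit --class HourlyStockJob <your-jar> AAA BBB ...` invocation (jar name and tickers are placeholders) processes one hourly snapshot and exits, which is the unit of work cron or Chronos would trigger.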