Reputation: 53916
The Spark code below does not appear to perform any operation on the file example.txt:
val conf = new org.apache.spark.SparkConf()
.set("spark.executor.memory", "2g");
val ssc = new StreamingContext(conf, Seconds(1))
val dataFile: DStream[String] = ssc.textFileStream("C:\\example.txt")
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
I'm attempting to print the first 10 elements of the file using dataFile.print().
Some of the generated output:
15/03/12 12:23:53 INFO JobScheduler: Started JobScheduler
15/03/12 12:23:54 INFO FileInputDStream: Finding new files took 105 ms
15/03/12 12:23:54 INFO FileInputDStream: New files at time 1426163034000 ms:
15/03/12 12:23:54 INFO JobScheduler: Added jobs for time 1426163034000 ms
15/03/12 12:23:54 INFO JobScheduler: Starting job streaming job 1426163034000 ms.0 from job set of time 1426163034000 ms
Time: 1426163034000 ms
15/03/12 12:23:54 INFO JobScheduler: Finished job streaming job 1426163034000 ms.0 from job set of time 1426163034000 ms
15/03/12 12:23:54 INFO JobScheduler: Total delay: 0.157 s for time 1426163034000 ms (execution: 0.006 s)
15/03/12 12:23:54 INFO FileInputDStream: Cleared 0 old files that were older than 1426162974000 ms:
15/03/12 12:23:54 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()
15/03/12 12:23:55 INFO FileInputDStream: Finding new files took 2 ms
15/03/12 12:23:55 INFO FileInputDStream: New files at time 1426163035000 ms:
15/03/12 12:23:55 INFO JobScheduler: Added jobs for time 1426163035000 ms
15/03/12 12:23:55 INFO JobScheduler: Starting job streaming job 1426163035000 ms.0 from job set of time 1426163035000 ms
Time: 1426163035000 ms
15/03/12 12:23:55 INFO JobScheduler: Finished job streaming job 1426163035000 ms.0 from job set of time 1426163035000 ms
15/03/12 12:23:55 INFO JobScheduler: Total delay: 0.011 s for time 1426163035000 ms (execution: 0.001 s)
15/03/12 12:23:55 INFO MappedRDD: Removing RDD 1 from persistence list
15/03/12 12:23:55 INFO BlockManager: Removing RDD 1
15/03/12 12:23:55 INFO FileInputDStream: Cleared 0 old files that were older than 1426162975000 ms:
15/03/12 12:23:55 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()
15/03/12 12:23:56 INFO FileInputDStream: Finding new files took 3 ms
15/03/12 12:23:56 INFO FileInputDStream: New files at time 1426163036000 ms:
is of format :
As the print documentation states:
/**
 * Print the first ten elements of each RDD generated in this DStream. This is an output
 * operator, so this DStream will be registered as an output stream and there materialized.
 */
Does this mean 0 RDDs have been generated for this stream? With Spark core, if I want to see the contents of an RDD I would use the RDD's collect function. Is there a similar method for streams? In short, how do I print the contents of a stream to the console?
Update :
I updated the code based on @0x0FFF's comment. The documentation does not appear to give an example of reading from the local file system. Is this not as common as with Spark core, where there are explicit examples for reading data from a file?
Here is the updated code:
val conf = new org.apache.spark.SparkConf()
.set("spark.executor.memory", "2g");
val ssc = new StreamingContext(conf, Seconds(1))
val dataFile: DStream[String] = ssc.textFileStream("file:///c:/data/")
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
But the output is the same. When I add new files to the c:\\data dir (with the same format as the existing data files) they are not processed. I assume dataFile.print should print the first 10 lines to the console?
Update 2 :
Perhaps this is related to the fact that I'm running this code in a Windows environment?
Upvotes: 4
Views: 13075
Reputation: 1
I think I found your issue. You should have this in your log:
WARN StreamingContext: spark.master should be set as local[n], n > 1 in local mode if you have receivers to get data, otherwise Spark jobs will not get resources to process the received data.
The problem is that you need at least 2 cores to run a Spark Streaming app. So the solution should be to simply replace:
val conf = new org.apache.spark.SparkConf()
By:
val conf = new org.apache.spark.SparkConf().setMaster("local[*]")
or with any local[n] where n is greater than one.
Upvotes: 0
Reputation: 53916
Here is a custom receiver I wrote that listens for data at a specified dir :
package receivers

import java.io.File
import scala.io.Source
import org.apache.spark.{ SparkConf, Logging }
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.streaming.receiver.Receiver

class CustomReceiver(dir: String)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

  def onStart() {
    // Start the thread that reads the files
    new Thread("File Receiver") {
      override def run() { receive() }
    }.start()
  }

  def onStop() {
    // Nothing much to do: receive() finishes on its own once every file is read
  }

  // List everything under f, descending into sub-directories
  def recursiveListFiles(f: File): Array[File] = {
    val these = f.listFiles
    these ++ these.filter(_.isDirectory).flatMap(recursiveListFiles)
  }

  private def receive() {
    // Skip directories: only regular files can be opened as a Source
    for (f <- recursiveListFiles(new File(dir)) if f.isFile) {
      // Reconstructed (assumption): the right-hand side was lost from the original post
      val source = Source.fromFile(f)
      val lines = source.getLines
      store(lines) // assumption: hand the lines of this file to Spark
      source.close()
    }
    logInfo("Stopped receiving")
    restart("Trying to connect again")
  }
}
One thing to be aware of, I think, is that the files need to be processed in a time that is <= the configured batchDuration. In the example below it's set to 10 seconds, but if the receiver takes longer than 10 seconds to process the files then some data files will not be processed. I'm open to correction on this point.
Here is how the custom receiver is used:
val conf = new org.apache.spark.SparkConf()
  .set("spark.executor.memory", "2g")
val ssc = new StreamingContext(conf, Seconds(10))
val customReceiverStream: ReceiverInputDStream[String] = ssc.receiverStream(new CustomReceiver("C:\\data\\"))
// Print the number of records received in each batch
customReceiverStream.foreachRDD(m => {
  println("size is " + m.collect.size)
})
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
Upvotes: 0
Reputation: 5018
You misunderstood the use of textFileStream. Here is its description from the Spark documentation:
Create a input stream that monitors a Hadoop-compatible filesystem for new files and reads them as text files (using key as LongWritable, value as Text and input format as TextInputFormat).
So first, you should pass it a directory, and second, this directory should be accessible from the node running the receiver, so it is better to use HDFS for this purpose. Then, when you put a new file into this directory, it will be picked up and print() will print its first 10 lines.
My code:
[alex@sparkdemo tmp]$ pyspark --master local[2]
Python 2.6.6 (r266:84292, Nov 22 2013, 12:16:22)
[GCC 4.4.7 20120313 (Red Hat 4.4.7-4)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Spark assembly has been built with Hive, including Datanucleus jars on classpath
15/03/12 06:37:49 WARN Utils: Your hostname, sparkdemo resolves to a loopback address:; using instead (on interface eth0)
15/03/12 06:37:49 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.2.0
Using Python version 2.6.6 (r266:84292, Nov 22 2013 12:16:22)
SparkContext available as sc.
>>> from pyspark.streaming import StreamingContext
>>> ssc = StreamingContext(sc, 30)
>>> dataFile = ssc.textFileStream('file:///tmp')
>>> dataFile.pprint()
>>> ssc.start()
>>> ssc.awaitTermination()
Time: 2015-03-12 06:40:30
Time: 2015-03-12 06:41:00
Time: 2015-03-12 06:41:30
1 2 3
4 5 6
7 8 9
Time: 2015-03-12 06:42:00
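For readers on the Scala API, the pyspark session above could be sketched roughly as follows. The directory and the 30-second batch interval mirror that session; the `local[2]` master, the app name, and the object name `TextFileStreamSketch` are assumptions for a local test run and do not come from the post.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical object name, used only for this sketch
object TextFileStreamSketch {
  def main(args: Array[String]): Unit = {
    // Assumption: local run with two threads; adjust the master for a real cluster
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("TextFileStreamSketch")

    // 30-second batches, as in the pyspark session
    val ssc = new StreamingContext(conf, Seconds(30))

    // Monitor a directory, not a single file
    val dataFile = ssc.textFileStream("file:///tmp")

    // Register the output operation before start(); prints the first ten elements of each batch
    dataFile.print()

    ssc.start()            // Start the computation
    ssc.awaitTermination() // Wait for the computation to terminate
  }
}
```

With this running, a batch should show lines only for files that are placed in the monitored directory after the stream has started; batches with no new files print just the Time header, as in the session above.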
Upvotes: 4