Csaba Faragó

Reputation: 473

Call a function with each element of a stream in Databricks

I have a streaming DataFrame in Databricks, and I want to perform an action on each of its elements. On the net I only found special-purpose sinks, such as writing to the console or dumping into memory, but I want to apply some business logic and put some of the results into Redis.

To be more specific, this is how it would look in the non-streaming case:

val someDataFrame = Seq(
  ("key1", "value1"),
  ("key2", "value2"),
  ("key3", "value3"),
  ("key4", "value4")
).toDF()

def someFunction(keyValuePair: (String, String)) = {
  println(keyValuePair)
}

someDataFrame.collect.foreach(r => someFunction((r(0).toString, r(1).toString)))

But if someDataFrame is not a plain DataFrame but a streaming DataFrame (in my case coming from Kafka), I get this error message:

org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
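
(For context, the streaming DataFrame itself is created roughly like this; the broker address and topic name below are only placeholders:)

// How the streaming DataFrame is obtained (sketch; broker and topic are placeholders)
val someDataFrame = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "some-topic")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")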

Could anyone please help me solve this problem?

An important note: inside someFunction I also need to read some external data and act on each element conditionally, roughly like this:

val someData = readSomeExternalData()
if (/* some condition involving keyValuePair and someData */) {
  doSomething(keyValuePair)
}

(The question What is the purpose of ForeachWriter in Spark Structured Streaming? does not provide a working example, therefore it does not answer my question.)
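
For reference, the ForeachWriter skeleton I would apparently have to fill in looks roughly like this (a sketch only, not working code; the open/close connection handling is exactly the part I am unsure about):

import org.apache.spark.sql.{ForeachWriter, Row}

val writer = new ForeachWriter[Row] {
  def open(partitionId: Long, epochId: Long): Boolean = true   // open a connection here
  def process(row: Row): Unit =
    someFunction((row(0).toString, row(1).toString))           // per-element business logic
  def close(errorOrNull: Throwable): Unit = ()                 // close the connection here
}

someDataFrame.writeStream.foreach(writer).start()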

Upvotes: 1

Views: 2235

Answers (2)

prathiba

Reputation: 11

Call a simple user-defined function with foreachBatch in Spark Structured Streaming.

Please try this; it will print 'hello world' for every micro-batch read from the TCP socket:

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()

# Create a DataFrame representing the stream of input lines from connection to localhost:9999
lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# Split the lines into words
words = lines.select(
    explode(
        split(lines.value, " ")
    ).alias("word")
)

# Generate running word count
wordCounts = words.groupBy("word").count()

# User-defined function called once per micro-batch; put your own logic here
def process_batch(df, epoch_id):
    # e.g. write the rows of this batch to storage
    print('hello world')

query = words.writeStream.foreachBatch(process_batch).start()
# Alternative: print the running counts to the console instead
# query = wordCounts.writeStream.outputMode("complete").format("console").start()

query.awaitTermination()

Upvotes: 1

jgoday

Reputation: 2836

Here is an example that uses foreachBatch to save every item to Redis with the Structured Streaming API.

Related to a previous question (DataFrame to RDD[(String, String)] conversion)

// import spark and spark-redis
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.types._

import com.redislabs.provider.redis._

import spark.implicits._ // needed for the Dataset[(String, String)] encoder

// schema of the csv files
val userSchema = new StructType()
    .add("name", "string")
    .add("age", "string")

// create a data stream reader from a dir with csv files
val csvDF = spark
  .readStream
  .format("csv")
  .option("sep", ";")
  .schema(userSchema)
  .load("./data") // directory where the CSV files are

// redis connection config, picked up implicitly by toRedisKV below
implicit val redisConfig: RedisConfig = new RedisConfig(new RedisEndpoint("localhost", 6379))
implicit val readWriteConfig: ReadWriteConfig = ReadWriteConfig.Default

csvDF.map(r => (r.getString(0), r.getString(1))) // converts the DataFrame to a Dataset[(String, String)]
  .writeStream // create a data stream writer
  .foreachBatch((df, _) => sc.toRedisKV(df.rdd)) // save each micro-batch to redis after converting it to an RDD
  .start() // start processing
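
If you need to call your own function for every single element (like someFunction from the question) instead of writing the whole batch to Redis at once, you can iterate over the rows inside foreachBatch. A sketch:

csvDF
  .writeStream
  .foreachBatch { (df: DataFrame, _: Long) =>
    // run the per-element business logic on the driver for every row of the micro-batch
    df.collect().foreach(r => someFunction((r.getString(0), r.getString(1))))
  }
  .start()

collect() pulls each micro-batch to the driver, which mirrors the collect.foreach in the question; for large batches you would run the logic on the executors (e.g. with df.foreach) instead.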

Upvotes: 2
