Reputation: 2535
I have a Kafka topic from which I read data as follows:
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.streaming.StreamingQuery;

private static void startKafkaConsumerStream() {
    try {
        System.out.println("Print method: startKafkaConsumerStream");
        // Read each Kafka record's value as a String (.as(...) already yields Dataset<String>, no cast needed)
        Dataset<String> lines = _spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", getProperty("kafka.bootstrap.servers"))
                .option("subscribe", HTTP_FED_VO_TOPIC)
                .option("startingOffsets", "latest")
                .load()
                .selectExpr("CAST(value AS STRING)")
                .as(Encoders.STRING());
        // Print every new micro-batch to the console
        StreamingQuery query = lines.writeStream()
                .outputMode("append")
                .format("console")
                .start();
        query.awaitTermination();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
The requirement: with the above code I can print the records to the console; however, I am confused about how to pass them to a method that will process them.
I looked through the documentation but could not find anything relevant. As I am new to this, the question might sound a bit silly, but I am stuck and would appreciate any hints.
Goal of the app: the app accepts a request and sends it to Kafka; then, in a separate thread, a Kafka reader reads and processes the request and writes the output to another Kafka topic. I am only implementing this; the architecture is not my idea.
Upvotes: 0
Views: 413
Reputation: 74669
lines is a Dataset<String> with the values from Kafka as rows.
"How do I pass these to a method which will process them?"
Depending on what exactly you'd like to do, you could use the foreach sink (writeStream().foreach(...); calling the foreach action directly on a streaming Dataset fails) or most of the other operators and functions you would use on a batch Dataset.
You could use the withColumn(...), select or map operators.
In other words, think of Spark Structured Streaming as Spark SQL with streaming Datasets.
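A minimal Java sketch of the map approach, assuming a hypothetical processRequest method that stands in for your own logic, and placeholder broker/topic names (localhost:9092, requests, responses). map applies the method to every record of the streaming Dataset, and the result is written to a second Kafka topic, which matches the goal described in the question. The checkpoint directory is also an assumption; the Kafka sink requires one.

```java
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

public class KafkaRequestProcessor {

    // Hypothetical processing step: replace with your real business logic.
    static String processRequest(String request) {
        return request.toUpperCase();
    }

    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
                .appName("kafka-request-processor")
                .getOrCreate();

        // Same kind of source as in the question; broker and topic are placeholders.
        Dataset<String> lines = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "requests")
                .option("startingOffsets", "latest")
                .load()
                .selectExpr("CAST(value AS STRING)")
                .as(Encoders.STRING());

        // map calls processRequest on every record. The cast picks the Java
        // MapFunction overload of Dataset.map instead of the Scala one.
        Dataset<String> responses = lines.map(
                (MapFunction<String, String>) KafkaRequestProcessor::processRequest,
                Encoders.STRING());

        // The Kafka sink reads the string/binary "value" column and needs a checkpoint location.
        StreamingQuery query = responses.writeStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("topic", "responses")
                .option("checkpointLocation", "/tmp/kafka-request-processor-checkpoint")
                .start();

        query.awaitTermination();
    }
}
```

Because map with Encoders.STRING() returns a Dataset<String>, its single column is named value, which is exactly the column the Kafka sink writes as the message payload.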
Upvotes: 1
Reputation: 2108
You can use a ForeachWriter[T] on the sink side of your Kafka streaming application to process each row of your query, like this:
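import org.apache.spark.sql.ForeachWriter

// A streaming Dataset must use writeStream (not write) and be started
val query = datasetOfString.writeStream
  .foreach(new ForeachWriter[String] {
    // Called once per partition and epoch; return true to process its rows
    def open(partitionId: Long, version: Long): Boolean = {
      // open connection
      true
    }
    // Called for every row in the partition
    def process(record: String): Unit = {
      // write string to connection
    }
    // Called at the end; errorOrNull is non-null if processing failed
    def close(errorOrNull: Throwable): Unit = {
      // close the connection
    }
  })
  .start()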
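The same ForeachWriter idea in Java, since the question uses the Java API. This is a sketch: processRequest is a hypothetical placeholder, and printing inside process() only marks where your own write would go (on a cluster that output lands in the executor logs, not the driver console).

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.ForeachWriter;
import org.apache.spark.sql.streaming.StreamingQuery;

public class ForeachSinkExample {

    // Hypothetical per-record handler; replace with your own processing.
    static String processRequest(String request) {
        return request.trim();
    }

    // Attaches a ForeachWriter to a streaming Dataset<String>, e.g. `lines` from the question.
    static StreamingQuery startForeachSink(Dataset<String> lines) throws Exception {
        return lines.writeStream()
                .foreach(new ForeachWriter<String>() {
                    @Override
                    public boolean open(long partitionId, long epochId) {
                        // Open a connection here; return false to skip this partition.
                        return true;
                    }

                    @Override
                    public void process(String record) {
                        String result = processRequest(record);
                        // Placeholder: replace with a write to your connection.
                        System.out.println(result);
                    }

                    @Override
                    public void close(Throwable errorOrNull) {
                        // Close the connection; errorOrNull is non-null if processing failed.
                    }
                })
                .start();
    }
}
```

The anonymous writer is created inside a static method and only calls a static helper, so it does not capture an enclosing instance; Spark serializes the writer and ships it to the executors.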
Upvotes: 1