User3

Reputation: 2535

How to pass records from Kafka to method?

I have a Kafka queue from where I read the data as below:

private static void startKafkaConsumerStream() {

        try {
            System.out.println("Print method: startKafkaConsumerStream");

            // .as(Encoders.STRING()) already returns Dataset<String>, so no cast is needed
            Dataset<String> lines = _spark
                    .readStream()
                    .format("kafka")
                    .option("kafka.bootstrap.servers", getProperty("kafka.bootstrap.servers"))
                    .option("subscribe", HTTP_FED_VO_TOPIC)
                    .option("startingOffsets", "latest")
                    .load()
                    .selectExpr("CAST(value AS STRING)")
                    .as(Encoders.STRING());

            StreamingQuery query = lines.writeStream()
                    .outputMode("append")
                    .format("console")
                    .start();

            query.awaitTermination();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

The Requirement: With the above code I am able to print records to the console; however, I am stuck on how to pass them to a method that will process them.

To do this, I tried looking through the documentation but could not find anything relevant. As I am a newbie to this, the question might sound a bit silly, but I am stuck and would highly appreciate any hints.

Goal of the app: The app accepts a request and sends it to Kafka; then, in a separate thread, a Kafka reader reads and processes the request and produces the output to another Kafka queue. I am just implementing this; the architecture is not my idea.
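For the produce step back to Kafka, I assume Structured Streaming's Kafka sink is what I need; a minimal sketch of that part (here processed stands for the transformed Dataset, and OUTPUT_TOPIC and the checkpoint path are placeholders):

    StreamingQuery producer = processed
            .toDF("value")  // the Kafka sink expects a column named 'value'
            .writeStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", getProperty("kafka.bootstrap.servers"))
            .option("topic", OUTPUT_TOPIC)                        // placeholder output topic
            .option("checkpointLocation", "/tmp/kafka-sink-ckpt") // required by the Kafka sink
            .start();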

Upvotes: 0

Views: 413

Answers (2)

Jacek Laskowski

Reputation: 74669

lines is a Dataset<String> with the values from Kafka as rows.

how do I pass these to a method which will process them.

Depending on what exactly you'd like to do, you could use the foreach operator, or any other operator or function you would use on a batch Dataset.

You could use withColumn(...) or select or map operators.

In other words, think of Spark Structured Streaming as Spark SQL with a streaming Dataset.
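For example, a minimal Java sketch that hands each record to a processing method via map and writes the results out (processRecord is a hypothetical method standing in for your processing logic):

    import org.apache.spark.api.java.function.MapFunction;
    import org.apache.spark.sql.Encoders;

    // transform every Kafka value with your own method
    Dataset<String> processed = lines.map(
            (MapFunction<String, String>) record -> processRecord(record),
            Encoders.STRING());

    StreamingQuery query = processed.writeStream()
            .outputMode("append")
            .format("console")
            .start();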

Upvotes: 1

dumitru

Reputation: 2108

You can use a ForeachWriter[T] on the sink side of a Kafka streaming application to process each row of your query. Note that for a streaming Dataset it is attached via writeStream (not write), like this:

   datasetOfString.writeStream.foreach(new ForeachWriter[String] {

     def open(partitionId: Long, version: Long): Boolean = {
       // open connection; return true to process this partition
       true
     }

     def process(record: String): Unit = {
       // write string to connection
     }

     def close(errorOrNull: Throwable): Unit = {
       // close the connection
     }
   }).start()
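Since the question uses the Java API, a rough Java equivalent would be the following sketch (connection handling is left as comments):

    import org.apache.spark.sql.ForeachWriter;

    StreamingQuery query = lines.writeStream()
            .foreach(new ForeachWriter<String>() {
                @Override
                public boolean open(long partitionId, long version) {
                    // open a connection; return true to process this partition
                    return true;
                }

                @Override
                public void process(String record) {
                    // hand the record to your processing method
                }

                @Override
                public void close(Throwable errorOrNull) {
                    // close the connection
                }
            })
            .start();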

Upvotes: 1
