Kyle Schmitt

Reputation: 31

Spark Structured Streaming Multiple Kafka Topics With Unique Message Schemas

Current State:

Today I have a Spark Structured Streaming application that consumes a single Kafka topic containing JSON messages. Embedded in each Kafka message's value is some information about the source and the schema of the message field. A very simplified version of a message looks something like this:

{
  "source": "Application A",
  "schema": [{"col_name": "countryId", "col_type": "Integer"}, {"col_name": "name", "col_type": "String"}],
  "message": {"countryId": "21", "name": "Poland"}
}

There are a handful of Kafka topics in the system today, and I've deployed one instance of this Spark Structured Streaming application per topic, using the subscribe option. The application applies the topic's unique schema (a hack: I batch-read the first message in the topic and map its embedded schema to a Spark schema) and writes the result to HDFS in Parquet format.
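
To give a sense of the current per-topic setup, here is a simplified sketch of the schema hack described above (the hosts, topic names, and paths are placeholders):

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._

// Envelope schema; the inner "message" values arrive as strings
val envelope = new StructType()
  .add("source", StringType)
  .add("schema", ArrayType(new StructType()
    .add("col_name", StringType)
    .add("col_type", StringType)))
  .add("message", MapType(StringType, StringType))

// Column types derived once by batch-reading the topic's first message
val columns = Seq("countryId" -> "integer", "name" -> "string")

val parsed = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1")
  .option("subscribe", "topicA")
  .load()
  .select(from_json(col("value").cast("string"), envelope).as("event"))
  .select(columns.map { case (name, tpe) =>
    col("event.message").getItem(name).cast(tpe).as(name)
  }: _*)

parsed.writeStream
  .format("parquet")
  .option("path", "hdfs:///data/topicA")
  .option("checkpointLocation", "hdfs:///checkpoints/topicA")
  .start()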

Desired State:

My organization will soon start producing more and more topics, and I don't think this pattern of one Spark application per topic will scale well. Initially the subscribePattern option seems like a good fit, since these topics loosely follow a naming hierarchy, but now I'm stuck on how to apply each topic's schema and write each topic to a distinct location in HDFS.
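
The closest I've gotten is the sketch below (pattern and paths are illustrative): partitioning by the Kafka source's topic column lands each topic in its own HDFS directory, but it doesn't solve applying a different schema per topic.

val multi = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1")
  .option("subscribePattern", "billing\\..*")
  .load()
  .selectExpr("topic", "CAST(value AS STRING) AS value")

// Each topic lands under hdfs:///data/events/topic=<name>
multi.writeStream
  .format("parquet")
  .option("path", "hdfs:///data/events")
  .option("checkpointLocation", "hdfs:///checkpoints/events")
  .partitionBy("topic")
  .start()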

In the future we will most likely have thousands of topics and hopefully only 25 or so Spark Applications.

Does anyone have advice on how to accomplish this?

Upvotes: 3

Views: 2849

Answers (2)

Peter Dowdy

Reputation: 439

If you are running Kafka 0.11+, consider using the record headers functionality. Headers come across as key/value pairs alongside the message, so you can route messages based on a header without having to parse the body first.
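
A minimal sketch, assuming Spark 3.0+ (where the Kafka source gained the includeHeaders option); the eventType header name and the topic pattern are illustrative:

import org.apache.spark.sql.functions.expr

val withHeaders = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1")
  .option("subscribePattern", "events\\..*")
  .option("includeHeaders", "true")
  .load()

// headers is ARRAY<STRUCT<key: STRING, value: BINARY>>; extract one header
val routed = withHeaders.withColumn("eventType",
  expr("CAST(filter(headers, h -> h.key = 'eventType')[0].value AS STRING)"))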

Upvotes: 1

bp2010

Reputation: 2472

When sending these events with your Kafka producer, you can also send a key along with the value. If every event has its event type as the key, then when reading the stream from the topic(s) you can pull out the key as well:

import spark.implicits._ // needed for .as[(String, String)]

val kafkaKvPair = spark
  .readStream // use .read instead for a one-off batch query
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

Then you could just filter on which events you want to process:

val events = kafkaKvPair
  .filter(f => f._1 == "MY_EVENT_TYPE")

In this way, if you are subscribed to multiple topics within one Spark app, you can process as many event types as you wish.
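
For example, you could start one query per event type, each with its own sink and checkpoint (a sketch; the event types and HDFS paths are illustrative):

// Start an independent streaming query per event type
Seq("ORDER_CREATED", "ORDER_SHIPPED").foreach { eventType =>
  kafkaKvPair
    .filter(_._1 == eventType)
    .map(_._2) // keep only the value
    .writeStream
    .format("parquet")
    .option("path", s"hdfs:///data/$eventType")
    .option("checkpointLocation", s"hdfs:///checkpoints/$eventType")
    .start()
}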

Upvotes: 2
