Sergey Postument

Reputation: 177

Flink: use multiple data classes for a single source

Some code:

implicit val formats = Serialization.formats(NoTypeHints)

case class DataClass(id: String, name: String)    

val dataSource = env
  .addSource(new FlinkKinesisConsumer[String](s"data-stream-$stage", new SimpleStringSchema, consumerConfig))
  .uid(s"data-stream-$stage-source-id").name("dataSource")
  .map(json => read[DataClass](json))

Here I read data from a Kinesis stream and deserialize it into my data class. Everything works fine, but now I need to receive data in one more format (e.g. DataClassSecond).

One option is to add an additional data source and process it in its own stream.

But this requires an additional Kinesis stream, and I'm not sure this is a good approach. Is there a way to receive different kinds of data from one Kinesis stream and then split the stream depending on the type?

Upvotes: 0

Views: 224

Answers (1)

Dominik Wosiński

Reputation: 3864

You could filter the DataStream[String] on the fields each record contains, so that you get two or more streams, each holding only the elements that match one JSON format.

So the simplest way to do it would be something like:

// Match the quoted key: a bare "name" is also a substring of "surname"
val streamDataClass = sourceStream.filter(_.contains("\"name\""))
val streamDataClassSecond = sourceStream.filter(_.contains("\"surname\""))

This only works if the name and surname fields are unique to each data class. A plain substring check is also fragile: the text name occurs inside surname, and either word could appear in a value rather than a key. A more robust option is probably to first map the DataStream to some common format, or to use something like Either as the deserialization result and then check whether it succeeded.
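
To illustrate the Either idea, here is a minimal sketch in plain Scala. The fields of DataClassSecond, the SplitByType object, and the small field helper are assumptions made up for this example. The helper only stands in for a real JSON parser (such as the read[DataClass](json) call from the question) so that the snippet runs without dependencies.

```scala
import scala.util.Try

case class DataClass(id: String, name: String)
// Assumed shape of the second format; the question does not show it.
case class DataClassSecond(id: String, surname: String)

object SplitByType {
  // Right = first format, Left = second format.
  type Decoded = Either[DataClassSecond, DataClass]

  // Hypothetical stand-in for a JSON library: pulls one string field out of a
  // flat JSON object, or throws if the key is absent.
  private def field(json: String, key: String): String = {
    val pattern =
      ("\"" + java.util.regex.Pattern.quote(key) + "\"\\s*:\\s*\"([^\"]*)\"").r
    pattern.findFirstMatchIn(json)
      .map(_.group(1))
      .getOrElse(throw new NoSuchElementException(s"missing field: $key"))
  }

  def readFirst(json: String): DataClass =
    DataClass(field(json, "id"), field(json, "name"))

  def readSecond(json: String): DataClassSecond =
    DataClassSecond(field(json, "id"), field(json, "surname"))

  // Try the first format, fall back to the second, and return None when the
  // record matches neither.
  def decode(json: String): Option[Decoded] = {
    val asFirst: Try[Decoded] = Try(Right(readFirst(json)))
    asFirst.orElse(Try[Decoded](Left(readSecond(json)))).toOption
  }
}
```

In the job, decode would be applied to the DataStream[String] once, and the two typed streams would then be derived from the Right and Left sides, so each record is parsed a single time instead of being scanned by every filter. A record carrying both fields ends up as DataClass here, because that format is tried first.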

Upvotes: 1
