Reputation: 177
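Here is my code:
```scala
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer
import org.json4s.{Formats, NoTypeHints}
import org.json4s.native.Serialization // json4s-native assumed; json4s-jackson works the same way
import org.json4s.native.Serialization.read
implicit val formats: Formats = Serialization.formats(NoTypeHints)
case class DataClass(id: String, name: String)
val dataSource = env
  .addSource(new FlinkKinesisConsumer[String](s"data-stream-$stage", new SimpleStringSchema, consumerConfig))
  .uid(s"data-stream-$stage-source-id").name("dataSource")
  .map(json => read[DataClass](json)) // deserialize each JSON record into DataClass
```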
Here I read data from a Kinesis stream and deserialize each record into my data class.
Everything works fine, but now I also need to receive data in an additional format (e.g. `DataClassSecond`).
One option is to add a second data source and process each format in its own stream.
But this requires an additional Kinesis stream, and I'm not sure it is a good approach. Is there a way to receive different types of data from one Kinesis stream and then split the stream depending on the type?
Upvotes: 0
Views: 224
Reputation: 3864
You may try to `filter` the `DataStream[String]` based on the JSON fields, so that you get two or more streams, each containing only elements of one JSON format.
The simplest way to do it would be something like:
```scala
// Match the quoted key: a bare contains("name") would also match "surname"
val streamDataClass = sourceStream.filter(_.contains("\"name\""))
val streamDataClassSecond = sourceStream.filter(_.contains("\"surname\""))
```
This only works if `name` and `surname` each appear in only one of the data classes.
A somewhat more efficient, and more robust, approach would probably be to first `map` the `DataStream` to some common format, or to use something like `Either` as the deserialization result and then check whether deserialization succeeded.
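The `Either` idea can be sketched in plain Scala, assuming both formats are mapped into a common sealed trait. This is a sketch under several assumptions: the fields of `DataClassSecond` are hypothetical (the question does not give them), and the regex-based `fields` helper is a toy stand-in for a real JSON library such as json4s, used only so the example runs without dependencies. `Record`, `decode` and `split` are illustrative names.

```scala
object SplitByType {
  // Common format that both JSON shapes are mapped into.
  sealed trait Record
  final case class DataClass(id: String, name: String) extends Record
  // Hypothetical fields: the real contents of DataClassSecond are not given.
  final case class DataClassSecond(id: String, surname: String) extends Record

  // Toy stand-in for a JSON library: extracts flat "key":"value" string pairs
  // only. It is NOT a general JSON parser.
  private val Field = "\"(\\w+)\"\\s*:\\s*\"([^\"]*)\"".r

  private def fields(json: String): Map[String, String] =
    Field.findAllMatchIn(json).map(m => m.group(1) -> m.group(2)).toMap

  // Deserialize into the common type; Left keeps input that matched neither format.
  def decode(json: String): Either[String, Record] = {
    val f = fields(json)
    (f.get("id"), f.get("name"), f.get("surname")) match {
      case (Some(id), Some(name), None)    => Right(DataClass(id, name))
      case (Some(id), None, Some(surname)) => Right(DataClassSecond(id, surname))
      case _                               => Left(json)
    }
  }

  // Split decoded records by type: one collection per downstream stream,
  // plus the raw inputs that failed to decode.
  def split(input: Seq[String]): (Seq[DataClass], Seq[DataClassSecond], Seq[String]) = {
    val decoded = input.map(decode)
    (
      decoded.collect { case Right(d: DataClass) => d },
      decoded.collect { case Right(d: DataClassSecond) => d },
      decoded.collect { case Left(raw) => raw }
    )
  }

  def main(args: Array[String]): Unit = {
    val (firsts, seconds, failed) = split(Seq(
      """{"id":"1","name":"Alice"}""",
      """{"id":"2","surname":"Smith"}""",
      "not json"
    ))
    println(firsts)  // List(DataClass(1,Alice))
    println(seconds) // List(DataClassSecond(2,Smith))
    println(failed)  // List(not json)
  }
}
```

In the Flink job, a function like `decode` (backed by a real JSON library) would take the place of `read[DataClass]` inside `.map`, and each typed stream could then be derived with a `filter` or a `flatMap` that emits only the matching records. Each record is parsed once, instead of being scanned by several string predicates.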
Upvotes: 1