Reputation: 11
I have a Datastream[ObjectNode] which I read as deserialized json from a kafka topic. One of the element of this ObjectNode is actually an array of events. This array has varying length. The incoming json stream looks like this :
{
"eventType": "Impression",
"deviceId": "359849094258487",
"payload": {
"vertical_name": "",
"promo_layout_type": "aa",
"Customer_Id": "1011851",
"ecommerce": {
"promoView": {
"promotions": [{
"name": "/-category_icons_all",
"id": "300275",
"position": "slot_5_1",
"creative": "Central/Gift Card/00000001B890D1739913DDA956AB5C79775991EC"
}, {
"name": "/-category_icons_all",
"id": "300276",
"position": "slot_6_1",
"creative": "Lifestyle/Gift Card/00000001B890D1739913DDA956AB5C79775991EC"
}, {
"name": "/-category_icons_all",
"id": "413002",
"position": "slot_7_1",
"creative": "Uber/Deals/00000001B890D1739913DDA956AB5C79775991EC"
}]
}
}
}
}
I want to be able to explode the promotions
array so that each element inside becomes an individual message which can be written to a sink kafka topic. Does Flink provide the explode feature in DataStream and/or Table API?
I have tried to do a RichFlatMap on this stream to be able to collect individual rows but this also just returns me a DataStream[Seq[GenericRecord]] as below:
class PromoMapper(schema: Schema) extends RichFlatMapFunction[node.ObjectNode,Seq[GenericRecord]] {
override def flatMap(value: ObjectNode, out: Collector[Seq[GenericRecord]]): Unit = {
val promos = value.get("payload").get("ecommerce").get("promoView").get("promotions").asInstanceOf[Seq[node.ObjectNode]]
val record = for{promo <- promos} yield {
val processedRecord: GenericData.Record = new GenericData.Record(schema)
promo.fieldNames().asScala.foreach(f => processedRecord.put(f,promo.get(f)))
processedRecord
}
out.collect(record)
}
}
Please help.
Upvotes: 1
Views: 1366
Reputation: 43612
Using a flatmap is the right idea (not sure why you bothered with a RichFlatMap, but that's a detail).
Seems like you should be calling out.collect(processedRecord)
for each element inside the for loop, rather than once on the Seq produced by that loop.
Upvotes: 1