Faizan Ahmed

Reputation: 11

How to explode a DataStream with a JSON array into a DataStream of individual array elements

I have a DataStream[ObjectNode] which I read as deserialized JSON from a Kafka topic. One of the elements of this ObjectNode is actually an array of events, and this array has varying length. The incoming JSON stream looks like this:

{
    "eventType": "Impression",
    "deviceId": "359849094258487",
    "payload": {
        "vertical_name": "",
        "promo_layout_type": "aa",
        "Customer_Id": "1011851",
        "ecommerce": {
            "promoView": {
                "promotions": [{
                    "name": "/-category_icons_all",
                    "id": "300275",
                    "position": "slot_5_1",
                    "creative": "Central/Gift Card/00000001B890D1739913DDA956AB5C79775991EC"
                }, {
                    "name": "/-category_icons_all",
                    "id": "300276",
                    "position": "slot_6_1",
                    "creative": "Lifestyle/Gift Card/00000001B890D1739913DDA956AB5C79775991EC"
                }, {
                    "name": "/-category_icons_all",
                    "id": "413002",
                    "position": "slot_7_1",
                    "creative": "Uber/Deals/00000001B890D1739913DDA956AB5C79775991EC"
                }]
            }
        }
    }
}

I want to explode the promotions array so that each element inside it becomes an individual message that can be written to a sink Kafka topic. Does Flink provide an explode feature in the DataStream and/or Table API?

I have tried using a RichFlatMapFunction on this stream to collect individual rows, but it just gives me a DataStream[Seq[GenericRecord]], as below:

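import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode // assumes Flink's shaded Jackson
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._

class PromoMapper(schema: Schema) extends RichFlatMapFunction[ObjectNode, Seq[GenericRecord]] {

  override def flatMap(value: ObjectNode, out: Collector[Seq[GenericRecord]]): Unit = {
    // "promotions" is a Jackson ArrayNode, not a Scala Seq, so iterate over its elements
    val promos = value.get("payload").get("ecommerce").get("promoView").get("promotions")
      .elements().asScala.map(_.asInstanceOf[ObjectNode]).toList

    val record = for {promo <- promos} yield {
      val processedRecord: GenericData.Record = new GenericData.Record(schema)
      promo.fieldNames().asScala.foreach(f => processedRecord.put(f, promo.get(f).asText()))
      processedRecord
    }

    // Emits the whole Seq as ONE stream element, hence DataStream[Seq[GenericRecord]]
    out.collect(record)
  }
}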

Please help.

Upvotes: 1

Views: 1366

Answers (1)

David Anderson

Reputation: 43612

Using a flatMap is the right idea (not sure why you bothered with a RichFlatMapFunction, but that's a detail).

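It seems like you should be calling out.collect(processedRecord) for each element inside the for loop, rather than once on the Seq produced by that loop. The function's output type then becomes GenericRecord instead of Seq[GenericRecord], and the result is a DataStream[GenericRecord] with one element per promotion.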
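A minimal sketch of that change, assuming the ObjectNode comes from Flink's shaded Jackson (use com.fasterxml.jackson.databind.node.ObjectNode instead if your deserializer uses plain Jackson) and that every promotion field maps to a string field in the Avro schema. The class keeps the question's PromoMapper name, but uses a plain FlatMapFunction and calls collect once per array entry:

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.flink.api.common.functions.FlatMapFunction
// Assumption: Flink's shaded Jackson ObjectNode; swap for plain Jackson if needed.
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

// Emits one GenericRecord per element of payload.ecommerce.promoView.promotions.
// A plain FlatMapFunction suffices: nothing here needs open() or the runtime context.
class PromoMapper(schema: Schema) extends FlatMapFunction[ObjectNode, GenericRecord] {

  override def flatMap(value: ObjectNode, out: Collector[GenericRecord]): Unit = {
    // path() yields a MissingNode rather than null, so a message without
    // promotions simply produces no output instead of throwing
    val promos = value.path("payload").path("ecommerce").path("promoView").path("promotions")

    // elements() iterates an ArrayNode's entries (and is empty for a MissingNode)
    for (promo <- promos.elements().asScala) {
      val record = new GenericData.Record(schema)
      for (field <- promo.fields().asScala) {
        // Copy only fields the schema declares, as strings (assumes string-typed fields)
        if (schema.getField(field.getKey) != null) {
          record.put(field.getKey, field.getValue.asText())
        }
      }
      out.collect(record) // one stream element per array entry
    }
  }
}
```

Wired in as `promoStream.flatMap(new PromoMapper(schema))` (where `promoStream` is a placeholder name for your DataStream[ObjectNode]), this should yield a DataStream[GenericRecord] that can go straight to the Kafka sink. With the Scala API you may have to supply TypeInformation for GenericRecord explicitly, for example flink-avro's GenericRecordAvroTypeInfo. The Scala DataStream API's flatMap also accepts a function returning a collection (T => TraversableOnce[R]) and flattens it, so returning the Seq from a lambda instead of collecting it as one element should work as well.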

Upvotes: 1
