Gaurav Kumar

Reputation: 1091

Flink: Broadcast a map of values created using keyBy operator to another stream, cold start problem

I have a high-frequency stream publishing events, each containing some information about a car. I need to process this stream but exclude events with certain city and plate number combinations. The blacklisted city and plate number combinations come from an S3 file that is updated every day.

Example: car events look like the following:

[
    {
        "name": "Car1",
        "plate": "XYZ123",
        "city": "Berlin"
    },
    {
        "name": "Car2",
        "plate": "XYZ1234",
        "city": "Amsterdam"
    },
    {
        "name": "Car3",
        "plate": "ASD 123",
        "city": "Kuala Lumpur"
    },
    {
        "name": "Car1",
        "plate": "XYZ123",
        "city": "Moscow"
    },
    {
        "name": "Car1",
        "plate": "XYZ123",
        "city": "Barcelona"
    }
]

The S3 file looks as follows (let's say it is called excludedCars):

[
    {
        "plate": "XYZ123",
        "city": "Berlin"
    },
    {
        "plate": "ABC1231",
        "city": "Berlin"
    },
    {
        "plate": "AWS121",
        "city": "Berlin"
    },
    {
        "plate": "XYZ1234",
        "city": "Amsterdam"
    },
    {
        "plate": "AMC3421",
        "city": "Amsterdam"
    },
    {
        "plate": "ASD 123",
        "city": "Kuala Lumpur"
    },
    {
        "plate": "XYZ123",
        "city": "Moscow"
    },
    {
        "plate": "XYZ123",
        "city": "Barcelona"
    }
]

Approach:

  1. Use S3 file excludedCars as a streaming source.
  2. Transform the events to produce the following structure:
{
    "Berlin": ["XYZ123", "ABC1231", "AWS121"],
    "Amsterdam": ["XYZ1234", "AMC3421"],
    "Kuala Lumpur":["ASD 123"],
    "Moscow":["XYZ123"],
    "Barcelona":["XYZ123"]
}
  3. Broadcast this stream to the main stream (the cars stream), then use the info from #2 to do the processing.
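
For illustration, the target structure from step 2 can be sketched with plain Scala collections. This is only a sketch of the data shape, with no Flink involved; the names GroupByCitySketch, ExcludedCar and groupByCity are made up for this example and do not appear in the job code.

```scala
// Plain Scala sketch of the city -> plates structure; not a Flink job.
// All names here are hypothetical and exist only for illustration.
object GroupByCitySketch {
  final case class ExcludedCar(plate: String, city: String)

  // Collect the plates of each city into a list, keeping input order within a city.
  def groupByCity(cars: Seq[ExcludedCar]): Map[String, List[String]] =
    cars.groupBy(_.city).map { case (city, group) =>
      city -> group.map(_.plate).toList
    }

  def main(args: Array[String]): Unit = {
    val excluded = Seq(
      ExcludedCar("XYZ123", "Berlin"),
      ExcludedCar("ABC1231", "Berlin"),
      ExcludedCar("XYZ1234", "Amsterdam")
    )
    // Prints a map with one entry per city.
    println(groupByCity(excluded))
  }
}
```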

Code:


object Cars {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val excludedCarStream: DataStream[Array[ExcludedCarDetail]] = getExcludedCarsStream(env)
    val excludedCarDetails = excludedCarStream.flatMap(item => item) // Array of Excluded Car objects
    excludedCarDetails.map(car => (car.cityId, car.plateNumber)).keyBy(0) // As per my understanding, this should result into a map of city to array of plate number maps 
    excludedCarDetails.print() // This just prints the simple tuples without any grouping by city
    env.execute("Scala SocketTextStreamWordCount Example")
  }

  private def getExcludedCarsStream(env: StreamExecutionEnvironment): DataStream[Array[ExcludedCarDetail]] = {
    val path: String = "file:///Users/name/flinkTest/excluded"
    val textInputFormat = new TextInputFormat(new Path(path))
    env
      .readFile(
        textInputFormat,
        path,
        FileProcessingMode.PROCESS_CONTINUOUSLY,
        1000
      )
      .map(jsonString => {
        val excludedCars: Array[ExcludedCarDetail] = (new Gson).fromJson(jsonString, classOf[Array[ExcludedCarDetail]])
        excludedCars
      })
  }
}

case class ExcludedCarDetail(
  @(SerializedName @scala.annotation.meta.field)("city") cityId: String,
  @(SerializedName @scala.annotation.meta.field)("plate") plateNumber: String
)

As per my understanding, excludedCarDetails.map(car => (car.cityId, car.plateNumber)).keyBy(0) should result in a map of city to array of plate numbers, which I could broadcast to my main stream (cars). Instead, it simply prints the tuples of (city, plateNumber).

I am completely new to Flink and am trying to grasp and implement its concepts. Please suggest what I am doing wrong and how I could achieve the required behaviour.

Constraint: The structure of broadcast map cannot be changed.

Broadcast State Solution:

object Cars {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val excludedCarsState: MapStateDescriptor[Int, List[String]] = new MapStateDescriptor("excludedCars", classOf[Int], classOf[List[String]])
    val excludedCarDetails: DataStream[ExcludedCarDetail] = getExcludedCarsStream(env)
    val excludedCarBroadcast: BroadcastStream[ExcludedCarDetail] = excludedCarDetails.broadcast(excludedCarsState)


    val carsStream: DataStream[CarDetail] = getMainCarsStream(env)

    val bs = carsStream
      .keyBy(_.cityId)
      .connect(excludedCarBroadcast)
      .process(new CarsStateLogic(excludedCarsState))

    bs.print()

    env.execute("Scala SocketTextStreamWordCount Example")
  }

  private def getExcludedCarsStream(env: StreamExecutionEnvironment): DataStream[ExcludedCarDetail] = {
    val cars: ListBuffer[ExcludedCarDetail] = ListBuffer()
    for(i <- 0 until 3) {
      val cityId = i+1
      val plateNumber = "Plate"+(i+1)
      cars += ExcludedCarDetail(cityId, plateNumber) // Basically exclude cars with plate1 in city1, plate2 in city2, plate3 in city3
    }
    env.fromCollection(cars.toList)
  }

  private def getMainCarsStream(env: StreamExecutionEnvironment): DataStream[CarDetail] = {
    val cars: ListBuffer[CarDetail] = ListBuffer()
    for(i <- 0 until 10) {
      val cityId = i+1
      val plateNumber = "Plate"+(i+1)
      val name = "Name"+(i+1)
      cars += CarDetail(cityId, plateNumber, name)
    }
    env.fromCollection(cars.toList)
  }
}

case class ExcludedCarDetail(cityId: Int, plateNumber: String)
case class CarDetail(cityId: Int, plateNumber: String, name: String)

// The key type is Int because the cars stream is keyed by cityId, which is an Int.
class CarsStateLogic(excludedCarsState: MapStateDescriptor[Int, List[String]]) extends KeyedBroadcastProcessFunction[Int, CarDetail, ExcludedCarDetail, CarDetail] {
  override def processElement(car: CarDetail, ctx: KeyedBroadcastProcessFunction[Int, CarDetail, ExcludedCarDetail, CarDetail]#ReadOnlyContext, out: Collector[CarDetail]): Unit = {
    val state = ctx.getBroadcastState(excludedCarsState)

    if(state.contains(car.cityId)) {
      val cityState = state.get(car.cityId)
      if(cityState.indexOf(car.plateNumber) < 0) { // not excluded
        out.collect(car)
      }
    } else {
      // No exclusions known for this city (yet), so the car passes through.
      out.collect(car)
    }
  }

  override def processBroadcastElement(value: ExcludedCarDetail, ctx: KeyedBroadcastProcessFunction[Int, CarDetail, ExcludedCarDetail, CarDetail]#Context, out: Collector[CarDetail]): Unit = {
    val state = ctx.getBroadcastState(excludedCarsState)
    // Prepend the new plate to the list already stored for this city, if any.
    val newStateForKey = if(state.contains(value.cityId)) {
      value.plateNumber :: state.get(value.cityId)
    } else {
      List(value.plateNumber)
    }
    state.put(value.cityId, newStateForKey)
    println("Broadcast element: CityId:"+ value.cityId+ ", State: "+state.get(value.cityId))
  }
}

But now I hit the cold start problem. What is a reliable way of making sure that the broadcast state is available before the main data is processed?

Upvotes: 0

Views: 395

Answers (1)

kkrugler

Reputation: 9245

If the excluded cars data set is small, then you can just broadcast it as-is (no grouping by city). If it's big, then you'd key by city (same as the car stream), and connect those two streams so that each sub-task only gets a partitioned set of all of the excluded cars and regular car data.
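
As a rough sketch of the "broadcast as-is" case, the lookup can be modelled with a flat set of (city, plate) pairs instead of a map grouped by city. This is plain Scala rather than Flink code; ExclusionFilterSketch, Car and keepAllowed are hypothetical names, and the plate "DEF456" is made-up test data.

```scala
// Plain Scala model of filtering against an ungrouped exclusion set.
// Names and the "DEF456" plate are hypothetical, used only for illustration.
object ExclusionFilterSketch {
  final case class Car(name: String, plate: String, city: String)

  // Keep only the cars whose (city, plate) pair is not in the exclusion set.
  def keepAllowed(cars: Seq[Car], excluded: Set[(String, String)]): Seq[Car] =
    cars.filterNot(car => excluded.contains((car.city, car.plate)))

  def main(args: Array[String]): Unit = {
    val excluded = Set(("Berlin", "XYZ123"), ("Amsterdam", "XYZ1234"))
    val cars = Seq(
      Car("Car1", "XYZ123", "Berlin"),
      Car("Car4", "DEF456", "Berlin")
    )
    println(keepAllowed(cars, excluded))
  }
}
```

In a Flink job, the set would presumably live in broadcast state so that every sub-task sees all pairs; the sketch only shows the lookup itself.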

Note that you have a cold-start problem: you want to process all of the current excluded car data before processing any of the regular car data. Otherwise, cars that should be excluded can slip through because they were processed before the excluded car data was received.
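
One possible way to think about this, which is not spelled out above, is to buffer car events until the initial exclusion data is known to be complete. The following is a plain Scala model of that idea, not Flink code. ColdStartSketch, BufferingFilter and markReady are hypothetical names, and how a real job would detect that the initial data is complete is an assumption left open here.

```scala
import scala.collection.mutable

// Plain Scala model of buffering car events during cold start.
// All names are hypothetical; markReady stands in for whatever signal
// tells the job that the initial exclusion data has been fully read.
object ColdStartSketch {
  final case class Car(name: String, plate: String, city: String)

  final class BufferingFilter {
    private val excluded = mutable.Set.empty[(String, String)]
    private val pending = mutable.ArrayBuffer.empty[Car]
    private var ready = false

    // Record one excluded (city, plate) pair.
    def addExcluded(city: String, plate: String): Unit =
      excluded += ((city, plate))

    // Mark the exclusion data as loaded and release the buffered cars that pass the filter.
    def markReady(): List[Car] = {
      ready = true
      val released = pending.filterNot(isExcluded).toList
      pending.clear()
      released
    }

    // Before markReady, cars are buffered; afterwards they are filtered directly.
    def onCar(car: Car): List[Car] =
      if (!ready) { pending += car; Nil }
      else if (isExcluded(car)) Nil
      else List(car)

    private def isExcluded(car: Car): Boolean =
      excluded.contains((car.city, car.plate))
  }
}
```

In an actual Flink job, the buffer would need to be held in managed state rather than in a local collection so that it survives failures.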

Upvotes: 2
