Reputation: 1091
I have a high-frequency stream publishing events, with each event containing some information about a car. I need to process this stream of events but exclude events with certain city and plate number combinations. The blacklisted city and plate number combinations come from an S3 file that is updated every day.
Example: car events look as follows:
[
{
"name": "Car1",
"plate": "XYZ123",
"city": "Berlin"
},
{
"name": "Car2",
"plate": "XYZ1234",
"city": "Amsterdam"
},
{
"name": "Car3",
"plate": "ASD 123",
"city": "Kuala Lumpur"
},
{
"name": "Car1",
"plate": "XYZ123",
"city": "Moscow"
},
{
"name": "Car1",
"plate": "XYZ123",
"city": "Barcelona"
}
]
The S3 file looks as follows (let's say it is called excludedCars):
[
{
"plate": "XYZ123",
"city": "Berlin"
},
{
"plate": "ABC1231",
"city": "Berlin"
},
{
"plate": "AWS121",
"city": "Berlin"
},
{
"plate": "XYZ1234",
"city": "Amsterdam"
},
{
"plate": "AMC3421",
"city": "Amsterdam"
},
{
"plate": "ASD 123",
"city": "Kuala Lumpur"
},
{
"plate": "XYZ123",
"city": "Moscow"
},
{
"plate": "XYZ123",
"city": "Barcelona"
}
]
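Approach: read excludedCars as a streaming source and transform it into a map of city to plate numbers, so that it can be broadcast to the main stream:
{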
"Berlin": ["XYZ123", "ABC1231", "AWS121"],
"Amsterdam": ["XYZ1234", "AMC3421"],
"Kuala Lumpur":["ASD 123"],
"Moscow":["XYZ123"],
"Barcelona":["XYZ123"]
}
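To make the target structure concrete: outside Flink, with plain Scala collections, the same grouping could be sketched roughly as below. `ExcludedCar` and `GroupExcludedCars` are hypothetical names used only for this illustration, and the sketch says nothing about how Flink's keyBy behaves.

```scala
// Plain Scala collections only (no Flink). All names here are made up for the sketch.
final case class ExcludedCar(plate: String, city: String)

object GroupExcludedCars {
  // Group the flat list by city and keep only the plate numbers for each city.
  def byCity(cars: Seq[ExcludedCar]): Map[String, List[String]] =
    cars.groupBy(_.city).map { case (city, group) => city -> group.map(_.plate).toList }

  def main(args: Array[String]): Unit = {
    val excluded = Seq(
      ExcludedCar("XYZ123", "Berlin"),
      ExcludedCar("ABC1231", "Berlin"),
      ExcludedCar("XYZ1234", "Amsterdam")
    )
    val grouped = byCity(excluded)
    println(grouped("Berlin"))    // List(XYZ123, ABC1231)
    println(grouped("Amsterdam")) // List(XYZ1234)
  }
}
```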
Code:
import com.google.gson.Gson
import com.google.gson.annotations.SerializedName
import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala._

object Cars {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val excludedCarStream: DataStream[Array[ExcludedCarDetail]] = getExcludedCarsStream(env)
    val excludedCarDetails = excludedCarStream.flatMap(item => item) // Array of excluded car objects
    excludedCarDetails.map(car => (car.cityId, car.plateNumber)).keyBy(0) // As per my understanding, this should result in a map of city to array of plate numbers
    excludedCarDetails.print() // This just prints the simple tuples without any grouping by city
    env.execute("Scala SocketTextStreamWordCount Example")
  }

  private def getExcludedCarsStream(env: StreamExecutionEnvironment): DataStream[Array[ExcludedCarDetail]] = {
    val path: String = "file:///Users/name/flinkTest/excluded"
    val textInputFormat = new TextInputFormat(new Path(path))
    env
      .readFile(
        textInputFormat,
        path,
        FileProcessingMode.PROCESS_CONTINUOUSLY,
        1000
      )
      .map(jsonString => {
        val excludedCars: Array[ExcludedCarDetail] = (new Gson).fromJson(jsonString, classOf[Array[ExcludedCarDetail]])
        excludedCars
      })
  }
}

case class ExcludedCarDetail(
  @(SerializedName @scala.annotation.meta.field)("city") cityId: String,
  @(SerializedName @scala.annotation.meta.field)("plate") plateNumber: String
)
As per my understanding, excludedCarDetails.map(car => (car.cityId, car.plateNumber)).keyBy(0) should result in a map of city to array of plate numbers, which I could broadcast to my main stream (cars). Instead, it simply prints the tuples of (city, plateNumber).
I am completely new to Flink and am trying to grasp and implement its concepts. Please suggest what I am doing wrong and how I could achieve the required behaviour.
Constraint: The structure of the broadcast map cannot be changed.
Broadcast State Solution:
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.datastream.BroadcastStream
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer

object Cars {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val excludedCarsState: MapStateDescriptor[Int, List[String]] = new MapStateDescriptor("excludedCars", classOf[Int], classOf[List[String]])
    val excludedCarDetails: DataStream[ExcludedCarDetail] = getExcludedCarsStream(env)
    val excludedCarBroadcast: BroadcastStream[ExcludedCarDetail] = excludedCarDetails.broadcast(excludedCarsState)
    val carsStream: DataStream[CarDetail] = getMainCarsStream(env)
    val bs = carsStream
      .keyBy(_.cityId)
      .connect(excludedCarBroadcast)
      .process(new CarsStateLogic(excludedCarsState))
    bs.print()
    env.execute("Scala SocketTextStreamWordCount Example")
  }

  private def getExcludedCarsStream(env: StreamExecutionEnvironment): DataStream[ExcludedCarDetail] = {
    val cars: ListBuffer[ExcludedCarDetail] = ListBuffer()
    for (i <- 0 until 3) {
      val cityId = i + 1
      val plateNumber = "Plate" + (i + 1)
      cars += ExcludedCarDetail(cityId, plateNumber) // Basically exclude cars with plate1 in city1, plate2 in city2, plate3 in city3
    }
    env.fromCollection(cars.toList)
  }

  private def getMainCarsStream(env: StreamExecutionEnvironment): DataStream[CarDetail] = {
    val cars: ListBuffer[CarDetail] = ListBuffer()
    for (i <- 0 until 10) {
      val cityId = i + 1
      val plateNumber = "Plate" + (i + 1)
      val name = "Name" + (i + 1)
      cars += CarDetail(cityId, plateNumber, name)
    }
    env.fromCollection(cars.toList)
  }
}

case class ExcludedCarDetail(cityId: Int, plateNumber: String)
case class CarDetail(cityId: Int, plateNumber: String, name: String)

// The key type is Int because the cars stream is keyed by cityId, which is an Int.
class CarsStateLogic(excludedCarsState: MapStateDescriptor[Int, List[String]])
    extends KeyedBroadcastProcessFunction[Int, CarDetail, ExcludedCarDetail, CarDetail] {

  override def processElement(
      car: CarDetail,
      ctx: KeyedBroadcastProcessFunction[Int, CarDetail, ExcludedCarDetail, CarDetail]#ReadOnlyContext,
      out: Collector[CarDetail]): Unit = {
    val state = ctx.getBroadcastState(excludedCarsState)
    if (state.contains(car.cityId)) {
      val cityState = state.get(car.cityId)
      if (cityState.indexOf(car.plateNumber) < 0) { // not excluded
        out.collect(car)
      }
    } else {
      out.collect(car)
    }
  }

  override def processBroadcastElement(
      value: ExcludedCarDetail,
      ctx: KeyedBroadcastProcessFunction[Int, CarDetail, ExcludedCarDetail, CarDetail]#Context,
      out: Collector[CarDetail]): Unit = {
    val state = ctx.getBroadcastState(excludedCarsState)
    // Prepend the new plate to the list already stored for this city, if any.
    val newStateForKey = if (state.contains(value.cityId)) {
      value.plateNumber :: state.get(value.cityId)
    } else {
      List(value.plateNumber)
    }
    ctx.getBroadcastState(excludedCarsState).put(value.cityId, newStateForKey)
    println("BroadCast element: CityId:" + value.cityId + ", State: " + state.get(value.cityId))
  }
}
But now I hit the cold-start problem. What is a reliable way of making sure that the broadcast state is available before the main data is processed?
Upvotes: 0
Views: 395
Reputation: 9245
If the excluded cars data set is small, then you can just broadcast it as-is (no grouping by city). If it's big, then you'd key it by city (the same key as the car stream) and connect the two streams, so that each sub-task only receives its own partition of the excluded cars and of the regular car data.
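As a rough illustration of the keyed variant, here is a minimal sketch assuming the Flink 1.x Scala DataStream API. The names `Car`, `Excluded`, `ExcludeByCity` and `KeyedExclusionJob` are made up for the example, and the keyed MapState is simply used as a set of plates per city. The order in which the two inputs arrive is not guaranteed, so the printed result can vary between runs.

```scala
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical event types for this sketch.
case class Car(name: String, plate: String, city: String)
case class Excluded(plate: String, city: String)

// Both inputs are keyed by city, so each parallel instance only stores
// the excluded plates for the cities routed to it.
class ExcludeByCity extends RichCoFlatMapFunction[Car, Excluded, Car] {
  // Keyed state scoped to the current city: plate -> TRUE (used as a set).
  @transient private var plates: MapState[String, java.lang.Boolean] = _

  override def open(parameters: Configuration): Unit = {
    plates = getRuntimeContext.getMapState(
      new MapStateDescriptor[String, java.lang.Boolean](
        "excludedPlates", classOf[String], classOf[java.lang.Boolean]))
  }

  // Car events: forward only if the plate is not excluded for this city.
  override def flatMap1(car: Car, out: Collector[Car]): Unit =
    if (!plates.contains(car.plate)) out.collect(car)

  // Exclusion events: remember the plate for this city.
  override def flatMap2(ex: Excluded, out: Collector[Car]): Unit =
    plates.put(ex.plate, java.lang.Boolean.TRUE)
}

object KeyedExclusionJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val cars = env.fromElements(
      Car("Car1", "XYZ123", "Berlin"),
      Car("Car9", "FREE1", "Berlin"))
    val excluded = env.fromElements(Excluded("XYZ123", "Berlin"))

    cars.keyBy(_.city)
      .connect(excluded.keyBy(_.city))
      .flatMap(new ExcludeByCity)
      .print()

    env.execute("keyed exclusion sketch")
  }
}
```

Compared with broadcasting, this keeps only one city's plates per key in state, at the cost of shuffling the exclusion stream by city as well.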
Note that you have the cold-start problem, where you want to first process all of the current excluded car data, before processing any of the regular car data, so that you don't get false positives from car data being processed before the excluded car data has been received.
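One possible way to soften the cold start, not something the answer itself prescribes, is to hold car events back in keyed state until a first exclusion snapshot has been broadcast, and then release them. The sketch below assumes the Flink 1.x Scala API and assumes that each read of the file is turned into a single broadcast element of type `Map[String, List[String]]` (city to plates). `Car`, `BufferingCarFilter`, `Snapshot` and the `Ready` sentinel key are hypothetical names introduced only for this illustration.

```scala
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, MapStateDescriptor}
import org.apache.flink.runtime.state.KeyedStateFunction
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

case class Car(name: String, plate: String, city: String)

object BufferingCarFilter {
  // Assumption: one broadcast element per file read, city -> excluded plates.
  type Snapshot = Map[String, List[String]]
  // Hypothetical sentinel key marking "a snapshot was loaded"; must never be a real city.
  val Ready = "__ready__"
  val excludedDesc = new MapStateDescriptor[String, List[String]](
    "excludedCars", classOf[String], classOf[List[String]])
  val bufferDesc = new ListStateDescriptor[Car]("bufferedCars", classOf[Car])
}

class BufferingCarFilter
    extends KeyedBroadcastProcessFunction[String, Car, Map[String, List[String]], Car] {
  import BufferingCarFilter._

  // Per-city buffer of cars that arrived before any snapshot.
  private lazy val buffer: ListState[Car] = getRuntimeContext.getListState(bufferDesc)

  override def processElement(
      car: Car,
      ctx: KeyedBroadcastProcessFunction[String, Car, Map[String, List[String]], Car]#ReadOnlyContext,
      out: Collector[Car]): Unit = {
    val excluded = ctx.getBroadcastState(excludedDesc)
    if (!excluded.contains(Ready)) {
      buffer.add(car) // no snapshot yet: hold the event back
    } else {
      val plates = Option(excluded.get(car.city)).getOrElse(Nil)
      if (!plates.contains(car.plate)) out.collect(car)
    }
  }

  override def processBroadcastElement(
      snapshot: Map[String, List[String]],
      ctx: KeyedBroadcastProcessFunction[String, Car, Map[String, List[String]], Car]#Context,
      out: Collector[Car]): Unit = {
    // Replace the previous snapshot and mark the state as ready.
    val state = ctx.getBroadcastState(excludedDesc)
    state.clear()
    snapshot.foreach { case (city, plates) => state.put(city, plates) }
    state.put(Ready, Nil)

    // Release buffered cars for every key, filtered against the new snapshot.
    ctx.applyToKeyedState(bufferDesc, new KeyedStateFunction[String, ListState[Car]] {
      override def process(city: String, buffered: ListState[Car]): Unit = {
        val plates = snapshot.getOrElse(city, Nil)
        Option(buffered.get()).foreach(_.asScala.foreach { car =>
          if (!plates.contains(car.plate)) out.collect(car)
        })
        buffered.clear()
      }
    })
  }
}
```

It would be wired in as `cars.keyBy(_.city).connect(snapshots.broadcast(BufferingCarFilter.excludedDesc)).process(new BufferingCarFilter)`, where `snapshots` is a hypothetical `DataStream[Map[String, List[String]]]`. Note that the buffer grows without bound if no snapshot ever arrives, so a real job would probably want a size limit or a timeout on top of this.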
Upvotes: 2