Reputation: 177
I use a KeyedCoProcessFunction to enrich the main data stream with data that comes from another stream.
Code:
class AssetDataEnrichment extends KeyedCoProcessFunction[String, PacketData, AssetCommandState, AssetData] with LazyLogging {

  case class AssetStateDoc(assetId: Option[String])

  private var associatedDevices: ValueState[AssetStateDoc] = _

  override def open(parameters: Configuration): Unit = {
    val associatedDevicesDescriptor =
      new ValueStateDescriptor[AssetStateDoc]("associatedDevices", classOf[AssetStateDoc])
    associatedDevices = getRuntimeContext.getState[AssetStateDoc](associatedDevicesDescriptor)
  }

  override def processElement1(
      packet: PacketData,
      ctx: KeyedCoProcessFunction[String, PacketData, AssetCommandState, AssetData]#Context,
      out: Collector[AssetData]): Unit = {
    val tmpState = associatedDevices.value
    val state = if (tmpState == null) AssetStateDoc(None) else tmpState
    state.assetId match {
      case Some(assetId) =>
        logger.debug(s"There are state for ${packet.tag.externalId} = $assetId")
        out.collect(AssetData(assetId, packet.tag.externalId.get, packet.toString))
      case None => logger.debug(s"No state for a packet ${packet.tag.externalId}")
      case _ => logger.debug("Smth went wrong")
    }
  }

  override def processElement2(
      value: AssetCommandState,
      ctx: KeyedCoProcessFunction[String, PacketData, AssetCommandState, AssetData]#Context,
      out: Collector[AssetData]): Unit = {
    value.command match {
      case CREATE =>
        logger.debug(s"Got command to CREATE state for tag: ${value.id} with value: ${value.assetId}")
        logger.debug(s"current state is ${associatedDevices.value()}")
        associatedDevices.update(AssetStateDoc(Some(value.assetId)))
        logger.debug(s"new state is ${associatedDevices.value()}")
      case _ =>
        logger.error("Got unknown AssetCommandState command")
    }
  }
}
processElement2() works fine: it accepts data and updates the state. But in processElement1() I always hit case None => logger.debug(s"No state for a packet ${packet.tag.externalId}"), although I expect to see the value that was set in processElement2().
As an example, I used this guide: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/state/
Upvotes: 2
Views: 1681
Reputation: 43419
processElement1 and processElement2 do share state, but keep in mind that this is key-partitioned state. This means that a value set in processElement2 when processing a given value v2 will only be seen in processElement1 when it is called later with a value v1 having the same key as v2.
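To make the scoping concrete, here is a toy model in plain Scala (no Flink involved). `KeyedValueState` and `KeyedStateDemo` are hypothetical names invented for this illustration; the map only mimics the idea that Flink keeps one state slot per key, it is not how Flink implements state.

```scala
import scala.collection.mutable

// Toy stand-in for key-partitioned state: one independent slot per key.
// Hypothetical class for illustration only, not part of the Flink API.
final class KeyedValueState[K, V] {
  private val slots = mutable.Map.empty[K, V]

  // Corresponds to state.update(...) while the element's key is `key`.
  def update(key: K, value: V): Unit = slots(key) = value

  // Corresponds to state.value() while the element's key is `key`.
  def value(key: K): Option[V] = slots.get(key)
}

object KeyedStateDemo {
  def main(args: Array[String]): Unit = {
    val state = new KeyedValueState[String, String]

    // "processElement2" handles a command whose key is "tag-1".
    state.update("tag-1", "asset-9")

    // "processElement1" handles a packet with the same key: the value is visible.
    println(state.value("tag-1")) // prints Some(asset-9)

    // A packet keyed differently sees an empty slot, which looks like "no state".
    println(state.value("tag-2")) // prints None
  }
}
```

If the two streams in the question are keyed by different fields, or by values that differ in format, every packet would land in the second situation.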
Also keep in mind that you have no control over the race condition between the two streams coming into processElement1 and processElement2.
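One common way to cope with that race is to buffer packets that arrive before their state exists and flush them once the command shows up. The following is only a sketch under assumptions: `Packet`, `AssetCommand`, `Enriched` and `BufferingEnrichment` are simplified, hypothetical stand-ins for the types in the question, it uses the same `open(Configuration)` signature as the question (newer Flink versions change this), and the buffer is unbounded, so real code would likely want a timer or state TTL to expire it.

```scala
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

// Hypothetical, simplified versions of the question's types.
case class Packet(tagId: String, payload: String)
case class AssetCommand(tagId: String, assetId: String)
case class Enriched(assetId: String, tagId: String, payload: String)

class BufferingEnrichment
    extends KeyedCoProcessFunction[String, Packet, AssetCommand, Enriched] {

  // Both pieces of state are scoped to the key of the element being processed.
  @transient private var assetId: ValueState[String] = _
  @transient private var pending: ListState[Packet] = _

  override def open(parameters: Configuration): Unit = {
    assetId = getRuntimeContext.getState(
      new ValueStateDescriptor[String]("assetId", classOf[String]))
    pending = getRuntimeContext.getListState(
      new ListStateDescriptor[Packet]("pendingPackets", classOf[Packet]))
  }

  override def processElement1(
      packet: Packet,
      ctx: KeyedCoProcessFunction[String, Packet, AssetCommand, Enriched]#Context,
      out: Collector[Enriched]): Unit = {
    Option(assetId.value()) match {
      case Some(id) => out.collect(Enriched(id, packet.tagId, packet.payload))
      // The command for this key has not arrived yet: keep the packet for later.
      case None => pending.add(packet)
    }
  }

  override def processElement2(
      cmd: AssetCommand,
      ctx: KeyedCoProcessFunction[String, Packet, AssetCommand, Enriched]#Context,
      out: Collector[Enriched]): Unit = {
    assetId.update(cmd.assetId)
    // Emit everything that was waiting for this key, then empty the buffer.
    Option(pending.get()).foreach { buffered =>
      buffered.asScala.foreach { p =>
        out.collect(Enriched(cmd.assetId, p.tagId, p.payload))
      }
    }
    pending.clear()
  }
}
```

With the Scala DataStream API in scope, this would be wired up along the lines of `packets.keyBy(_.tagId).connect(commands.keyBy(_.tagId)).process(new BufferingEnrichment)`, where both `keyBy` calls must produce the same key for records that belong together.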
The RidesAndFares exercise from the official Apache Flink training is all about learning to work with this part of the API. https://nightlies.apache.org/flink/flink-docs-stable/docs/learn-flink/etl/ is the home for the corresponding tutorial.
Upvotes: 6