Reputation: 141
I am new to Apache Flink and am trying to understand some best practices for scaling Flink streaming jobs alongside Kafka. Some questions I have not been able to find suitable answers for include:
Thanks in advance for any support, and I apologize if these questions seem somewhat basic; I'm trying to get a better handle on this technology. I've read through much of the documentation, but admittedly I might not be putting some concepts together due to my lack of experience in this area. Thanks for any help!
Upvotes: 1
Views: 844
Reputation: 623
There is no limitation on the number of streams. Flink scales depending on the memory and CPU available to the Job Manager and Task Managers, the parallelism in use, and the number of slots. I use YARN to manage the resources. If the number of streams being connected is high, we need to be a little cautious that the bulk of the processing does not end up on just a few task managers, as that will slow the job down. Lag can arise in the Kafka stream itself, or internally when some task managers are heavily loaded, so preventive checks need to be put in place for that.
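As a rough illustration of how parallelism and slots interact, the sketch below does the sizing arithmetic in plain Scala, with no Flink dependency. It assumes Flink's default slot sharing, under which a job needs as many slots as its highest operator parallelism, and it assumes each Kafka partition is read by exactly one source subtask, so source subtasks beyond the partition count receive no data. The object name, method names and numbers are hypothetical.

```scala
// Plain-Scala sizing sketch; every name and figure here is hypothetical.
object SlotSizing {
  // With default slot sharing, one slot can hold one subtask of each operator,
  // so the job needs as many slots as its highest operator parallelism.
  def requiredSlots(operatorParallelism: Seq[Int]): Int = {
    require(operatorParallelism.nonEmpty, "need at least one operator")
    operatorParallelism.max
  }

  // Task managers needed to provide that many slots (ceiling division).
  def requiredTaskManagers(slots: Int, slotsPerTaskManager: Int): Int =
    (slots + slotsPerTaskManager - 1) / slotsPerTaskManager

  // A partition is read by exactly one source subtask, so at most
  // `kafkaPartitions` source subtasks ever receive data.
  def busySourceSubtasks(sourceParallelism: Int, kafkaPartitions: Int): Int =
    math.min(sourceParallelism, kafkaPartitions)
}
```

For example, a job whose operators run with parallelism 4, 8 and 2 needs 8 slots under these assumptions; at 3 slots per task manager that is 3 task managers, and against a 6-partition topic only 6 of 8 source subtasks would be busy.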
Support for continuous queries has been built into the latest Flink version; you can check the Flink documentation for it.
If by reading one stream of data into another you mean connecting two streams, in Flink terminology, then we can connect them on a common key and maintain a value state. Note that the value state is maintained in a task manager and is not shared across task managers. Otherwise, if you mean a union of two or more streams, then we can build the flatMap functions in such a way that the data from those streams arrives in a standard format.
Example of union:
val stream1: DataStream[UserBookingEvent] = BookingClosure.getSource(runmode).getSource(env)
  .map(new ClosureMapFunction)
val stream2: DataStream[UserBookingEvent] = BookingCancel.getSource(runmode).getSource(env)
  .map(new CancelMapFunction)
val unionStream: DataStream[UserBookingEvent] = stream1.union(stream2)
---
import org.apache.flink.api.common.functions.MapFunction
import org.json4s.jackson.JsonMethods.{parse => _, parseOpt => _}
import org.json4s.native.JsonMethods._
import org.slf4j.{Logger, LoggerFactory}
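class CancelMapFunction extends MapFunction[String, Option[UserBookingEvent]] {
  // Created lazily once per function instance instead of on every map() call;
  // @transient keeps the logger out of the serialized function.
  @transient private lazy val LOG: Logger = LoggerFactory.getLogger(classOf[CancelMapFunction])

  override def map(in: String): Option[UserBookingEvent] = {
    try {
      implicit lazy val formats = org.json4s.DefaultFormats
      val json = parse(in)
      ..............
    } catch {
      // Malformed input is logged and mapped to None rather than failing the job.
      case e: Exception =>
        LOG.error("Could not parse Cancel Event= " + in + ": " + e.getMessage)
        None
    }
  }
}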
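The map function above returns an Option, so the None values have to be dropped somewhere before the union. The sketch below models that "parse into a common type, drop failures, then union" pattern on plain Scala collections so it runs without Flink. The fields of UserBookingEvent, the "userId,bookingId" input format and all method names are hypothetical stand-ins, since the original parsing code is elided; on a real DataStream the equivalent step would presumably be a flatMap before the union, which is an assumption rather than something shown in the answer.

```scala
// Plain-Scala model of the pattern; not Flink code.
// UserBookingEvent's fields and the "userId,bookingId" format are hypothetical.
import scala.util.Try

final case class UserBookingEvent(userId: Long, bookingId: String, kind: String)

object NormalizeAndUnion {
  // Returns None on malformed input instead of throwing, like CancelMapFunction.
  def parse(kind: String)(in: String): Option[UserBookingEvent] =
    in.split(",", -1) match {
      case Array(user, booking) if booking.trim.nonEmpty =>
        Try(user.trim.toLong).toOption
          .map(id => UserBookingEvent(id, booking.trim, kind))
      case _ => None
    }

  def unionAll(closures: Seq[String], cancels: Seq[String]): Seq[UserBookingEvent] = {
    // flatMap over the Option (as a List) keeps parsed events and drops failures.
    val closed    = closures.flatMap(s => parse("closure")(s).toList)
    val cancelled = cancels.flatMap(s => parse("cancel")(s).toList)
    // Concatenation stands in for stream1.union(stream2): both sides share one type.
    closed ++ cancelled
  }
}
```

Because both inputs are normalized to the same element type before they are combined, anything downstream of the union only has to handle UserBookingEvent, regardless of which source a record came from.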
Upvotes: 1