Reputation: 112
I would like to have each message in the Flink consumer stream produce multiple messages, each via a separate thread, to some topic in Kafka using the Flink Kafka producer. I'm writing the program in Scala, but answers in Java will do.
Something like this:
def thread(x: String): Thread =
  new Thread {
    override def run(): Unit = {
      val str = some_processing(x)
      flink_producer(str)
    }
  }
val stream = flink_consumer()
stream.map(x => {
  var i = 0
  while (i < 10) {
    val th = thread(x)
    th.start()
    i = i + 1
  }
})
So for each input in the Flink consumer I would like to produce 10 messages to some other queue using multi-threading.
Upvotes: 0
Views: 1026
Reputation: 461
Most Flink operators are parallel operators, so there is no reason for you to create any kind of thread in your data pipeline. Flink manages how many parallel instances of an operator exist, and if you want to set that value yourself you should use the following API method:
.setParallelism(N) // N is 10 in your case
You can get more info in the Flink documentation.
Your code should look like this:
val stream = flink_consumer()
stream.flatMap((x, out) => {
  var i = 0
  while (i < 10) {
    val valueToCollect = process(x, i)
    out.collect(valueToCollect)
    i = i + 1 // without the increment the loop never terminates
  }
}).setParallelism(10)
  .map(doSomethingWithGeneratedValues)
  .addSink(sinkThatSendsDataToYourDesiredSystem)
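If the lambda form above gives you type-inference trouble, a minimal sketch of the same logic as an explicit FlatMapFunction follows; the element type String, the process(x, i) helper and the other placeholder names are assumptions carried over from the snippets above, and the usual org.apache.flink.streaming.api.scala._ import is assumed to be in scope:
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.util.Collector

// Emits 10 derived records for every input record.
class Duplicator extends FlatMapFunction[String, String] {
  override def flatMap(x: String, out: Collector[String]): Unit = {
    var i = 0
    while (i < 10) {
      out.collect(process(x, i)) // process(x, i) is the placeholder from above
      i = i + 1
    }
  }
}

val stream = flink_consumer()
stream.flatMap(new Duplicator).setParallelism(10)
  .addSink(sinkThatSendsDataToYourDesiredSystem)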
Another approach, if you know how many parallel tasks you want to have:
val stream = flink_consumer()
val resultStream = stream.map(process)
val sinkStream = resultStream.union(resultStream, resultStream, resultStream, ...) // unions resultStream with itself so that each element appears N times
sinkStream.addSink(sinkThatSendsDataToYourDesiredSystem)
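If N is only known at runtime, one way to build that union is with a fold; a minimal sketch, assuming N = 10 and the placeholder sink name from above:
val n = 10
// Union resultStream with itself n - 1 times so every element appears n times downstream.
val sinkStream = (1 until n).foldLeft(resultStream)((acc, _) => acc.union(resultStream))
sinkStream.addSink(sinkThatSendsDataToYourDesiredSystem)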
Finally, you can also have multiple sinks for a DataStream:
val stream = flink_consumer()
val resultStream = stream.map(process)
resultStream.addSink(sinkThatSendsDataToYourDesiredSystem)
resultStream.addSink(sinkThatSendsDataToYourDesiredSystem)
resultStream.addSink(sinkThatSendsDataToYourDesiredSystem)
... // N addSink calls in total
resultStream.addSink(sinkThatSendsDataToYourDesiredSystem)
If you want to do parallel writes to your data sink, you must ensure that the sink you use supports that kind of write operation.
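Since the original question targets Kafka, here is a minimal sketch of wiring in a FlinkKafkaProducer as that sink; the broker address, topic name and serialization schema are assumptions, and you should pick the flink-connector-kafka artifact that matches your Flink version:
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092") // assumed broker address

// Writes each String record to the (assumed) "output-topic" topic.
val kafkaSink = new FlinkKafkaProducer[String](
  "output-topic",
  new SimpleStringSchema(),
  props)

resultStream.addSink(kafkaSink).setParallelism(10) // parallel writes, one Kafka producer per subtask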
Upvotes: 1