Reputation: 673
I am using the following Scala code to run my Flink streaming job:
val mystream = StreamExecutionEnvironment.getExecutionEnvironment
mystream.addSource(new mySource(params))
.map(new myMap(params))
.addSink(new mySink(params)).setParallelism(1)
mystream.setParallelism(1)
mystream.execute("My Streaming")
When I use flink run -p 1, the parallelism is 1 (I do not know whether -p or the code takes effect). When I run it as a plain Java application (from IDEA, which I assume runs it as plain Java), the parallelism is usually 5, which shows that my code does not work. How can I control it?
As the top answer suggested, I also tried the following code, but it does not work either; the parallelism is still 5.
val mystream = StreamExecutionEnvironment.getExecutionEnvironment
mystream.addSource(new mySource(params))
.map(new myMap(params))
.addSink(new mySink(params))
mystream.setParallelism(1)
mystream.execute("My Streaming")
Upvotes: 0
Views: 379
Reputation: 3634
You set the default parallelism on the environment, before adding the source and the other operators:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.addSource(...)
Using .addSink(new mySink(params)).setParallelism(1)
overrides that default parallelism for the specific operator.
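Put together, the ordering could look like the sketch below. It assumes the Flink Scala DataStream API (the flink-streaming-scala dependency) is on the classpath. fromElements, the inline map, and print are only stand-ins for mySource, myMap, and mySink from the question, and ParallelismSketch is a made-up object name.

```scala
import org.apache.flink.streaming.api.scala._

// Hypothetical wrapper object, used only for this illustration.
object ParallelismSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Set the job-wide default first, before any operator is added.
    env.setParallelism(1)

    // Handy when running from the IDE: shows the default the job will use.
    println(s"Default parallelism: ${env.getParallelism}")

    env
      .fromElements(1, 2, 3) // stand-in for addSource(new mySource(params))
      .map(_ * 2)            // stand-in for map(new myMap(params))
      .print()               // stand-in for addSink(new mySink(params))
      .setParallelism(1)     // optional: overrides the default for the sink only

    env.execute("My Streaming")
  }
}
```

With the default set up front, the per-operator setParallelism call on the sink is redundant here; it is kept only to show where such an override goes.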
Upvotes: 3