Litchy
Litchy

Reputation: 673

How to set flink parallelism in pure java (IDEA)

I am using following scala code to run my flink streaming job

val mystream = StreamExecutionEnvironment.getExecutionEnvironment
    mystream.addSource(new mySource(params))
      .map(new myMap(params))
      .addSink(new mySink(params)).setParallelism(1)
    mystream.setParallelism(1)
    mystream.execute("My Streaming")

when I use flink run -p 1, the parallelism is 1(do not know whether -p works or the code works). when I use pure java to run, (in IDEA I suppose it runs in pure java), the parallelism is usually 5, which shows my code does not work. How to control it?


as top answer suggested, following code also does not work, still has parellelism of 5.

val mystream = StreamExecutionEnvironment.getExecutionEnvironment
    mystream.addSource(new mySource(params))
      .map(new myMap(params))
      .addSink(new mySink(params))
    mystream.setParallelism(1)
    mystream.execute("My Streaming")

Upvotes: 0

Views: 379

Answers (1)

Arvid Heise
Arvid Heise

Reputation: 3634

You set the default parallelism on the environment.

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.addSource(...)

Using .addSink(new mySink(params)).setParallelism(1) overrides that default parallelism for the specific operator.

Upvotes: 3

Related Questions