如何在纯java中设置flink的并行性(IDEA)。

问题描述 投票:0回答:1

我使用下面的scala代码来运行我的flink流式作业。

val mystream = StreamExecutionEnvironment.getExecutionEnvironment
    mystream.addSource(new mySource(params))
      .map(new myMap(params))
      .addSink(new mySink(params)).setParallelism(1)
    mystream.setParallelism(1)
    mystream.execute("My Streaming")

当我使用 flink run -p 1当我用纯java运行的时候,(在IDEA中应该是用纯java运行的),并行度通常是5,这说明我的代码不工作。如何控制它?


正如上面的回答所建议的那样,下面的代码也不工作,仍然有5的并行度。

val mystream = StreamExecutionEnvironment.getExecutionEnvironment
    mystream.addSource(new mySource(params))
      .map(new myMap(params))
      .addSink(new mySink(params))
    mystream.setParallelism(1)
    mystream.execute("My Streaming")
intellij-idea apache-flink flink-streaming
1个回答
2
投票

你可以在环境上设置默认的并行性。

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.addSource(...)

使用 .addSink(new mySink(params)).setParallelism(1) 覆盖特定运算符的默认并行性。

© www.soinside.com 2019 - 2024. All rights reserved.