在下面的 Flink 作业代码中：
package spendreport;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.sink.AlertSink;
import org.apache.flink.walkthrough.common.source.TransactionSource;
public class FraudDetectionJob {
private String checkpointsDir = "file://checkpoints/"
private String rocksDBStateDir = "file://state/rocksdb/"
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.SetParallelism(3);
EnvironmentSettings tableSettings = EnvironmentSettings
.newInstance()
.useBlinkPlanner()
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment
.create(env, tableSettings);
tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
env.enableCheckpointing(5000)
env.checkpointConfig.minPauseBetweenCheckpoints = 100
env.checkpointConfig.setCheckpointStorage(checkpointsDir)
stateBackend = EmbeddedRocksDBStateBackend()
stateBackend.setDbStoragePath(rocksDBStateDir)
env.stateBackend = stateBackend
DataStream<Transaction> transactions = env
.addSource(new TransactionSource())
.name("transactions");
DataStream<Alert> alerts = transactions
.keyBy(Transaction::getAccountId)
.process(new FraudDetector())
.name("fraud-detector");
alerts
.addSink(new AlertSink())
.name("send-alerts");
env.execute("Fraud Detection");
}
}
以上作业的并行度设置为 3：
`env.setParallelism(3);`
作业中的各个算子（以及它们的各个并行实例）在读取或写入记录时，是否访问的是同一个 RocksDB 表（`Orders`）？
或者，作业级别的并行度是否意味着每个并行实例都有自己的 RocksDB？毕竟 RocksDB 是按 TaskManager（任务管理器）分布的，而且无法保证这些并行实例都运行在同一个 TaskManager 上。
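RocksDB 不是网络服务，而是嵌入式键/值存储，因此不存在一个被所有并行实例共享的 RocksDB。每个 TaskManager（任务管理器）中都有各自独立的 RocksDB 实例。更准确地说，每个有状态（keyed）算子的每个并行子任务都使用自己的 RocksDB 实例，其中只保存按 key 分配给该子任务的状态。因此在并行度为 3 时，`FraudDetector` 的 3 个并行实例各自拥有互不共享的状态。另外，`Orders` 表并不存放在 RocksDB 中：`CREATE TABLE` 只是在 catalog 中注册表的元数据，数据由 `WITH (...)` 中配置的外部连接器提供；RocksDB 只用于保存算子状态。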