Flink state backend - how does parallelism work with RocksDB?


In the following code:

package spendreport;

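import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.sink.AlertSink;
import org.apache.flink.walkthrough.common.source.TransactionSource;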

public class FraudDetectionJob {

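    // static so the fields can be read from the static main() method.
    // Note: in a file:// URI the segment right after "//" is the authority (host);
    // an absolute local path is normally written as file:///...
    private static final String checkpointsDir = "file://checkpoints/";
    private static final String rocksDBStateDir = "file://state/rocksdb/";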


    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        EnvironmentSettings tableSettings = EnvironmentSettings
            .newInstance()
            .useBlinkPlanner()
            .build();

        StreamTableEnvironment tableEnv = StreamTableEnvironment
            .create(env, tableSettings);

        tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");

        // Checkpoint every 5 s, with at least 100 ms between checkpoints
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(100);
        env.getCheckpointConfig().setCheckpointStorage(checkpointsDir);

        // Use the embedded RocksDB state backend for keyed state
        EmbeddedRocksDBStateBackend stateBackend = new EmbeddedRocksDBStateBackend();
        stateBackend.setDbStoragePath(rocksDBStateDir);
        env.setStateBackend(stateBackend);

        DataStream<Transaction> transactions = env
            .addSource(new TransactionSource())
            .name("transactions");

        DataStream<Alert> alerts = transactions
            .keyBy(Transaction::getAccountId)
            .process(new FraudDetector())
            .name("fraud-detector");

        alerts
            .addSink(new AlertSink())
            .name("send-alerts");

        env.execute("Fraud Detection");
    }
}

The parallelism of the job above is set to 3:

env.setParallelism(3);

Do the operators in each job access the same RocksDB table (Orders) when reading or writing records?

Or

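Does parallelism (at the job level) mean that each job gets its own RocksDB? After all, RocksDB runs per task manager, and there is no guarantee that every job runs in the same task manager.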

java apache-flink flink-streaming
1 answer

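RocksDB is not a network service; it is an embedded key/value store. Each task manager runs its own, separate RocksDB instances. More precisely, every parallel instance (subtask) of a stateful operator has its own RocksDB instance, which holds only the state for the keys assigned to that subtask, so the parallel instances never share a database.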
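To illustrate, here is a minimal pure-Java sketch (no Flink dependency) of how keyed state is split across parallel subtasks. It assumes the key-group scheme Flink uses behind `keyBy`: each key maps to one of `maxParallelism` key groups, and key group `g` belongs to subtask `g * parallelism / maxParallelism`. The names `KeyedStateSketch`, `routeCounts` and the other methods are hypothetical, each per-subtask `HashMap` only stands in for that subtask's own RocksDB instance, and the key hash is simplified: Flink's `KeyGroupRangeAssignment` applies a murmur hash to `key.hashCode()` before taking the modulo.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Simplified model of how keyed state is spread over parallel subtasks.
 * Each subtask owns a private map standing in for its own embedded RocksDB.
 * Assumption: the hash is simplified to floorMod(hashCode, maxParallelism);
 * Flink itself murmur-hashes key.hashCode() first.
 */
public class KeyedStateSketch {

    /** Maps a key to a key group (simplified hashing, see class comment). */
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Maps a key group to the index of the subtask that owns it. */
    static int subtaskForKeyGroup(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** Index of the single subtask responsible for this key. */
    static int subtaskFor(Object key, int parallelism, int maxParallelism) {
        return subtaskForKeyGroup(keyGroupFor(key, maxParallelism), parallelism, maxParallelism);
    }

    /** Counts events per account; each subtask only updates its own store. */
    static List<Map<Long, Integer>> routeCounts(long[] accountIds, int parallelism, int maxParallelism) {
        List<Map<Long, Integer>> stores = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            stores.add(new HashMap<>()); // one private store per subtask, never shared
        }
        for (long id : accountIds) {
            int subtask = subtaskFor(id, parallelism, maxParallelism);
            // Only the owning subtask reads or writes this key's state
            stores.get(subtask).merge(id, 1, Integer::sum);
        }
        return stores;
    }

    public static void main(String[] args) {
        long[] accountIds = {7L, 60L, 120L, 7L, 60L, 7L};
        List<Map<Long, Integer>> stores = routeCounts(accountIds, 3, 128);
        for (int i = 0; i < stores.size(); i++) {
            System.out.println("subtask " + i + " local state: " + stores.get(i));
        }
    }
}
```

With parallelism 3 and a maximum parallelism of 128 (Flink's default for a job this small), account 7 lands on subtask 0, account 60 on subtask 1 and account 120 on subtask 2. Because every key has exactly one owning subtask, the subtasks never need to read from or write to a shared database.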
