Foreign key join in Apache Flink using the DataStream API


Disclaimer: I am building an Apache Flink POC covering various scenarios for my organization, and I am still in the learning phase.

Currently we use Kafka Streams (with KTables) to join multiple streams. However, we are concerned about latency in Kafka Streams, which is why we are exploring Apache Flink as an option.

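One of the scenarios involves a foreign-key (FK) join. For simplicity, I am using Employee and Department as an example, where employee.deptId = department.deptId and a Department can have multiple employees (a one-to-many relationship). I implemented this with stateful streams as follows: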

        //DataStreamSource<Long> streamSource = env.fromSequence(1, 10000000);
        SingleOutputStreamOperator<Employee> employeeSourceStreamOperator = env
                .fromSource(employeeSource, WatermarkStrategy.noWatermarks(), "Employee")
                .map(value -> objectMapper.readValue(value, Employee.class));
        SingleOutputStreamOperator<Department> departmentSingleOutputStreamOperator = env
                .fromSource(departmentSource, WatermarkStrategy.noWatermarks(), "Department")
                .map(value -> objectMapper.readValue(value, Department.class));
        employeeSourceStreamOperator
                .connect(departmentSingleOutputStreamOperator)
                .keyBy(Employee::getDeptId, Department::getDeptId)
                .map(new RichCoMapFunction<Employee, Department, Object>() {
                    // ListState to store multiple Employee instances for each department
                    private ListState<Employee> employeeListState;
                    // ValueState to store Department information
                    private ValueState<Department> departmentState;
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        // Initialize ListState for Employee
                        ListStateDescriptor<Employee> employeeListStateDescriptor =
                                new ListStateDescriptor<>("employeeListState", Employee.class);
                        employeeListState = getRuntimeContext().getListState(employeeListStateDescriptor);
                        // Initialize ValueState for Department
                        ValueStateDescriptor<Department> departmentStateDescriptor =
                                new ValueStateDescriptor<>("departmentState", Department.class);
                        departmentState = getRuntimeContext().getState(departmentStateDescriptor);
                    }

                    @Override
                    public Object map1(Employee employee) throws Exception {
                        // Process Employee stream
                        // Store the Employee in ListState (state is scoped to the current deptId key)
                        employeeListState.add(employee);
                        // Try to join with Department information
                        Department department = departmentState.value();
                        if (department != null) {
                            // Join Employee and Department information
                            return "Employee join:" + employee.getName() + " works in " + department.getDeptName();
                        }
                        return ""; // No immediate result, need to wait for Department information
                    }

                    @Override
                    public Object map2(Department department) throws Exception {
                        // Process Department stream
                        // Store Department information in ValueState
                        departmentState.update(department);
                        // Try to join with Employee information
                        Iterable<Employee> employees = employeeListState.get();
                        if (employees != null) {
                            // Join Employee and Department information for each employee in the list
                            StringBuilder result = new StringBuilder();
                            for (Employee employee : employees) {
                                result.append("Department join:" + generateOutput(employee, department)).append("\n");
                            }
                            return result.toString();
                        }
                        return ""; // No immediate result, need to wait for Employee information
                    }

                    private String generateOutput(Employee employee, Department department) {
                        return employee.getName() + " works in " + department.getDeptName();
                    }
                })
                .sinkTo(new PrintSink<>());

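This is working as expected. However, I can't figure out the following scenario: an employee can change departments. In that case, linking the employee to the new department is easy, but removing the employee from the old department is challenging, because the join state is keyed by deptId and the updated Employee record only carries the new deptId. Can someone help me with the Flink features that would let me access the old department's state and remove the employee from it?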
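One way to handle the move with the DataStream API, sketched here under assumptions rather than as a tested Flink job: add a step keyed by employee id that remembers each employee's previous deptId in a `ValueState<Long>`, and when the deptId changes, send a "remove this employee" event to the old deptId in addition to the normal event for the new one. On the deptId-keyed side, store employees in a `MapState<Long, Employee>` (empId to Employee) instead of a `ListState`, so removing one employee is a single `remove(empId)`. The plain-Java model below has no Flink dependency: it collapses both operators into one class, with `HashMap`s standing in for the keyed state. `DeptChangeSketch`, `upsertEmployee`, `namesIn` and the `Employee` record with an `empId` field are all hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DeptChangeSketch {
    // Hypothetical minimal Employee; the question's real Employee class is not shown.
    record Employee(long empId, String name, long deptId) {}

    // Stands in for a ValueState<Long> in an operator keyed by empId:
    // the department each employee currently belongs to.
    private final Map<Long, Long> currentDeptByEmp = new HashMap<>();

    // Stands in for a MapState<Long, Employee> in the operator keyed by deptId:
    // empId -> Employee, so a single employee can be removed by key.
    private final Map<Long, Map<Long, Employee>> employeesByDept = new HashMap<>();

    /**
     * Applies an employee update. If the employee moved, it is removed from the
     * old department first. Returns the old deptId it was removed from, or null.
     */
    Long upsertEmployee(Employee e) {
        Long oldDept = currentDeptByEmp.put(e.empId(), e.deptId());
        Long removedFrom = null;
        if (oldDept != null && oldDept.longValue() != e.deptId()) {
            // In Flink this would be a "remove empId" event routed to the old deptId key.
            Map<Long, Employee> oldMembers = employeesByDept.get(oldDept);
            if (oldMembers != null) {
                oldMembers.remove(e.empId());
            }
            removedFrom = oldDept;
        }
        // "Add/update" event routed to the new deptId key; put() also overwrites stale data.
        employeesByDept.computeIfAbsent(e.deptId(), d -> new LinkedHashMap<>()).put(e.empId(), e);
        return removedFrom;
    }

    /** Names of the employees currently stored for a department, in insertion order. */
    List<String> namesIn(long deptId) {
        List<String> names = new ArrayList<>();
        for (Employee e : employeesByDept.getOrDefault(deptId, Map.of()).values()) {
            names.add(e.name());
        }
        return names;
    }

    public static void main(String[] args) {
        DeptChangeSketch s = new DeptChangeSketch();
        s.upsertEmployee(new Employee(1, "Alice", 10));
        s.upsertEmployee(new Employee(2, "Bob", 10));
        Long removed = s.upsertEmployee(new Employee(1, "Alice", 20)); // Alice moves 10 -> 20
        System.out.println("removed from dept " + removed); // removed from dept 10
        System.out.println("dept 10: " + s.namesIn(10));     // dept 10: [Bob]
        System.out.println("dept 20: " + s.namesIn(20));     // dept 20: [Alice]
    }
}
```

In a real job the removal event and the add event go to different deptId keys, which may be processed by different parallel instances, so downstream consumers may briefly see the employee in both departments or in neither.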

Please let me know if you need any information that is missing above.

apache-flink streaming
1 Answer

This kind of application is much simpler to implement with Flink's Table API (or Flink SQL). Table/SQL joins automatically handle the updates you are concerned about.
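To illustrate what the answer means, here is a toy, stdlib-only Java model (not Flink code and not Flink's internal implementation) of a regular join over two inputs that each have a primary key. It keeps the latest row per key on both sides and, when an employee's deptId changes, first retracts the previously joined row and then emits the new one. The class, record and method names are hypothetical. In Flink SQL, roughly the same effect comes from declaring a `PRIMARY KEY ... NOT ENFORCED` on both tables (for example, Kafka-backed tables using the `upsert-kafka` connector) and writing a plain `JOIN ... ON e.deptId = d.deptId`; the exact change kinds Flink emits (update-before/update-after versus delete/insert) depend on the plan.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RetractingJoinSketch {
    // Hypothetical minimal rows; each has a primary key (empId / deptId).
    record Employee(long empId, String name, long deptId) {}
    record Department(long deptId, String deptName) {}

    // Latest row per primary key on each side of the join.
    private final Map<Long, Employee> employees = new LinkedHashMap<>();
    private final Map<Long, Department> departments = new HashMap<>();

    // Emitted changes: "+" adds a joined row, "-" retracts a previously emitted one.
    final List<String> changelog = new ArrayList<>();

    private void emit(String sign, Employee e, Department d) {
        changelog.add(sign + " " + e.name() + " works in " + d.deptName());
    }

    void onEmployee(Employee e) {
        Employee old = employees.put(e.empId(), e);
        if (old != null) {
            Department oldDept = departments.get(old.deptId());
            if (oldDept != null) {
                emit("-", old, oldDept); // retract the row joined with the old department
            }
        }
        Department dept = departments.get(e.deptId());
        if (dept != null) {
            emit("+", e, dept);
        }
    }

    void onDepartment(Department d) {
        Department old = departments.put(d.deptId(), d);
        // Linear scan keeps the toy short; a real join keeps its state indexed by join key.
        for (Employee e : employees.values()) {
            if (e.deptId() == d.deptId()) {
                if (old != null) {
                    emit("-", e, old);
                }
                emit("+", e, d);
            }
        }
    }

    public static void main(String[] args) {
        RetractingJoinSketch join = new RetractingJoinSketch();
        join.onDepartment(new Department(10, "Sales"));
        join.onDepartment(new Department(20, "Finance"));
        join.onEmployee(new Employee(1, "Alice", 10));
        join.onEmployee(new Employee(1, "Alice", 20)); // Alice changes department
        join.changelog.forEach(System.out::println);
        // + Alice works in Sales
        // - Alice works in Sales
        // + Alice works in Finance
    }
}
```

The point of the model is that every emitted joined row can later be retracted; that bookkeeping is what the hand-written `RichCoMapFunction` in the question would otherwise have to implement itself.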
