我知道 Kafka Streams 使用 Murmur3 来散列消息的值以解决竞争条件。但这也意味着如果我们更改消息的结构,连接就会失败,因为哈希值已更改。
当我们在 Java 中添加原始类型或删除/重命名字段时就是这种情况。对于 json,重新排序/排序属性也会导致不同的哈希值。
例如,假设我们有一个 Person 和一个 City 对象,我们想要加入:
人:
public class Person {
private String id;
private String name;
private String cityId; // foreign-key
}
城市:
public class City {
private String id;
private String name;
}
我们现在可以像下面这样连接两个表:
persons.join(cities, person -> person.getCityId(), (person, city) -> // join function...
接下来,将 id 为
p1
的消息发布到 person 主题,并将 id 为 c1
的消息发布到 cars 主题,加入就会起作用。现在,停止流应用程序并在 Person 类中添加一个新的原始属性:
public class Person {
private String id;
private String name;
private String cityId; // foreign-key
private boolean deleted; // new primitve attribute
}
再次运行streams应用程序并将id为
c1
的消息发布到城市主题,您将看到加入不起作用。但是,如果我们将 id 为 p1
的消息发布到 person 主题,则加入将会起作用。
有没有办法解决这个问题。我们使用 json serde 进行序列化/反序列化。
您所描述的听起来像是一个已知问题:https://issues.apache.org/jira/browse/KAFKA-15303
不幸的是,我不知道解决方法。