ksqlDB不能正确获取rowkey

问题描述 投票:0回答:1

我在主题SENSOR_STATUS_DETAILS中产生了以下数据。json:

1001
{
  "sensorid": 1001,
  "status": "CONNECTED",
  "lastconnectedtime": "2020-05-31 22:31:54"
}
1002
{
  "sensorid": 1002,
  "status": "CONNECTED",
  "lastconnectedtime": "2020-05-31 22:33:37"
}

我试图用它做一个表,作为。

CREATE TABLE STATUS_IB_TABLE (ROWKEY INT KEY,
  sensorid INTEGER,
  status VARCHAR,
  lastconnectedtime STRING)
WITH (TIMESTAMP='lastconnectedtime', TIMESTAMP_FORMAT='yyyy-MM-dd HH:mm:ss', KAFKA_TOPIC='SENSOR_STATUS_DETAILS', VALUE_FORMAT='JSON', KEY='sensorid');

ksqlDB的行键如下。

enter image description here

我希望行键是 感应器...我不知道发生了什么事。

请帮我解决这个问题。

先谢谢了!!

PS:

Confluent平台版本。5.5

ksqldb confluent-platform
1个回答
1
投票

这里的问题是,正在生成的数据到了Kafka主题的 SENSOR_STATUS_DETAILS 拥有 STRING 钥匙 INT 键。

如果你把 STRING "1001" 它只是碰巧序列化到了与 INT. 如果你将这些相同的字节反序列化为一个 INT 你得到的数字 825241649.

你有两个选择。

  1. 改变数据生成方式,使你生成一个序列化的32位整数作为Kafka消息的密钥,并继续导入为 ROWKEY INT KEY
  2. 改变你的 CREATE TABLE 声明要有 STRING 键。

    CREATE TABLE STATUS_IB_TABLE ( ROWKEY STRING KEY, // <-- string key sensorid STRING, // <-- matching type here. status VARCHAR, lastconnectedtime STRING ) WITH (<as above>);

第一种方案可能会带来略微更好的性能。

© www.soinside.com 2019 - 2024. All rights reserved.