我是 KSQL 和 Apache Kafka 的新手。 我正在尝试解决我的表如何不断返回 null 的问题,因为表和流的数据类型相同。
名称:fleet_location_table
Field | Type
--------------------------------------------------------
VEHICLE_ID | INTEGER (primary key)
LOCATION | STRUCT<LATITUDE DOUBLE, LONGITUDE DOUBLE>
TS | BIGINT
--------------------------------------------------------
命令:
CREATE STREAM fleet_location (
vehicle_id INT key,
location STRUCT<latitude DOUBLE, longitude DOUBLE>,
ts BIGINT
) WITH (
KAFKA_TOPIC='fleet',
VALUE_FORMAT='AVRO' );
ksql 的输出:
|VEHICLE_ID |LOCATION |TS |
|2728 |null |null |
|6514 |null |null |
|3553 |null |null |
名称:FLEET_LOCATION STREAM
Field | Type
--------------------------------------------------------
VEHICLE_ID | INTEGER (key)
LOCATION | STRUCT<LATITUDE DOUBLE, LONGITUDE DOUBLE>
TS | BIGINT
--------------------------------------------------------
命令:
CREATE TABLE fleet_location_tablle (
vehicle_id INT PRIMARY KEY,
location STRUCT<latitude DOUBLE, longitude DOUBLE>,
ts BIGINT
) WITH (
KAFKA_TOPIC='fleet',
VALUE_FORMAT='AVRO' );
ksql 的输出:
VEHICLE_ID |LOCATION |TS |
+--------------------+--------------------+--------------------+
|5991 |{LATITUDE=0.28093476|1609459200000 |
| |293253206, LONGITUDE| |
| |=0.4380587866129694}| |
|6712 |{LATITUDE=0.39206537|1609459300000 |
| |957972853, LONGITUDE| |
| |=0.804702902405193} | |
|7821 |{LATITUDE=0.72617699|1609459400000 |
| |4302253, LONGITUDE=0| |
| |.5737167884213967} | |
|5624 |{LATITUDE=0.05420834|1609459500000 |
| |5198622276, LONGITUD| |
| |E=0.3050187952614862| |
| |3} |
将我的表结构更改为:
后显然已解决CREATE TABLE location_fleet AS
> SELECT vehicle_id,
> latest_by_offset(location) AS location,
> latest_by_offset(ts) AS ts
> FROM fleet_location
> GROUP BY vehicle_id
> EMIT CHANGES;