KSQL table data returns null


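I am new to KSQL and Apache Kafka. I am trying to work out why my table keeps returning null, even though the table and the stream are declared with the same data types.

Avro schema link: https://github.com/confluentinc/kafka-connect-datagen/blob/master/src/main/resources/fleet_mgmt_location.avro

Name: fleet_location_table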

 Field      | Type                                      
--------------------------------------------------------
 VEHICLE_ID | INTEGER          (primary key)            
 LOCATION   | STRUCT<LATITUDE DOUBLE, LONGITUDE DOUBLE> 
 TS         | BIGINT                                    
--------------------------------------------------------

Command:

CREATE STREAM fleet_location (
    vehicle_id INT KEY,
    location STRUCT<latitude DOUBLE, longitude DOUBLE>,
    ts BIGINT
  ) WITH (
    KAFKA_TOPIC='fleet',
    VALUE_FORMAT='AVRO' );

ksql output:

|VEHICLE_ID          |LOCATION            |TS                  |
|2728                |null                |null                |
|6514                |null                |null                |
|3553                |null                |null                |

Name: FLEET_LOCATION stream

 Field      | Type                                      
--------------------------------------------------------
 VEHICLE_ID | INTEGER          (key)                    
 LOCATION   | STRUCT<LATITUDE DOUBLE, LONGITUDE DOUBLE> 
 TS         | BIGINT                                    
--------------------------------------------------------

    

Command:

CREATE TABLE fleet_location_table (
    vehicle_id INT PRIMARY KEY,
    location STRUCT<latitude DOUBLE, longitude DOUBLE>,
    ts BIGINT
  ) WITH (
    KAFKA_TOPIC='fleet',
    VALUE_FORMAT='AVRO' );

ksql output:

|VEHICLE_ID          |LOCATION            |TS                  |
+--------------------+--------------------+--------------------+
|5991                |{LATITUDE=0.28093476|1609459200000       |
|                    |293253206, LONGITUDE|                    |
|                    |=0.4380587866129694}|                    |
|6712                |{LATITUDE=0.39206537|1609459300000       |
|                    |957972853, LONGITUDE|                    |
|                    |=0.804702902405193} |                    |
|7821                |{LATITUDE=0.72617699|1609459400000       |
|                    |4302253, LONGITUDE=0|                    |
|                    |.5737167884213967}  |                    |
|5624                |{LATITUDE=0.05420834|1609459500000       |
|                    |5198622276, LONGITUD|                    |
|                    |E=0.3050187952614862|                    |
|                    |3}                  |                    |

apache-kafka apache-kafka-streams confluent-platform ksqldb data-generation

1 Answer

This was apparently resolved after I changed my table definition to:

CREATE TABLE location_fleet AS
    SELECT vehicle_id,
           latest_by_offset(location) AS location,
           latest_by_offset(ts) AS ts
    FROM fleet_location
    GROUP BY vehicle_id
    EMIT CHANGES;
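
To check that the derived table actually carries values, queries along these lines could be run from the ksql CLI. This is only a sketch under a few assumptions: the `location_fleet` table above was created successfully, the `fleet` topic already holds records, and the server is a ksqlDB version recent enough to support pull queries. The key 5991 is just a sample value taken from the output in the question.

```sql
-- Assumption: start from the earliest offset so the push query below
-- also returns rows that were written before it was started.
SET 'auto.offset.reset' = 'earliest';

-- Push query: stream a few rows and unpack the STRUCT fields.
SELECT vehicle_id,
       location->latitude  AS latitude,
       location->longitude AS longitude,
       ts
FROM location_fleet
EMIT CHANGES
LIMIT 3;

-- Pull query: look up the current row for one key (sample value).
SELECT * FROM location_fleet WHERE vehicle_id = 5991;
```

If `location` and `ts` still come back as null here, the problem likely lies upstream of the table (for example in how the `fleet_location` stream reads the topic) rather than in the table definition itself.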