Pipeline没有使用过程将数据摄取到memsql表中

问题描述 投票:0回答:3

我将json(20键值对)推入kafka并且能够使用它 - 测试它以验证数据是否成功被推送到kafka。

以下脚本正在创建管道,但它没有将数据加载到memsql表。我是否需要修改JSON数据类型的创建管道脚本。

CREATE OR REPLACE PIPELINE omnitracs_gps_evt_pipeline
AS LOAD DATA KAFKA '192.168.188.110:9092/ib_Omnitracs' 
INTO procedure INGEST_OMNITRACS_EVT_PROC;

DELIMITER //
CREATE OR REPLACE PROCEDURE INGEST_OMNITRACS_EVT_PROC(batch query(evt_json json))
AS
BEGIN
    INSERT INTO TEST(id, name) 
      SELECT evt_json::ignition,evt_json::positiontype
      FROM batch;
      ECHO SELECT 'HELLO';
END
//
DELIMITER ; 

TEST PIPELINE omnitracs_gps_evt_pipeline LIMIT 5;
START PIPELINE omnitracs_gps_evt_pipeline FOREGROUND LIMIT 5 BATCHES;

任何人都可以帮助它应该是什么。

apache-kafka memsql
3个回答
1
投票

您可能应修改CREATE PIPELINE的AS LOAD DATA子句以执行本机JSON加载,如下所述:https://docs.memsql.com/sql-reference/v6.7/load-data/#json-load-data

有两个原因:

  • 写入的管道将期望来自kafka的输入为具有1个字段的TSV格式。 TSV是默认格式,它推断从参数到目标存储过程的预期字段数。实际上输入JSON记录很可能会成功解析,但我不会依赖于此。
  • 使用本机JSON管道的subvalue_mapping子句来提取和插入:: ignition和:: positiontype,完全跳过存储过程的开销会更高效。此外,写入的管道将实例化临时内存中的JSON数据结构,这相对昂贵。

我建议如下:

CREATE OR REPLACE PIPELINE omnitracs_gps_evt_pipeline
AS LOAD DATA KAFKA '192.168.188.110:9092/ib_Omnitracs' 
INTO TABLE TEST
FORMAT JSON
( 
  id <- ignition_event,
  name <- position_type
);

0
投票

管道的存储过程中不允许使用ECHO SELECT。如果运行START PIPELINE ... FOREGROUND,或者在CREATE PIPELINE时间(如果已定义过程),则应该出现错误。


0
投票

从kafka中的生产者中删除ProducerConfig.TRANSACTIONAL_ID_CONFIG配置后,管道现在正在工作。

CREATE PIPELINE FEB13_PIPELINE_2
AS LOAD DATA KAFKA '192.168.188.110:9092/FEB13_PROC' 
INTO procedure INGEST_EVT_PROC;

DELIMITER //
CREATE OR REPLACE PROCEDURE INGEST_EVT_PROC(batch query(evt_json json))
AS
BEGIN
    INSERT INTO TEST_FEB13(ID, NAME) 
      SELECT evt_json::ID,evt_json::NAME
      FROM batch;
END
//
DELIMITER ;

只是一个小小的疑问现在甚至双引号也被添加到表格列中。如何逃避它。 JSON发送给kafka:“{'ID':1,'NAME':\'a \'}”

© www.soinside.com 2019 - 2024. All rights reserved.