我将json(20键值对)推入kafka并且能够使用它 - 测试它以验证数据是否成功被推送到kafka。
以下脚本正在创建管道,但它没有将数据加载到memsql表。我是否需要修改JSON数据类型的创建管道脚本。
CREATE OR REPLACE PIPELINE omnitracs_gps_evt_pipeline
AS LOAD DATA KAFKA '192.168.188.110:9092/ib_Omnitracs'
INTO procedure INGEST_OMNITRACS_EVT_PROC;
DELIMITER //
CREATE OR REPLACE PROCEDURE INGEST_OMNITRACS_EVT_PROC(batch query(evt_json json))
AS
BEGIN
INSERT INTO TEST(id, name)
SELECT evt_json::ignition,evt_json::positiontype
FROM batch;
ECHO SELECT 'HELLO';
END
//
DELIMITER ;
TEST PIPELINE omnitracs_gps_evt_pipeline LIMIT 5;
START PIPELINE omnitracs_gps_evt_pipeline FOREGROUND LIMIT 5 BATCHES;
任何人都可以帮助它应该是什么。
您可能应修改CREATE PIPELINE的AS LOAD DATA子句以执行本机JSON加载,如下所述:https://docs.memsql.com/sql-reference/v6.7/load-data/#json-load-data。
有两个原因:
我建议如下:
CREATE OR REPLACE PIPELINE omnitracs_gps_evt_pipeline
AS LOAD DATA KAFKA '192.168.188.110:9092/ib_Omnitracs'
INTO TABLE TEST
FORMAT JSON
(
id <- ignition_event,
name <- position_type
);
管道的存储过程中不允许使用ECHO SELECT。如果运行START PIPELINE ... FOREGROUND,或者在CREATE PIPELINE时间(如果已定义过程),则应该出现错误。
从kafka中的生产者中删除ProducerConfig.TRANSACTIONAL_ID_CONFIG配置后,管道现在正在工作。
CREATE PIPELINE FEB13_PIPELINE_2 AS LOAD DATA KAFKA '192.168.188.110:9092/FEB13_PROC' INTO procedure INGEST_EVT_PROC; DELIMITER // CREATE OR REPLACE PROCEDURE INGEST_EVT_PROC(batch query(evt_json json)) AS BEGIN INSERT INTO TEST_FEB13(ID, NAME) SELECT evt_json::ID,evt_json::NAME FROM batch; END // DELIMITER ;
只是一个小小的疑问现在甚至双引号也被添加到表格列中。如何逃避它。 JSON发送给kafka:“{'ID':1,'NAME':\'a \'}”