我使用标准机制将数据从 Kafka 主题流式传输到 ClickHouse,即这样的链路:Kafka 引擎表 → 物化视图(TO)→ 目标表(MergeTree)。这套链路一直运行良好。现在我需要给它添加两列,但当我删除这组表并带上新列重新创建后,数据不再进入目标表,日志中也没有任何能表明问题与流式传输相关的错误。当我不带新列重新创建这些表时,它又能正常工作。我发现新列只出现在一条消息中,其他消息都没有这两个字段。我对 Kafka 经验不多,但据我理解,问题可能在于并非每条 Kafka 消息都包含新列,导致数据无法加载到目标表。应该采取什么措施来解决这个问题?我还尝试过使用 JSONEachRow 格式将原始数据流式写入单独一列,再用 ClickHouse 的 JSON 函数处理,但同样不行——它报出无法解析输入数据的错误。是否需要在 Kafka 一侧修改某些设置?(再次说明:我看不到数据在 Kafka 中的原始样子,我是用 Offset Explorer 查看数据的。)
/* ------------------------------------------------------------------
   CREATE DESTINATION TABLE (LOCAL)
   One replicated table per shard/replica of 'cluste_name'.
   ReplacingMergeTree keeps, per ORDER BY key (`master_id`), the row with
   the highest `timestamp` (version column); `deleted` is passed as the
   engine's is_deleted flag.
   ------------------------------------------------------------------ */
CREATE TABLE database_name.ga_events_local ON CLUSTER 'cluste_name'
(
    `event_type`                String,
    `source`                    String,
    `timestamp`                 UInt32,
    `master_id`                 String,
    `event_params_app`          Nullable(String),
    `items_item_list_name`      Nullable(String),
    `event_params_parameter`    Nullable(String),
    `items_affiliation`         Nullable(String),
    `event_params_item_cat`     Nullable(String),
    `event_params_item_id`      Nullable(String),
    `event_params_step`         Nullable(String),
    `event_params_item_name`    Nullable(String),
    `event_params_dest`         Nullable(String),
    `items_promotion_id`        Nullable(String),
    `step_number`               Nullable(String),
    `items_item_name`           Nullable(String),
    `event_params_item_variant` Nullable(String),
    `event_params_origin`       Nullable(String),
    `event_params_way`          Nullable(String),
    `items_item_brand`          Nullable(String),
    `items_promotion_name`      Nullable(String),
    `event_params_item_cat2`    Nullable(String),
    `event`                     Nullable(String),
    `event_params_ux_ui`        Nullable(String),
    `items_item_cat2`           Nullable(String),
    `items_item_cat3`           Nullable(String),
    `event_params_type`         Nullable(String),
    `items_item_cat4`           Nullable(String),
    `event_params_flow`         Nullable(String),
    `event_params_segment`      Nullable(String),
    `items_item_id`             Nullable(String),
    `items_item_cat5`           Nullable(String),
    `itemslocation_id`          Nullable(String),
    `event_params_error`        Nullable(String),
    `event_params_result`       Nullable(String),
    `items_item_cat`            Nullable(String),
    `event_params_quantity`     Nullable(String),
    `event_params_id`           Nullable(String),
    `items_item_variant`        Nullable(String),
    `event_name`                Nullable(String),
    `items_item_list_index`     Nullable(String),
    -- Planned new columns (kept commented out as whole lines so no
    -- stray type tokens are left behind):
    -- `event_params_item_list_name` Nullable(String),
    -- `event_params_promo_name`     Nullable(String),
    -- Filled by the materialized view from event_type. It is the engine's
    -- is_deleted argument, which ReplacingMergeTree expects to be 0 or 1.
    `deleted`                   UInt8
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/{layer}-{shard}/ga_events', '{replica}', timestamp, `deleted`)
ORDER BY `master_id`
SETTINGS index_granularity = 8192;
/* ------------------------------------------------------------------
   CREATE DESTINATION TABLE (DISTRIBUTED)
   Routes rows to database_name.ga_events_local on each shard of
   'cluste_name', sharded by javaHash(master_id). The column list must
   stay identical to ga_events_local.
   ------------------------------------------------------------------ */
CREATE TABLE database_name.ga_events ON CLUSTER 'cluste_name'
(
    `event_type`                String,
    `source`                    String,
    `timestamp`                 UInt32,
    `master_id`                 String,
    `event_params_app`          Nullable(String),
    `items_item_list_name`      Nullable(String),
    `event_params_parameter`    Nullable(String),
    `items_affiliation`         Nullable(String),
    `event_params_item_cat`     Nullable(String),
    `event_params_item_id`      Nullable(String),
    `event_params_step`         Nullable(String),
    `event_params_item_name`    Nullable(String),
    `event_params_dest`         Nullable(String),
    `items_promotion_id`        Nullable(String),
    `step_number`               Nullable(String),
    `items_item_name`           Nullable(String),
    `event_params_item_variant` Nullable(String),
    `event_params_origin`       Nullable(String),
    `event_params_way`          Nullable(String),
    `items_item_brand`          Nullable(String),
    `items_promotion_name`      Nullable(String),
    `event_params_item_cat2`    Nullable(String),
    `event`                     Nullable(String),
    `event_params_ux_ui`        Nullable(String),
    `items_item_cat2`           Nullable(String),
    `items_item_cat3`           Nullable(String),
    `event_params_type`         Nullable(String),
    `items_item_cat4`           Nullable(String),
    `event_params_flow`         Nullable(String),
    `event_params_segment`      Nullable(String),
    `items_item_id`             Nullable(String),
    `items_item_cat5`           Nullable(String),
    `itemslocation_id`          Nullable(String),
    `event_params_error`        Nullable(String),
    `event_params_result`       Nullable(String),
    `items_item_cat`            Nullable(String),
    `event_params_quantity`     Nullable(String),
    `event_params_id`           Nullable(String),
    `items_item_variant`        Nullable(String),
    `event_name`                Nullable(String),
    `items_item_list_index`     Nullable(String),
    -- Planned new columns (kept commented out as whole lines so no
    -- stray type tokens are left behind):
    -- `event_params_item_list_name` Nullable(String),
    -- `event_params_promo_name`     Nullable(String),
    -- Plain column here; the value is computed by the materialized view.
    `deleted`                   UInt8
)
ENGINE = Distributed('cluste_name', 'database_name', 'ga_events_local', javaHash(`master_id`));
/* ------------------------------------------------------------------
   CREATE KAFKA TABLE
   Consumes AvroConfluent-encoded messages from SOMETOPIC; the schema is
   resolved through the schema registry. Fields of the nested `data`
   record are exposed as `data.<field>` columns.
   ------------------------------------------------------------------ */
CREATE TABLE database_name.ga_events_kafka
(
    `event_type`                     String,
    `source`                         String,
    -- NOTE(review): UInt64 here but UInt32 in the destination tables; values
    -- above 4294967295 (e.g. epoch milliseconds) would not fit — confirm unit.
    `timestamp`                      UInt64,
    `data.master_id`                 Nullable(String),
    `data.event_params_app`          Nullable(String),
    `data.items_item_list_name`      Nullable(String),
    `data.event_params_parameter`    Nullable(String),
    `data.items_affiliation`         Nullable(String),
    `data.event_params_item_cat`     Nullable(String),
    `data.event_params_item_id`      Nullable(String),
    `data.event_params_step`         Nullable(String),
    `data.event_params_item_name`    Nullable(String),
    `data.event_params_dest`         Nullable(String),
    `data.items_promotion_id`        Nullable(String),
    `data.step_number`               Nullable(String),
    `data.items_item_name`           Nullable(String),
    `data.event_params_item_variant` Nullable(String),
    `data.event_params_origin`       Nullable(String),
    `data.event_params_way`          Nullable(String),
    `data.items_item_brand`          Nullable(String),
    `data.items_promotion_name`      Nullable(String),
    `data.event_params_item_cat2`    Nullable(String),
    `data.event`                     Nullable(String),
    `data.event_params_ux_ui`        Nullable(String),
    `data.items_item_cat2`           Nullable(String),
    `data.items_item_cat3`           Nullable(String),
    `data.event_params_type`         Nullable(String),
    `data.items_item_cat4`           Nullable(String),
    `data.event_params_flow`         Nullable(String),
    `data.event_params_segment`      Nullable(String),
    `data.items_item_id`             Nullable(String),
    `data.items_item_cat5`           Nullable(String),
    `data.itemslocation_id`          Nullable(String),
    `data.event_params_error`        Nullable(String),
    `data.event_params_result`       Nullable(String),
    `data.items_item_cat`            Nullable(String),
    `data.event_params_quantity`     Nullable(String),
    `data.event_params_id`           Nullable(String),
    `data.items_item_variant`        Nullable(String),
    `data.event_name`                Nullable(String),
    `data.items_item_list_index`     Nullable(String)
    -- Planned new columns (only some messages carry them). Thanks to
    -- input_format_avro_allow_missing_fields below, messages without
    -- these fields would get NULL instead of failing to parse:
    -- , `data.event_params_item_list_name` Nullable(String)
    -- , `data.event_params_promo_name`     Nullable(String)
)
ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'somebroker:9093,somebroker:9093,somebroker:9093',
    kafka_topic_list = 'SOMETOPIC',
    kafka_format = 'AvroConfluent',
    kafka_num_consumers = 1,
    -- NOTE(review): placeholder values; the author uses a new group name and
    -- client id each time the tables are re-created.
    kafka_group_name = 'somegroup( I change this every time when I recreate tables ',
    format_avro_schema_registry_url = 'registry_url',
    kafka_client_id = 'someclient( I change this every time when I recreate tables ',
    -- A column declared above but absent from a message's Avro schema gets
    -- its default value (NULL for Nullable) instead of raising an exception.
    input_format_avro_allow_missing_fields = 1;
/* ------------------------------------------------------------------
   CREATE MATERIALIZED VIEW
   For every block read from ga_events_kafka, flattens the `data.*`
   fields, derives `deleted` from `event_type`, and inserts the result
   into the Distributed table database_name.ga_events.
   ------------------------------------------------------------------ */
CREATE MATERIALIZED VIEW database_name.ga_events_kafka_mv
TO database_name.ga_events
(
    -- Declared types mirror the TO target (database_name.ga_events).
    `event_type`                String,
    `source`                    String,
    `timestamp`                 UInt32,
    `master_id`                 String,
    `event_params_app`          Nullable(String),
    `items_item_list_name`      Nullable(String),
    `event_params_parameter`    Nullable(String),
    `items_affiliation`         Nullable(String),
    `event_params_item_cat`     Nullable(String),
    `event_params_item_id`      Nullable(String),
    `event_params_step`         Nullable(String),
    `event_params_item_name`    Nullable(String),
    `event_params_dest`         Nullable(String),
    `items_promotion_id`        Nullable(String),
    `step_number`               Nullable(String),
    `items_item_name`           Nullable(String),
    `event_params_item_variant` Nullable(String),
    `event_params_origin`       Nullable(String),
    `event_params_way`          Nullable(String),
    `items_item_brand`          Nullable(String),
    `items_promotion_name`      Nullable(String),
    `event_params_item_cat2`    Nullable(String),
    `event`                     Nullable(String),
    `event_params_ux_ui`        Nullable(String),
    `items_item_cat2`           Nullable(String),
    `items_item_cat3`           Nullable(String),
    `event_params_type`         Nullable(String),
    `items_item_cat4`           Nullable(String),
    `event_params_flow`         Nullable(String),
    `event_params_segment`      Nullable(String),
    `items_item_id`             Nullable(String),
    `items_item_cat5`           Nullable(String),
    `itemslocation_id`          Nullable(String),
    `event_params_error`        Nullable(String),
    `event_params_result`       Nullable(String),
    `items_item_cat`            Nullable(String),
    `event_params_quantity`     Nullable(String),
    `event_params_id`           Nullable(String),
    `items_item_variant`        Nullable(String),
    `event_name`                Nullable(String),
    `items_item_list_index`     Nullable(String),
    -- Planned new columns (kept commented out as whole lines):
    -- `event_params_item_list_name` Nullable(String),
    -- `event_params_promo_name`     Nullable(String),
    `deleted`                   UInt8
)
AS
SELECT
    `event_type`,
    `source`,
    -- NOTE(review): UInt64 in the Kafka table, narrowed to UInt32 when
    -- written to the target — confirm the values fit (seconds, not ms).
    `timestamp`,
    `data.master_id`                 AS master_id,
    `data.event_params_app`          AS event_params_app,
    `data.items_item_list_name`      AS items_item_list_name,
    `data.event_params_parameter`    AS event_params_parameter,
    `data.items_affiliation`         AS items_affiliation,
    `data.event_params_item_cat`     AS event_params_item_cat,
    `data.event_params_item_id`      AS event_params_item_id,
    `data.event_params_step`         AS event_params_step,
    `data.event_params_item_name`    AS event_params_item_name,
    `data.event_params_dest`         AS event_params_dest,
    `data.items_promotion_id`        AS items_promotion_id,
    `data.step_number`               AS step_number,
    `data.items_item_name`           AS items_item_name,
    `data.event_params_item_variant` AS event_params_item_variant,
    `data.event_params_origin`       AS event_params_origin,
    `data.event_params_way`          AS event_params_way,
    `data.items_item_brand`          AS items_item_brand,
    `data.items_promotion_name`      AS items_promotion_name,
    `data.event_params_item_cat2`    AS event_params_item_cat2,
    `data.event`                     AS event,
    `data.event_params_ux_ui`        AS event_params_ux_ui,
    `data.items_item_cat2`           AS items_item_cat2,
    `data.items_item_cat3`           AS items_item_cat3,
    `data.event_params_type`         AS event_params_type,
    `data.items_item_cat4`           AS items_item_cat4,
    `data.event_params_flow`         AS event_params_flow,
    `data.event_params_segment`      AS event_params_segment,
    `data.items_item_id`             AS items_item_id,
    `data.items_item_cat5`           AS items_item_cat5,
    `data.itemslocation_id`          AS itemslocation_id,
    `data.event_params_error`        AS event_params_error,
    `data.event_params_result`       AS event_params_result,
    `data.items_item_cat`            AS items_item_cat,
    `data.event_params_quantity`     AS event_params_quantity,
    `data.event_params_id`           AS event_params_id,
    `data.items_item_variant`        AS items_item_variant,
    `data.event_name`                AS event_name,
    `data.items_item_list_index`     AS items_item_list_index,
    -- `data.event_params_item_list_name` AS event_params_item_list_name,
    -- `data.event_params_promo_name`     AS event_params_promo_name,
    -- `deleted` is the ReplacingMergeTree is_deleted flag (UInt8, 0 or 1).
    -- The former ELSE -1000 sentinel is out of range for UInt8, so any
    -- event_type other than 'DELETE' is now stored as a state row (0).
    -- If unexpected event types should be dropped instead, filter them in WHERE.
    CASE
        WHEN event_type = 'UPDATE' THEN 0
        WHEN event_type = 'DELETE' THEN 1
        ELSE 0
    END AS deleted
FROM database_name.ga_events_kafka
-- Skip messages with a NULL or empty master_id (the destination's
-- ORDER BY key and sharding key).
WHERE `data.master_id` IS NOT NULL
  AND `data.master_id` <> '';
应添加的列(Kafka 表中的名称):
`data.event_params_item_list_name` Nullable(String),
`data.event_params_promo_name` Nullable(String)
设置 `input_format_avro_allow_missing_fields = 1`(例如写在 Kafka 引擎表的 SETTINGS 中)修复了该问题。
该设置允许使用 Avro 或 AvroConfluent 格式架构中未指定的字段:当架构中找不到某个字段时,ClickHouse 使用该列的默认值,而不是抛出异常。
可能的值:
- 0 — 禁用。
- 1 — 启用。
默认值:0。