Adding columns to a ClickHouse Kafka table chain

Question (votes: 0, answers: 1)

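I have the standard mechanism for streaming data from a Kafka topic into ClickHouse, that is, a table chain like this: table (Kafka engine) -> materialized view TO -> destination table (MergeTree). It works fine, and I need to add two columns to this chain.

When I drop the table chain and recreate it with the new columns, data stops arriving in the destination table, and the logs contain no specific error indicating a streaming-related problem. When I recreate the tables without the new columns, everything works again.

I found that the new columns are present in only one message; the other messages do not have them. I don't have much experience with Kafka, but as I understand it, the problem may be that new data is not loaded into the destination table because not every Kafka message contains the new columns. What can be done to solve this?

I had the idea of using the JSONEachRow format to stream the raw data into a single column and process it with ClickHouse JSON functions, but that did not work either: it fails with an error saying the input data cannot be parsed. Could it be that some settings need to be changed on the Kafka side? (Again, I cannot see what the data looks like from Kafka itself; I am using Offset Explorer to view it.)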

            /* -------------------------------------- CREATE DESTINATION TABLE LOCAL -------------------------------------- */

            CREATE TABLE database_name.ga_events_local ON CLUSTER 'cluste_name'
            (
                `event_type` String,
                `source` String,
                `timestamp` UInt32,
                `master_id` String,
                `event_params_app` Nullable(String),
                `items_item_list_name` Nullable(String),
                `event_params_parameter` Nullable(String),
                `items_affiliation` Nullable(String),
                `event_params_item_cat` Nullable(String),
                `event_params_item_id` Nullable(String),
                `event_params_step` Nullable(String),
                `event_params_item_name` Nullable(String),
                `event_params_dest` Nullable(String),
                `items_promotion_id` Nullable(String),
                `step_number` Nullable(String),
                `items_item_name` Nullable(String),
                `event_params_item_variant` Nullable(String),
                `event_params_origin` Nullable(String),
                `event_params_way` Nullable(String),
                `items_item_brand` Nullable(String),
                `items_promotion_name` Nullable(String),
                `event_params_item_cat2` Nullable(String),
                `event` Nullable(String),
                `event_params_ux_ui` Nullable(String),
                `items_item_cat2` Nullable(String),
                `items_item_cat3` Nullable(String),
                `event_params_type` Nullable(String),
                `items_item_cat4` Nullable(String),
                `event_params_flow` Nullable(String),
                `event_params_segment` Nullable(String),
                `items_item_id` Nullable(String),
                `items_item_cat5` Nullable(String),
                `itemslocation_id` Nullable(String),
                `event_params_error` Nullable(String),
                `event_params_result` Nullable(String),
                `items_item_cat` Nullable(String),
                `event_params_quantity` Nullable(String),
                `event_params_id` Nullable(String),
                `items_item_variant` Nullable(String),
                `event_name` Nullable(String),
                `items_item_list_index` Nullable(String),
                --`event_params_item_list_name` Nullable(String),
                --`event_params_promo_name` Nullable(String),

                `deleted` UInt8 --MATERIALIZED CASE WHEN event_type = 'UPDATE' THEN 0 WHEN event_type = 'DELETE' THEN 1 ELSE -1000 END
            )
            ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/{layer}-{shard}/ga_events', '{replica}', timestamp, `deleted`)
            ORDER BY `master_id`
            SETTINGS index_granularity = 8192;

            /* -------------------------------------- CREATE DESTINATION TABLE DISTRIBUTED -------------------------------------- */

            CREATE TABLE database_name.ga_events ON CLUSTER 'cluste_name'
            (
                `event_type` String,
                `source` String,
                `timestamp` UInt32,
                `master_id` String,
                `event_params_app` Nullable(String),
                `items_item_list_name` Nullable(String),
                `event_params_parameter` Nullable(String),
                `items_affiliation` Nullable(String),
                `event_params_item_cat` Nullable(String),
                `event_params_item_id` Nullable(String),
                `event_params_step` Nullable(String),
                `event_params_item_name` Nullable(String),
                `event_params_dest` Nullable(String),
                `items_promotion_id` Nullable(String),
                `step_number` Nullable(String),
                `items_item_name` Nullable(String),
                `event_params_item_variant` Nullable(String),
                `event_params_origin` Nullable(String),
                `event_params_way` Nullable(String),
                `items_item_brand` Nullable(String),
                `items_promotion_name` Nullable(String),
                `event_params_item_cat2` Nullable(String),
                `event` Nullable(String),
                `event_params_ux_ui` Nullable(String),
                `items_item_cat2` Nullable(String),
                `items_item_cat3` Nullable(String),
                `event_params_type` Nullable(String),
                `items_item_cat4` Nullable(String),
                `event_params_flow` Nullable(String),
                `event_params_segment` Nullable(String),
                `items_item_id` Nullable(String),
                `items_item_cat5` Nullable(String),
                `itemslocation_id` Nullable(String),
                `event_params_error` Nullable(String),
                `event_params_result` Nullable(String),
                `items_item_cat` Nullable(String),
                `event_params_quantity` Nullable(String),
                `event_params_id` Nullable(String),
                `items_item_variant` Nullable(String),
                `event_name` Nullable(String),
                `items_item_list_index` Nullable(String),
                --`event_params_item_list_name` Nullable(String),
                --`event_params_promo_name` Nullable(String),
                `deleted` UInt8 --MATERIALIZED CASE WHEN event_type = 'UPDATE' THEN 0 WHEN event_type = 'DELETE' THEN 1 ELSE -1000 END
            )
            ENGINE = Distributed('cluste_name', 'database_name', 'ga_events_local', javaHash(`master_id`));

            /* -------------------------------------- CREATE KAFKA TABLE -------------------------------------- */

            CREATE TABLE database_name.ga_events_kafka
            (
                `event_type` String,
                `source` String,
                `timestamp` UInt64,
                `data.master_id` Nullable(String),
                `data.event_params_app` Nullable(String),
                `data.items_item_list_name` Nullable(String),
                `data.event_params_parameter` Nullable(String),
                `data.items_affiliation` Nullable(String),
                `data.event_params_item_cat` Nullable(String),
                `data.event_params_item_id` Nullable(String),
                `data.event_params_step` Nullable(String),
                `data.event_params_item_name` Nullable(String),
                `data.event_params_dest` Nullable(String),
                `data.items_promotion_id` Nullable(String),
                `data.step_number` Nullable(String),
                `data.items_item_name` Nullable(String),
                `data.event_params_item_variant` Nullable(String),
                `data.event_params_origin` Nullable(String),
                `data.event_params_way` Nullable(String),
                `data.items_item_brand` Nullable(String),
                `data.items_promotion_name` Nullable(String),
                `data.event_params_item_cat2` Nullable(String),
                `data.event` Nullable(String),
                `data.event_params_ux_ui` Nullable(String),
                `data.items_item_cat2` Nullable(String),
                `data.items_item_cat3` Nullable(String),
                `data.event_params_type` Nullable(String),
                `data.items_item_cat4` Nullable(String),
                `data.event_params_flow` Nullable(String),
                `data.event_params_segment` Nullable(String),
                `data.items_item_id` Nullable(String),
                `data.items_item_cat5` Nullable(String),
                `data.itemslocation_id` Nullable(String),
                `data.event_params_error` Nullable(String),
                `data.event_params_result` Nullable(String),
                `data.items_item_cat` Nullable(String),
                `data.event_params_quantity` Nullable(String),
                `data.event_params_id` Nullable(String),
                `data.items_item_variant` Nullable(String),
                `data.event_name` Nullable(String),
                `data.items_item_list_index` Nullable(String) --,
                --    `data.event_params_item_list_name` Nullable(String),
                --    `data.event_params_promo_name` Nullable(String)
            )
            ENGINE = Kafka
            SETTINGS
                kafka_broker_list = 'somebroker:9093,somebroker:9093,somebroker:9093',
                kafka_topic_list = 'SOMETOPIC',
                kafka_format = 'AvroConfluent',
                kafka_num_consumers = 1,
                kafka_group_name = 'somegroup', -- I change this every time I recreate the tables
                format_avro_schema_registry_url = 'registry_url',
                kafka_client_id = 'someclient'; -- I change this every time I recreate the tables
            
            /* -------------------------------------- CREATE MATERIALIZED VIEW ------------------ */

            CREATE MATERIALIZED VIEW database_name.ga_events_kafka_mv TO database_name.ga_events
            (
                `event_type` String,
                `source` String,
                `timestamp` DateTime64(3),
                `master_id` Nullable(String),
                `event_params_app` Nullable(String),
                `items_item_list_name` Nullable(String),
                `event_params_parameter` Nullable(String),
                `items_affiliation` Nullable(String),
                `event_params_item_cat` Nullable(String),
                `event_params_item_id` Nullable(String),
                `event_params_step` Nullable(String),
                `event_params_item_name` Nullable(String),
                `event_params_dest` Nullable(String),
                `items_promotion_id` Nullable(String),
                `step_number` Nullable(String),
                `items_item_name` Nullable(String),
                `event_params_item_variant` Nullable(String),
                `event_params_origin` Nullable(String),
                `event_params_way` Nullable(String),
                `items_item_brand` Nullable(String),
                `items_promotion_name` Nullable(String),
                `event_params_item_cat2` Nullable(String),
                `event` Nullable(String),
                `event_params_ux_ui` Nullable(String),
                `items_item_cat2` Nullable(String),
                `items_item_cat3` Nullable(String),
                `event_params_type` Nullable(String),
                `items_item_cat4` Nullable(String),
                `event_params_flow` Nullable(String),
                `event_params_segment` Nullable(String),
                `items_item_id` Nullable(String),
                `items_item_cat5` Nullable(String),
                `itemslocation_id` Nullable(String),
                `event_params_error` Nullable(String),
                `event_params_result` Nullable(String),
                `items_item_cat` Nullable(String),
                `event_params_quantity` Nullable(String),
                `event_params_id` Nullable(String),
                `items_item_variant` Nullable(String),
                `event_name` Nullable(String),
                `items_item_list_index` Nullable(String),
                --    `event_params_item_list_name` Nullable(String),
                --    `event_params_promo_name` Nullable(String),
                `deleted` UInt8
            )
            AS
            SELECT
                `event_type`,
                `source`,
                `timestamp`,
                `data.master_id` as master_id,
                `data.event_params_app` as event_params_app,
                `data.items_item_list_name` as items_item_list_name,
                `data.event_params_parameter` as event_params_parameter,
                `data.items_affiliation` as items_affiliation,
                `data.event_params_item_cat` as event_params_item_cat,
                `data.event_params_item_id` as event_params_item_id,
                `data.event_params_step` as event_params_step,
                `data.event_params_item_name` as event_params_item_name,
                `data.event_params_dest` as event_params_dest,
                `data.items_promotion_id` as items_promotion_id,
                `data.step_number` as step_number,
                `data.items_item_name` as items_item_name,
                `data.event_params_item_variant` as event_params_item_variant,
                `data.event_params_origin` as event_params_origin,
                `data.event_params_way` as event_params_way,
                `data.items_item_brand` as items_item_brand,
                `data.items_promotion_name` as items_promotion_name,
                `data.event_params_item_cat2` as event_params_item_cat2,
                `data.event` as event,
                `data.event_params_ux_ui` as event_params_ux_ui,
                `data.items_item_cat2` as items_item_cat2,
                `data.items_item_cat3` as items_item_cat3,
                `data.event_params_type` as event_params_type,
                `data.items_item_cat4` as items_item_cat4,
                `data.event_params_flow` as event_params_flow,
                `data.event_params_segment` as event_params_segment,
                `data.items_item_id` as items_item_id,
                `data.items_item_cat5` as items_item_cat5,
                `data.itemslocation_id` as itemslocation_id,
                `data.event_params_error` as event_params_error,
                `data.event_params_result` as event_params_result,
                `data.items_item_cat` as items_item_cat,
                `data.event_params_quantity` as event_params_quantity,
                `data.event_params_id` as event_params_id,
                `data.items_item_variant` as items_item_variant,
                `data.event_name` as event_name,
                `data.items_item_list_index` as items_item_list_index,
                --    `data.event_params_promo_name` as event_params_promo_name,
                --    `data.event_params_item_list_name` as event_params_item_list_name,

                CASE
                    WHEN event_type = 'UPDATE' THEN 0
                    WHEN event_type = 'DELETE' THEN 1
                    ELSE -1000
                END as deleted
            FROM database_name.ga_events_kafka
            where `data.master_id` is not null and `data.master_id` <> ''

Columns that should be added:

            `data.event_params_item_list_name` Nullable(String),
            `data.event_params_promo_name` Nullable(String)
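As an alternative to dropping the whole chain, the destination tables could be extended in place. The following is only a sketch under assumptions: it reuses the table and cluster names from the listing above, and the order of steps (drop the view first so the Kafka table stops feeding data, alter the targets, then recreate the Kafka table and the view) is one possible procedure, not something confirmed by the question.

```sql
-- Sketch only: object names come from the question; the procedure is an assumption.

-- 1. Remove the materialized view so the Kafka table is no longer streamed from.
DROP TABLE IF EXISTS database_name.ga_events_kafka_mv;

-- 2. Add the two columns to the local and the Distributed destination tables.
--    Existing rows read the new Nullable columns as NULL.
ALTER TABLE database_name.ga_events_local ON CLUSTER 'cluste_name'
    ADD COLUMN IF NOT EXISTS `event_params_item_list_name` Nullable(String),
    ADD COLUMN IF NOT EXISTS `event_params_promo_name` Nullable(String);

ALTER TABLE database_name.ga_events ON CLUSTER 'cluste_name'
    ADD COLUMN IF NOT EXISTS `event_params_item_list_name` Nullable(String),
    ADD COLUMN IF NOT EXISTS `event_params_promo_name` Nullable(String);

-- 3. Drop the Kafka table; afterwards recreate it and the materialized view
--    from their original definitions with the two commented-out columns enabled.
DROP TABLE IF EXISTS database_name.ga_events_kafka;
```

The new columns end up after `deleted` in the destination tables, which should be harmless here because a materialized view with a TO clause inserts by column name rather than by position.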

sql apache-kafka integration loading clickhouse
1 Answer
0 votes

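Setting input_format_avro_allow_missing_fields = 1 fixed the problem.

Enables using fields that are not specified in the Avro or AvroConfluent format schema. When a field is not found in the schema, ClickHouse uses the default value instead of throwing an exception.

Possible values:

0: disabled. 1: enabled. Default value: 0.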
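The answer does not say where the setting was applied. One option, assumed here, is the SETTINGS clause of the Kafka table, where the original definition already passes another format setting (format_avro_schema_registry_url). The sketch below is abbreviated: only a few of the `data.*` columns from the question are repeated, and the broker, topic, group, and registry values are the question's placeholders.

```sql
-- Abbreviated sketch: placing the setting in the Kafka table's SETTINGS clause is an assumption.
CREATE TABLE database_name.ga_events_kafka
(
    `event_type` String,
    `source` String,
    `timestamp` UInt64,
    `data.master_id` Nullable(String),
    -- ... the remaining `data.*` columns from the original definition ...
    `data.event_params_item_list_name` Nullable(String),
    `data.event_params_promo_name` Nullable(String)
)
ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'somebroker:9093,somebroker:9093,somebroker:9093',
    kafka_topic_list = 'SOMETOPIC',
    kafka_format = 'AvroConfluent',
    kafka_num_consumers = 1,
    kafka_group_name = 'somegroup',
    kafka_client_id = 'someclient',
    format_avro_schema_registry_url = 'registry_url',
    -- Table columns missing from a message's Avro schema are filled with
    -- default values (NULL for Nullable columns) instead of raising an exception.
    input_format_avro_allow_missing_fields = 1;

-- Inspect the current session value of the setting.
SELECT name, value, changed
FROM system.settings
WHERE name = 'input_format_avro_allow_missing_fields';
```

With the setting enabled, messages written with the older schema (without the two new fields) should yield NULL in the new columns, while messages that carry them populate the columns normally.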
