Kafka制作人使用HiveStorageHandler

问题描述 投票:0回答:2

我对hive / hadoop比较新

我正在阅读这个Hive Storage Handlers

现在我正在尝试编写HiveStorageHandler的自定义实现,以便使用Hive Table查询和推送消息给Kafka。

我看到HiveStorageHandler还有其他实现,它允许我们使用hive表查询和编写NoSQL数据库。

我想为卡夫卡复制一下。我发现了一个项目

HiveKa - query Kafka using Hive

他们在这里尝试使用hive表上的查询从Kafka读取数据。我希望使用表格中的insert来写kafka主题。

有人可以指导我吗?

hadoop hive apache-kafka kafka-producer-api
2个回答
1
投票

我希望使用表格中的insert来写kafka主题。

使用Kafka HiveStorageHandler可以实现这一点。以下是此功能可能的一般用例

  1. 查询Kafka主题
  2. 从Kafka主题查询数据并插入到hive托管/外部表中
  3. 查询Kafka主题的数据并推送到其他Kafka主题
  4. 查询来自hive external / managed table的数据并推送到Kafka主题

您正在尝试执行第三个用例。

首先为源和目标Kafka主题创建两个外部表。

create external table if not exists source_topic_table
(
<fields>
)
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES (
'kafka.topic'='source_topic_name',
'kafka.bootstrap.servers'=''
);


create external table if not exists target_topic_table
(
<fields>
)
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES (
'kafka.topic'='target_topic_name',
'kafka.bootstrap.servers'=''
);

然后使用合并查询将数据插入目标Kafka主题

merge into target_topic_table
using (
select
<columns>,
cast(null as binary) as `__key`,
cast(null as int) as `__partition`,
cast(-1 as bigint) as `__offset`,
cast(to_epoch_milli(current_timestamp) as bigint) as `__timestamp`
from source_topic_table
) sub
on
sub.column_name = target_topic_table.coulmn_name <Some condition>
when not matched then insert values
(
<sub.columns>,
sub.`__key`,sub.`__partition`,sub.`__offset`,sub.`__timestamp`
);

注意:

  1. 使用Hive外部非本机表
  2. 除了用户定义的有效负载模式之外,Kafka存储处理程序还附加了4个附加列(__ key,__ partition,__ offset,__ timestmap),用户可以使用这些列来查询Kafka元数据字段
  3. 如果数据不是csv格式,用户必须设置'kafka.serde.class'表属性
  4. 用户还可以设置'kafka.write.semantic'表属性,该属性允许NONE,AT_LEAST_ONCE或EXACTLY_ONCE值。

0
投票

如果我理解正确,你想要从Hive读取事件,然后推送到Kafka。我没有使用存储处理程序的经验,但我宁愿建议编写适当的代码来生成Kafka,然后将这些事件提供给Hadoop / Hive。

在Kafka中有一个名为Kafka Connect的框架,它写入外部系统.Confluent编写了这样一个HDFS连接器,通过在文件写入HDFS时更新Hive Metastore来提供Hive支持。

在不编写存储处理程序的情况下,您可以尝试使用JDBC Source连接器,或者使用Spark / Flink从Hive读取数据并推送到Kafka。

但一般来说,Hadoop是CDC事件的目的地,而不是它的生成源。主要是因为查询速度很慢...如果你想在插入上创建事件,它通常需要一些表扫描,所以从Cassandra / Hbase生成事件可能是更好的选择

© www.soinside.com 2019 - 2024. All rights reserved.