我对hive / hadoop比较新
我正在阅读这个Hive Storage Handlers。
现在我正在尝试编写HiveStorageHandler的自定义实现,以便使用Hive Table查询和推送消息给Kafka。
我看到HiveStorageHandler还有其他实现,它允许我们使用hive表查询和编写NoSQL数据库。
我想为卡夫卡复制一下。我发现了一个项目
HiveKa - query Kafka using Hive
他们在这里尝试使用hive表上的查询从Kafka读取数据。我希望使用表格中的insert来写kafka主题。
有人可以指导我吗?
我希望使用表格中的insert来写kafka主题。
使用Kafka HiveStorageHandler可以实现这一点。以下是此功能可能的一般用例
您正在尝试执行第三个用例。
首先为源和目标Kafka主题创建两个外部表。
create external table if not exists source_topic_table
(
<fields>
)
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES (
'kafka.topic'='source_topic_name',
'kafka.bootstrap.servers'=''
);
create external table if not exists target_topic_table
(
<fields>
)
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES (
'kafka.topic'='target_topic_name',
'kafka.bootstrap.servers'=''
);
然后使用合并查询将数据插入目标Kafka主题
merge into target_topic_table
using (
select
<columns>,
cast(null as binary) as `__key`,
cast(null as int) as `__partition`,
cast(-1 as bigint) as `__offset`,
cast(to_epoch_milli(current_timestamp) as bigint) as `__timestamp`
from source_topic_table
) sub
on
sub.column_name = target_topic_table.coulmn_name <Some condition>
when not matched then insert values
(
<sub.columns>,
sub.`__key`,sub.`__partition`,sub.`__offset`,sub.`__timestamp`
);
注意:
如果我理解正确,你想要从Hive读取事件,然后推送到Kafka。我没有使用存储处理程序的经验,但我宁愿建议编写适当的代码来生成Kafka,然后将这些事件提供给Hadoop / Hive。
在Kafka中有一个名为Kafka Connect的框架,它写入外部系统.Confluent编写了这样一个HDFS连接器,通过在文件写入HDFS时更新Hive Metastore来提供Hive支持。
在不编写存储处理程序的情况下,您可以尝试使用JDBC Source连接器,或者使用Spark / Flink从Hive读取数据并推送到Kafka。
但一般来说,Hadoop是CDC事件的目的地,而不是它的生成源。主要是因为查询速度很慢...如果你想在插入上创建事件,它通常需要一些表扫描,所以从Cassandra / Hbase生成事件可能是更好的选择