我在java中使用Kafka,我把JSON消息作为字符串消耗,速率是每分钟100万条消息,我需要把字符串拆分只取一些值并保存到CSV中加载到数据库中,如何才能做出这样的东西?
你可以直接将Kafka主题中的数据流到数据库中,使用的是 Kafka Connect JDBC sink. 要了解更多关于Kafka Connect的信息,请看 文献 和 此话.
这是一个水槽连接器配置的例子。
{
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:mysql://mysql:3306/demo",
"topics": "test01",
"connection.user": "connect_user",
"connection.password": "asgard",
"auto.create": true,
"auto.evolve": true,
"insert.mode": "insert",
"pk.mode": "record_key",
"pk.fields": "MESSAGE_KEY"
}
在本教程中了解更多。https:/rmoff.devkafka-jdbc-video。
了解如何在Kafka Connect中安装JDBC驱动程序。此处