无法在KSql中连接外部主题

问题描述 投票:1回答:2

我对Confluent KSql很新,但对Kafka来说并不陌生。我将Kafka中存在的主题作为Avro序列化数据。我已启动并运行Confluent模式注册表并将KSql配置为指向注册表。

当我尝试创建一个基于我的主题之一的表时,KSql抱怨它无法找到流。当我尝试在KSql中创建一个简单地在KSql中传输我的主题的流时,似乎没有办法指向我在注册表中有引用的Avro序列化主题。

有谁知道如何攻击这两个问题?我想用KSql的方式不适合它能做什么?

UPDATE

这里有更多细节

ksql> show topics;

 Kafka Topic                                                                                 | Registered | Partitions | Partition Replicas | Consumers | Consumer Groups
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 COM_FINDOLOGY_MODEL_REPORTING_OUTGOINGFEEDADVERTISERSEARCHDATA                              | false      | 2          | 2                  | 0         | 0
 COM_FINDOLOGY_MODEL_TRAFFIC_CPATRACKINGCALLBACK                                             | false      | 2          | 2                  | 0         | 0
 COM_FINDOLOGY_MODEL_TRAFFIC_ENTRYPOINTCLICK                                                 | true       | 10         | 3                  | 0         | 0

KSql配置

#bootstrap.servers=localhost:9092
bootstrap.servers=host1:9092,host2:9092,host3:9092,host4:9092,host5:9092

#listeners=http://localhost:8088
listeners=http://localhost:59093

ksql.server.ui.enabled=true

ksql.schema.registry.url=http://host1:59092

注册表配置

# The host name advertised in ZooKeeper. Make sure to set this if running Schema Registry with multiple nodes.
host.name: x.x.x.x
listeners=http://0.0.0.0:59092

# Zookeeper connection string for the Zookeeper cluster used by your Kafka cluster
# (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
#kafkastore.connection.url=localhost:2181

# Alternatively, Schema Registry can now operate without Zookeeper, handling all coordination via
# Kafka brokers. Use this setting to specify the bootstrap servers for your Kafka cluster and it
# will be used both for selecting the master schema registry instance and for storing the data for
# registered schemas.
# (Note that you cannot mix the two modes; use this mode only on new deployments or by shutting down
# all instances, switching to the new configuration, and then starting the schema registry
# instances again.)
kafkastore.bootstrap.servers=PLAINTEXT://host1:9092,PLAINTEXT://host2:9092,PLAINTEXT://host3:9092,PLAINTEXT://host4:9092,PLAINTEXT://host5:9092

# The name of the topic to store schemas in
kafkastore.topic=_schemas

# If true, API requests that fail will include extra debugging information, including stack traces
debug=false

尝试通过声明外部主题来解决问题

ksql> register  topic xxx with (value_format='avro', kafka_topic='COM_FINDOLOGY_MODEL_REPORTING_OUTGOINGFEEDADVERTISERSEARCHDATA');
You need to provide avro schema file path for topics in avro format.
apache-kafka confluent ksql
2个回答
0
投票

REGISTER TOPIC是不推荐使用的语法。您应该使用CREATE STREAM(或CREATE TABLE,具体取决于您的数据访问要求)。

所以你的陈述看起来像这样:

CREATE STREAM MY_STREAM_1 \
  WITH (VALUE_FORMAT='AVRO', \
  KAFKA_TOPIC='COM_FINDOLOGY_MODEL_REPORTING_OUTGOINGFEEDADVERTISERSEARCHDATA');

请注意,我使用\打破线条以便于阅读;你不必这样做。


0
投票

在使用Kafka主题更改了我使用的信息之后,我解决了我遇到的问题,而不是使用整个主题内容。该主题包含使用ReflectionData创建的Avro编码数据(ok)。 KSql在处理流中的非标准项时遇到问题,但只要存在相应的KSql数据类型,就会处理ReflectionData项。我通过在KSql中创建一个新流来解决这个问题,该流只选择了我需要的与KSql兼容的项目。完成后,我可以从更大的流中处理我需要的东西。

评论我认为它在KSql中有点不足,你必须在Kafka中创建新的实际中介主题来处理数据。我认为更好的解决方案是将中间流作为View处理为实际流。在我将其解析为KTable之前,需要中间主题来保存累积和处理过的项目。

© www.soinside.com 2019 - 2024. All rights reserved.