为Kafka创建Spring Cloud Data Flow主题

问题描述 投票:0回答:2

我有自己的内部带有Python的Spring Cloud Data Flow处理器,我将此示例用作指导:https://dataflow.spring.io/docs/recipes/polyglot/processor/。然后,我想缩放并创建其中三个处理器,因此使用spring.cloud.deployer.myApp.count=3创建了3个内含Python的容器。我对示例中的代码进行了一些修改:创建Kafka使用者时,我还传递了一个组ID,因此消息应该负载均衡。

consumer = KafkaConsumer(get_input_channel(), group_id=get_consumer_group(), bootstrap_servers=[get_kafka_binder_brokers()])

问题是,SCDF创建的Kafka主题只有一个分区,因此消息仅到达一个pod。所以我想知道:

  • 我应该以某种方式配置SCDF以创建具有3个分区的Kafka主题吗?
  • 或者我不应该依靠SCDF并用Python自己创建主题吗?我想这将是多余的,因为SCDF也会创建此主题。
  • SCDF中的哪个组件实际上负责创建Kafka主题?我如何影响分区数量?
  • 如果我停止该流并以4个处理器步骤再次启动,则该主题是否应使用第4个分区扩展?因为当前没有新分区被创建。
python spring kubernetes apache-kafka spring-cloud-dataflow
2个回答
1
投票

[请花一点时间查看Spring Cloud Data Flow的responsibilities。如果不清楚,SCDF不会与支持消息传递中间件(如Kafka)进行交互,也不会在运行时使用它。换句话说,SCDF不会创建与其关联的主题或分区,而只是自动配置Spring Cloud Stream(SCSt)属性。

但是,如果您在自定义处理器中使用SCSt,则该框架会自动将所需渠道绑定到中间件中的基础主题。该框架还具有更改分区行为的功能。您也可以使用分区过度的主题来部署处理器。有several other configuration options建立所需的流数据处理行为。

您正在查看的Python示例并不具有SCSt提供的所有功能。该配方是一个示例演练,演示了如何有人可以在Python中构建本机处理器样式的应用程序,其中在Python代码本身中手动创建了生产者和使用者配置。 SCDF和SCSt都不影响此配方中的应用程序行为。

我应该以某种方式配置SCDF以创建具有3个分区的Kafka主题吗?

如前所述,SCDF不与Kafka进行交互。

或者我不应该依靠SCDF并用Python自己创建主题吗?我想这将是多余的,因为SCDF也会创建此主题。

如果您的自定义处理器不是Spring Cloud Stream应用程序,是的,您有责任在代码中明确定义主题和分区。

SCDF中的哪个组件实际上负责创建Kafka主题?我如何影响分区数量?

Spring Cloud Stream。请参阅上面的说明。

如果我停止该流并以4个处理器步骤再次启动,则该主题是否应使用第4个分区扩展?因为当前没有新分区被创建。

您不一定需要重新启动流数据管道。如果您的主题是预先分区过度的,是的,运行时任何其他使用者都应该能够自动参与竞争的使用者关系。请留意spring-io/dataflow.spring.io#156-我们正在添加一个配方,以演示使用SCSt + SCDF + Kafka进行手动和自动缩放的可能性。


0
投票

谢谢,通过将以下代码引入Python容器启动脚本,并改进了https://dataflow.spring.io/docs/recipes/polyglot/processor/中提供的代码,可以解决此问题。使用SCDF服务器传递的参数来获取代理URL,主题名称,实例数:

admin_client = KafkaAdminClient(bootstrap_servers=[get_kafka_binder_brokers()], client_id=sys.argv[0])

partition_count = get_cmd_arg("spring.cloud.stream.instanceCount")

# create Kafka topic if does not exist
new_topic = NewTopic(name=get_input_channel(), num_partitions=partition_count, replication_factor=1)
try:
    admin_client.create_topics(new_topics=[new_topic])
except TopicAlreadyExistsError:
    logging.info(f"Topic {get_input_channel()} was already created")

# add Kafka partitions to existing topic
new_partitions = NewPartitions(total_count=partition_count)
try:
    admin_client.create_partitions(topic_partitions={get_input_channel(): new_partitions})
except InvalidPartitionsError as exp:
    logging.info(f"No need to increase Kafka partitions for topic {get_input_channel()}")
© www.soinside.com 2019 - 2024. All rights reserved.