风暴拓扑配置

问题描述 投票:14回答:7

如何为风暴拓扑提供自定义配置?例如,如果我构建了一个连接到MySQL集群的拓扑,并且我希望能够在不重新编译的情况下更改我需要连接的服务器,那么我该怎么做?我的偏好是使用配置文件,但我担心的是文件本身没有部署到集群,因此它不会被运行(除非我对集群如何工作的理解是有缺陷的)。到目前为止,我在运行时将配置选项传递到风暴拓扑中的唯一方法是通过命令行参数,但是当您获得大量参数时,这很麻烦。

一个想法确实是利用shell脚本将文件读入变量并将该变量的内容作为字符串传递给拓扑,但是如果可能的话我想要一些更清洁的东西。

有人遇到过这种情况么?如果是这样,你是如何解决的?

编辑:

似乎需要提供更多说明。我的方案是我有一个拓扑,我希望能够在不同的环境中部署,而无需重新编译它。通常情况下,我会创建一个配置文件,其中包含数据库连接参数之类的内容并将其传入。我想知道如何在Storm中执行类似的操作。

apache-storm
7个回答
7
投票

您可以指定使用拓扑提交的配置(通常通过yaml文件)。我们如何在自己的项目中自行管理,我们有单独的配置文件用于开发,一个用于生产,在其中我们存储我们的服务器,redis和db IP以及Ports等。然后当我们运行命令来构建jar并提交暴露它的拓扑包括正确的配置文件,具体取决于您的部署环境。螺栓和喷口只是从stormConf图中读取它们所需的配置,它将在bolt的prepare()方法中传递给它们。

来自http://storm.apache.org/documentation/Configuration.html

每个配置都有一个默认值,在Storm代码库中的defaults.yaml中定义。您可以通过在Nimbus的类路径和主管中定义storm.yaml来覆盖这些配置。最后,您可以定义在使用StormSubmitter时随拓扑一起提交的特定于拓扑的配置。但是,拓扑特定的配置只能覆盖前缀为“TOPOLOGY”的配置。

Storm 0.7.0及更高版本允许您在每个螺栓/每个喷嘴的基础上覆盖配置。

你还会在http://nathanmarz.github.io/storm/doc/backtype/storm/StormSubmitter.html上看到submitJar和submitTopology传递了一个名为conf的地图。

希望这能让你开始。


2
投票

我通过在代码中提供配置解决了这个问题:

config.put(Config.TOPOLOGY_WORKER_CHILDOPTS, SOME_OPTS);

我试图提供特定于拓扑的storm.yaml,但它不起作用。如果你让它使用storm.yaml,请纠正我。

更新: 对于想知道SOME_OPTS是什么的人来说,这是来自Storm邮件列表上的Satish Duggana:

Config.TOPOLOGY_WORKER_CHILDOPTS:可以覆盖拓扑的WORKER_CHILDOPTS的选项。您可以配置任何java选项,如内存,gc等

在你的情况下它可以

config.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx1g");

1
投票

实际上最适合您的是将配置存储在可变键值存储(s3,redis等)中,然后将其拉入以配置您随后使用的数据库连接(我假设您已经计划限制如何经常你和数据库交谈,这样获得这个配置的开销就不是什么大问题了。此设计允许您即时更改数据库连接,甚至无需重新部署拓扑。


0
投票

我们的想法是,在构建拓扑时,您可以创建spout和bolt的实例(以及其他内容),并将这些实例序列化并分发到集群中的正确位置。如果要配置喷口或螺栓的行为,则在提交之前创建拓扑时执行此操作,并通过在螺栓或喷口上设置实例变量来执行此操作,这反过来会驱动所需的可配置行为。


0
投票

我也遇到了同样的问题。我通过在我的集群中配置NFS解决了这个问题,并将配置文件放在共享位置,以便它可用于所有集群机器。在Linux系统link中配置NFS非常容易。


0
投票

我遇到了和你一样的问题,这是我棘手的解决方案:

使用一个简单的java文件作为配置文件,比如topo_config.java,它看起来像:

package com.xxx
public class topo_config {
    public static String zk_host = "192.168.10.60:2181";
    public static String kafka_topic = "my_log_topic";
    public static int worker_num = 2;
    public static int log_spout_num = 4;
    // ...
}

这个文件放在我的configure文件夹中,然后编写一个脚本,比如compile.sh,它会将它复制到正确的包中并进行编译,看起来像:

cp config/topo_config.java src/main/java/com/xxx/
mvn package

配置直接实现:

Config conf = new Config();
conf.setNumWorkers(topo_config.worker_num);

0
投票

我们已经看到了同样的问题,并通过添加以下每个拓扑来解决它

config.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx4096m -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:NewSize=128m -XX:CMSInitiatingOccupancyFraction=70 -XX:-CMSConcurrentMTEnabled -Djava.net.preferIPv4Stack=true");

还使用Nimbus UI进行了验证,如下所示。

topology.worker.childopts   -Xmx4096m -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:NewSize=128m -XX:CMSInitiatingOccupancyFraction=70 -XX:-CMSConcurrentMTEnabled -Djava.net.preferIPv4Stack=true
© www.soinside.com 2019 - 2024. All rights reserved.