Storm + Kafka没有像预期的那样并行化

问题描述 投票:0回答:1

我们遇到的问题是单个拓扑中的任务并行性。我们无法获得良好,流畅的处理率。

我们正在使用Kafka和Storm来构建具有不同拓扑的系统,其中数据是在使用Kafka主题连接的一系列拓扑之后处理的。

我们正在使用Kafka 1.0.0和Storm 1.2.1。

负载的消息数量很少,大约每天2000个,但每个任务可能需要相当长的时间。特别是一种拓扑结构可能花费不同的时间来处理每个任务,通常在1到20分钟之间。如果按顺序处理,则吞吐量不足以处理所有传入消息。所有拓扑和Kafka系统都安装在一台机器上(16个内核,16 GB RAM)。

由于消息是独立的并且可以并行处理,因此我们尝试使用Storm并发功能来提高吞吐量。

为此,拓扑已配置如下:

  • 4名工人
  • 并行性提示设置为10
  • 从Kafka读取时的消息大小足以读取每条消息中的8个任务。
  • Kafka主题使用replication-factor = 1和partitions = 10。

使用此配置,我们在此拓扑中观察到以下行为。

  • 通过Storm拓扑从Kafka一次读取大约7-8个任务(任务大小不固定),最大消息大小为128 kB。
  • 同时计算约4-5个任务。工作在工人中或多或少均匀分布。一些工人承担1项任务,其他人承担2项并同时处理。
  • 随着任务的完成,剩下的任务就开始了。
  • 当只剩下1-2个任务需要处理时,就会遇到饥饿问题。其他工作人员等待闲置,直到所有任务完成。
  • 完成所有任务后,将确认消息并将其发送到下一个拓扑。
  • 从Kafka读取新批次,然后再次开始该过程。

我们有两个主要问题。首先,即使有4名工人和10名并行性提示,也只开始了4-5个任务。其次,即使只有1个任务,也有工作待处理时不再启动批处理。

这不是没有足够的工作要做的问题,因为我们在开始时尝试插入2000个任务,因此有很多工作要做。

我们试图增加参数“maxSpoutsPending”,期望拓扑会读取更多批次并同时排队,但似乎它们在内部被流水线化,而不是同时处理。

使用以下代码创建拓扑:

private static StormTopology buildTopologyOD() {
    //This is the marker interface BrokerHosts.
    BrokerHosts hosts = new ZkHosts(configuration.getProperty(ZKHOSTS));
    TridentKafkaConfig tridentConfigCorrelation = new TridentKafkaConfig(hosts, configuration.getProperty(TOPIC_FROM_CORRELATOR_NAME));

    tridentConfigCorrelation.scheme = new RawMultiScheme();
    tridentConfigCorrelation.fetchSizeBytes = Integer.parseInt(configuration.getProperty(MAX_SIZE_BYTES_CORRELATED_STREAM));

    OpaqueTridentKafkaSpout spoutCorrelator = new OpaqueTridentKafkaSpout(tridentConfigCorrelation);

    TridentTopology topology = new TridentTopology();

    Stream existingObject = topology.newStream("kafka_spout_od1", spoutCorrelator)
            .shuffle()
            .each(new Fields("bytes"), new ProcessTask(), new Fields(RESULT_FIELD, OBJECT_FIELD))
            .parallelismHint(Integer.parseInt(configuration.getProperty(PARALLELISM_HINT)));

    //Create a state Factory to produce outputs to kafka topics.
    TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
            .withProducerProperties(kafkaProperties)
            .withKafkaTopicSelector(new ODTopicSelector())
            .withTridentTupleToKafkaMapper(new ODTupleToKafkaMapper());

    existingObject.partitionPersist(stateFactory, new Fields(RESULT_FIELD, OBJECT_FIELD), new TridentKafkaUpdater(), new Fields(OBJECT_FIELD));

    return topology.build();
}

和配置创建为:

private static Config createConfig(boolean local) {
    Config conf = new Config();
    conf.setMaxSpoutPending(1); // Also tried 2..6
    conf.setNumWorkers(4);

    return conf;
}

我们可以采取哪些措施来提高性能,既可以增加并行任务的数量,也可以在完成处理批次时避免饥饿?

apache-kafka apache-storm
1个回答
0
投票

我在Nathan Marz的风暴用户身上找到了关于为Trident设置并行性的old post

我建议使用“名称”功能来命名部分流,以便UI显示哪些螺栓对应于哪些部分。

Trident将操作包装成尽可能少的螺栓。此外,除非您已完成明确涉及重新分区的操作(例如shuffle,groupBy,partitionBy,全局聚合等),否则它永远不会对您的流进行重新分区。 Trident的这个属性确保您可以控制事物处理的排序/半顺序。所以在这种情况下,groupBy之前的所有内容都必须具有相同的并行性,否则Trident必须重新分配流。既然你没有说你想要重新分配流,它就不能那样做。通过引入重新分区操作,您可以为spout与每个spout获得不同的并行性,如下所示:

stream.parallelismHint(1).shuffle()各(...)。每个(...).parallelismHint(3).groupBy(...)。

我想你可能想为你的鲸鱼喷水器和你的.each设置parallelismHint。

关于同时处理多个批次,你是对的,这就是maxSpoutPending在Trident中的用途。尝试在Storm UI中检查您的最大喷口挂起值是否实际被拾取。还尝试为MasterBatchCoordinator启用调试日志记录。您应该能够从该日志记录中判断出多个批次是否同时处于飞行状态。

当你说多个批次没有同时处理时,你的意思是ProcessTask吗?请记住,Trident的一个属性是关于批次的状态更新。如果您有例如maxSpoutPending = 3并且批次1,2和3在飞行中,Trident将不会发出更多批次进行处理,直到写入批次1,此时它将再发出一次。如此慢的批次可以阻止更多的发射,即使2和3完全处理,他们必须等待1完成并写入。

如果您不需要Trident的批处理和订购行为,您可以尝试常规的Storm。

更多的旁注,但你可能想考虑从storm-kafka迁移到storm-kafka-client。这对于这个问题并不重要,但如果不这样做,你将无法升级到Kafka 2.x,而且在你获得一堆状态进行迁移之前,它会更容易。

© www.soinside.com 2019 - 2024. All rights reserved.