我们遇到的问题是单个拓扑中的任务并行性。我们无法获得良好,流畅的处理率。
我们正在使用Kafka和Storm来构建具有不同拓扑的系统,其中数据是在使用Kafka主题连接的一系列拓扑之后处理的。
我们正在使用Kafka 1.0.0和Storm 1.2.1。
负载的消息数量很少,大约每天2000个,但每个任务可能需要相当长的时间。特别是一种拓扑结构可能花费不同的时间来处理每个任务,通常在1到20分钟之间。如果按顺序处理,则吞吐量不足以处理所有传入消息。所有拓扑和Kafka系统都安装在一台机器上(16个内核,16 GB RAM)。
由于消息是独立的并且可以并行处理,因此我们尝试使用Storm并发功能来提高吞吐量。
为此,拓扑已配置如下:
使用此配置,我们在此拓扑中观察到以下行为。
我们有两个主要问题。首先,即使有4名工人和10名并行性提示,也只开始了4-5个任务。其次,即使只有1个任务,也有工作待处理时不再启动批处理。
这不是没有足够的工作要做的问题,因为我们在开始时尝试插入2000个任务,因此有很多工作要做。
我们试图增加参数“maxSpoutsPending”,期望拓扑会读取更多批次并同时排队,但似乎它们在内部被流水线化,而不是同时处理。
使用以下代码创建拓扑:
private static StormTopology buildTopologyOD() {
//This is the marker interface BrokerHosts.
BrokerHosts hosts = new ZkHosts(configuration.getProperty(ZKHOSTS));
TridentKafkaConfig tridentConfigCorrelation = new TridentKafkaConfig(hosts, configuration.getProperty(TOPIC_FROM_CORRELATOR_NAME));
tridentConfigCorrelation.scheme = new RawMultiScheme();
tridentConfigCorrelation.fetchSizeBytes = Integer.parseInt(configuration.getProperty(MAX_SIZE_BYTES_CORRELATED_STREAM));
OpaqueTridentKafkaSpout spoutCorrelator = new OpaqueTridentKafkaSpout(tridentConfigCorrelation);
TridentTopology topology = new TridentTopology();
Stream existingObject = topology.newStream("kafka_spout_od1", spoutCorrelator)
.shuffle()
.each(new Fields("bytes"), new ProcessTask(), new Fields(RESULT_FIELD, OBJECT_FIELD))
.parallelismHint(Integer.parseInt(configuration.getProperty(PARALLELISM_HINT)));
//Create a state Factory to produce outputs to kafka topics.
TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
.withProducerProperties(kafkaProperties)
.withKafkaTopicSelector(new ODTopicSelector())
.withTridentTupleToKafkaMapper(new ODTupleToKafkaMapper());
existingObject.partitionPersist(stateFactory, new Fields(RESULT_FIELD, OBJECT_FIELD), new TridentKafkaUpdater(), new Fields(OBJECT_FIELD));
return topology.build();
}
和配置创建为:
private static Config createConfig(boolean local) {
Config conf = new Config();
conf.setMaxSpoutPending(1); // Also tried 2..6
conf.setNumWorkers(4);
return conf;
}
我们可以采取哪些措施来提高性能,既可以增加并行任务的数量,也可以在完成处理批次时避免饥饿?
我在Nathan Marz的风暴用户身上找到了关于为Trident设置并行性的old post:
我建议使用“名称”功能来命名部分流,以便UI显示哪些螺栓对应于哪些部分。
Trident将操作包装成尽可能少的螺栓。此外,除非您已完成明确涉及重新分区的操作(例如shuffle,groupBy,partitionBy,全局聚合等),否则它永远不会对您的流进行重新分区。 Trident的这个属性确保您可以控制事物处理的排序/半顺序。所以在这种情况下,groupBy之前的所有内容都必须具有相同的并行性,否则Trident必须重新分配流。既然你没有说你想要重新分配流,它就不能那样做。通过引入重新分区操作,您可以为spout与每个spout获得不同的并行性,如下所示:
stream.parallelismHint(1).shuffle()各(...)。每个(...).parallelismHint(3).groupBy(...)。
我想你可能想为你的鲸鱼喷水器和你的.each
设置parallelismHint。
关于同时处理多个批次,你是对的,这就是maxSpoutPending
在Trident中的用途。尝试在Storm UI中检查您的最大喷口挂起值是否实际被拾取。还尝试为MasterBatchCoordinator启用调试日志记录。您应该能够从该日志记录中判断出多个批次是否同时处于飞行状态。
当你说多个批次没有同时处理时,你的意思是ProcessTask吗?请记住,Trident的一个属性是关于批次的状态更新。如果您有例如maxSpoutPending = 3并且批次1,2和3在飞行中,Trident将不会发出更多批次进行处理,直到写入批次1,此时它将再发出一次。如此慢的批次可以阻止更多的发射,即使2和3完全处理,他们必须等待1完成并写入。
如果您不需要Trident的批处理和订购行为,您可以尝试常规的Storm。
更多的旁注,但你可能想考虑从storm-kafka
迁移到storm-kafka-client
。这对于这个问题并不重要,但如果不这样做,你将无法升级到Kafka 2.x,而且在你获得一堆状态进行迁移之前,它会更容易。