在这个最小的示例中,为什么Trident不调用ack()或fail()?

问题描述 投票:0回答:1

我试图在Trident中创建一个小示例。目的是查看在失败情况下如何重播元组。下面是拓扑定义

        Random rand = new Random();

        Config config = new Config();
        config.setDebug(true);
        config.setNumWorkers(1);

        TridentTopology topology = new TridentTopology();

        topology.newStream("spout", new RandomIntegerSpout())
                .map((MapFunction) tridentTuple -> {
                    if ((tridentTuple.getLongByField("msgid") % 50 == 0) &&
                            (rand.nextInt(2) == 1)) {
                        System.out.println(String.format("Failed to process tuple %d", tridentTuple.getLongByField("msgid")));
                        throw new ReportedFailedException("Divisible by 50");
                    }
                    return new Values(tridentTuple.toArray());
                })
                .peek((Consumer) tridentTuple -> System.out.println(tridentTuple.getValues()));

我使用来自Storm-starter的RandomIntegerSpout,该扩展程序扩展了BaseRichSpout并仅生成随机数。然后,我应用一个MapFunction,它每50个元组仅绘制一个随机数,并随机使该元组失败。

问题是,我没有得到任何ackfail

我玩着水嘴,并在调试模式下运行它,尝试了相同的示例输出,并使用标准的防风栓进行了尝试。锚定工作正常,只是不会被三叉戟调用。

我在v1.2.3和v2.0.0中使用LocalCluster和StormSubmitter复制了此问题。

以下是Storm UI的屏幕截图:enter image description here对应于地图确认的螺栓将使元组失效,但不会被传播回喷嘴。

我以为三叉戟Mastercoord可能期望某种状态的持久性来实现拓扑,但是用某些persistentAggregate代替peek并没有帮助。通过对map进行同样的操作,我还排除了each中的错误。

通过检查来看代码几乎是微不足道的,我可能会误解有关Trident / Storm的一些基本知识。如果批处理完成了,我期望三叉戟会调用出水嘴的ack方法是否是错误的?我意识到fail中没有IBatchSpout方法。 Trident如何处理批次的重放?

apache-storm trident
1个回答
0
投票

三叉嘴不会在单个元组级别确认或失败元组。相反,元组将被批处理。

三叉嘴通常看起来像this interface

M emitPartitionBatch(TransactionAttempt tx, TridentCollector collector, PartitionT partition, M lastPartitionMeta);

想法是,Trident将设法跟踪批处理元组的ack / fail,然后如果批处理失败,它将要求喷嘴重复该批处理,如果没有,则根本不会。

请注意,这与标准Storm Spout有何不同。对于普通的喷口,框架基本上会告诉喷口“嘿,发出一些东西。由您自己决定发出什么。”,然后使用ackfail方法来告诉喷口是否应该发出特定的元组。再次。

使用Trident时,喷口将被告知“嘿,(重新)发出批次编号x”,然后由喷口决定该批次中有哪些元组。使用此模型,不需要fail方法。不过,某些Trident喷口将具有ack/succeed方法,以允许喷口丢弃它可能与特定正在进行的批处理有关的任何状态。

对于包装的IRichSpouts,有一些bridging code会将它们包装到Trident API中。基本上,包装器将调用nextTuple直到具有完整批次,然后将ID存储在缓存中。如果要求包装器重新批处理,它将在喷嘴上调用fail。否则,一旦批处理成功,它将调用ack

[我认为您没有在Storm UI中看到与此相关的任何内容的原因是IRichBolt实际上并未在那里显示。相反,它是包装的,因此ack/fail调用是在spout-spout组件内部“在幕后”进行的。如果您想确定是否正在调用确认/失败,请尝试向ack/failIRichSpout方法中添加一些日志记录。

© www.soinside.com 2019 - 2024. All rights reserved.