在风暴分布式缓存

问题描述 投票:0回答:5

如何存储在Apache的风暴临时数据?

在风暴的拓扑结构,螺栓需要访问先前处理的数据。

Eg: if the bolt processes varaiable1 with result as 20 at 10:00 AM.

并再次varaiable1被接收为5010:15 AM那么结果应该是30 (50-20)

以后如果varaiable1接收70那么结果应该在20 (70-50)10:30

如何实现这一功能。

apache-storm
5个回答
2
投票

总之,你想要做微配料计算,在风暴的运行的元组。首先,您需要定义/查找元组设置键。不要使用该密钥螺栓之间场分组(不使用随机分组)。这将保证相关的元组将始终发送到相同的关键下游螺栓相同的任务。定义类层次的集合列表/地图维持旧的价值观和在相同的新的价值进行计算,不用担心它们是线程相同的螺栓的不同执行者实例之间的安全。


1
投票

恐怕没有这样的内置功能,从今天开始。但是你可以使用任何类型的分布式缓存,如分布式缓存或Redis的。这些缓存解决方案是很容易使用。


0
投票

有一对夫妇的方式做到这一点,但它取决于你的系统的要求,你的团队的技能和基础设施。

你可以使用Apache Cassandra的为你的事件存储和传递该行的元组键,以便在下一个螺栓可以检索它。

如果你的数据在本质上是时间序列,那么也许你想看看OpenTSDBInfluxDB

当然,你可能会回落到类似软件事务内存,但我认为这将需要各具特色的良好的数额。


0
投票

Uou可以使用CacheBuilder与自己的延伸BaseRichBolt内记住你的数据(把这个prepare方法):

// init your cache.
this.cache = CacheBuilder.newBuilder()
                         .maximumSize(maximumCacheSize)
                         .expireAfterWrite(expireAfterWrite, TimeUnit.SECONDS)
                         .build();

然后,在执行时,可以使用高速缓存,看看你是否已经看到,键盘输入或没有。从那里,你可以添加你的业务逻辑:

// if we haven't seen it before, we can emit it.
if(this.cache.getIfPresent(key) == null) {
    cache.put(key, nearlyEmptyList);
    this.collector.emit(input, input.getValues());
}

this.collector.ack(input);

0
投票

这个问题是一个很好的候选人在微批次证明阿帕奇星火在内存中的计算。然而,你的使用情况是微不足道的风暴来实现。

1)确保螺栓使用字段分组。这将始终如一散列传入元组到相同的螺栓,所以我们不会失去任何元组。

2)保持在螺栓的本地缓存的地图。这张地图将保持一个“变量”的最后一个已知值。

class CumulativeDiffBolt extends InstrumentedBolt{

Map<String, Integer> lastKnownVariableValue;

@Override
public void prepare(){
     this.lastKnownVariableValue = new HashMap<>();
     ....

@Override
public void instrumentedNextTuple(Tuple tuple, Collector collector){
     .... extract variable from tuple
     .... extract current value from tuple
     Integer lastValue = lastKnownVariableValue.getOrDefault(variable, 0)
     Integer newValue = currValue - lastValue

     lastKnownVariableValue.put(variable, newValue)
     emit(new Fields(variable, newValue));
   ...
}
© www.soinside.com 2019 - 2024. All rights reserved.