如何存储在Apache的风暴临时数据?
在风暴的拓扑结构,螺栓需要访问先前处理的数据。
Eg: if the bolt processes varaiable1 with result as 20 at 10:00 AM.
并再次varaiable1
被接收为50
在10:15 AM
那么结果应该是30 (50-20)
以后如果varaiable1接收70
那么结果应该在20 (70-50)
被10:30
。
如何实现这一功能。
总之,你想要做微配料计算,在风暴的运行的元组。首先,您需要定义/查找元组设置键。不要使用该密钥螺栓之间场分组(不使用随机分组)。这将保证相关的元组将始终发送到相同的关键下游螺栓相同的任务。定义类层次的集合列表/地图维持旧的价值观和在相同的新的价值进行计算,不用担心它们是线程相同的螺栓的不同执行者实例之间的安全。
恐怕没有这样的内置功能,从今天开始。但是你可以使用任何类型的分布式缓存,如分布式缓存或Redis的。这些缓存解决方案是很容易使用。
Uou可以使用CacheBuilder与自己的延伸BaseRichBolt内记住你的数据(把这个prepare方法):
// init your cache.
this.cache = CacheBuilder.newBuilder()
.maximumSize(maximumCacheSize)
.expireAfterWrite(expireAfterWrite, TimeUnit.SECONDS)
.build();
然后,在执行时,可以使用高速缓存,看看你是否已经看到,键盘输入或没有。从那里,你可以添加你的业务逻辑:
// if we haven't seen it before, we can emit it.
if(this.cache.getIfPresent(key) == null) {
cache.put(key, nearlyEmptyList);
this.collector.emit(input, input.getValues());
}
this.collector.ack(input);
这个问题是一个很好的候选人在微批次证明阿帕奇星火在内存中的计算。然而,你的使用情况是微不足道的风暴来实现。
1)确保螺栓使用字段分组。这将始终如一散列传入元组到相同的螺栓,所以我们不会失去任何元组。
2)保持在螺栓的本地缓存的地图。这张地图将保持一个“变量”的最后一个已知值。
class CumulativeDiffBolt extends InstrumentedBolt{
Map<String, Integer> lastKnownVariableValue;
@Override
public void prepare(){
this.lastKnownVariableValue = new HashMap<>();
....
@Override
public void instrumentedNextTuple(Tuple tuple, Collector collector){
.... extract variable from tuple
.... extract current value from tuple
Integer lastValue = lastKnownVariableValue.getOrDefault(variable, 0)
Integer newValue = currValue - lastValue
lastKnownVariableValue.put(variable, newValue)
emit(new Fields(variable, newValue));
...
}