如何避免重复添加?

问题描述 投票:2回答:3

Problem I'm trying to solve

我有一个工作进程,通过累积工作人员收到的一些JSON消息的值来更改资源的属性(比如MyResource)。即使工作进程收到两次或多次相同的JSON消息,我也试图想出一种避免重复累积的最佳方法。

Here's what I've tried

解决方案1

每个JSON消息都有一个唯一的时间戳,它取决于创建JSON消息的时间,我在MyResource上保存了该时间戳,如果它的时间戳值低于MyResource上的时间戳值,则拒绝JSON消息。

Problem

由于整个体系结构是异步的,因此可以按任何顺序接收消息,而不一定按照创建它们的顺序接收消息。

解决方案2

我在MyResource上创建了一个新属性(比如added_ids)。每条JSON消息都有一个唯一的id,我将该id附加到MyResource.added_ids。并且每次为已经处理的JSON消息累积已使用的added_ids。

Problem

我正在使用mongo来存储MyResource。由于每个MyResource的JSON消息很多,因此每个MyResource文档都开始爆炸这个数组。在数组中查找也是一项昂贵的操作。

I'm looking for

我正在寻找一个可以处理异步性质的答案,也不会破坏我的mongo文档。另外,我不是在寻找一个精确的解决方案,是否有用于解决类似问题的算法/模式?我试过谷歌搜索,但我不知道该怎么称呼此问题以获得相关结果。

ruby mongodb asynchronous mongoid
3个回答
0
投票

我认为你的第二个解决方案是正确的,但是如果你将每个added_id存储为它自己的key-val而不是数组,性能可能会更好。

逻辑非常简单:每次从队列中获取输入时,在缓存中查找是否有该消息ID的条目。如果有条目,则不要累积该输入。否则,累积输入并将密钥存储在缓存中。

正如您所提到的,这种方法存在可伸缩性问题,因为缓存将无限增长。要解决此问题,您可以使用具有过期和逐出功能的缓存。最简单的方法是明确设置你写的每个键的“到期时间”。 Mongo,Memcached和Redis支持此功能。

问题是,即使您在每个点上设置“expires at”,如果负载足够,您的缓存仍会耗尽内存。所以你需要一个后备 - 当缓存内存不足时要做的事情。为此,您可以使用具有“自动逐出”功能的缓存,这意味着它具有在必要时删除内容的算法。

它看起来不像Mongo支持这样的东西(它是一个具有缓存功能的数据库而不是适当的缓存)。 Memcache使用LRU算法(参见https://github.com/memcached/memcached/wiki/UserInternals#when-are-items-evicted)。 Redis有多种算法可供选择(参见https://redis.io/topics/lru-cache)。

我要记住的另一件事是,在分布式或多线程应用程序上执行此整个过程会引入竞争条件。假设您有20台工作机器,无论出于何种原因,它们几乎在同一时间都能获得相同的消息。他们每个人都会检查缓存中的条目并且什么也没找到,因此没有一个被标记为重复。

要解决此问题,您可以使用互斥锁/信号量用于在同一台计算机上运行的多个线程(垂直扩展)或“分布式锁定”(如果您完全拥有多台计算机)(水平扩展)。见https://redis.io/topics/distlock

编辑

我收到一条提示,Mongo可以用Capped Collections进行自动驱逐。它只支持FIFO驱逐(始终首先使最旧的数据到期),这可能无论如何都能满足您的需求。


0
投票

如果您的JSON消息可以按任何顺序应用,那么时间戳方法似乎没有多大意义。对此问题的描述并不十分清楚 - 只需要确保避免重新处理相同的消息。

我在类似约束的系统上工作,我们采用的方法是关注消息而不是资源。方法是计算消息的MD5校验和(或至少是影响MyResource实例的关键部分......包括资源ID)。您将消息存储在mongoDB文档中,可能将整个消息作为一个属性存储,将MD5校验和存储在另一个属性中。当worker收到消息时,它会计算消息的校验和,检查消息是否已经收到,并且只处理消息(存储在mongoDB中,对MyResource实例执行操作),如果没有带有校验和的现有doc 。

这种方法的一个优点是,如果出于某种原因,“对MyResource采取行动”的内容将会失败,您可以在将来“回放”这些消息。您可能希望在收到文档时为文档添加时间戳以保证播放顺序(因为生产是异步的......您可能希望支持多个生产者......收据时间应该是王道)。


0
投票

让我分享另一个疯狂的解决方案如果您可以为每个资源消息分配唯一的素数,则可以识别重复项。在这种情况下,你必须在空间和时间之间进行权衡。

messages for MyResource 1 => message 2 | message 3 | message 5 | message 7  
messages for MyResource 2 => message 2 | message 3 | message 5 | message 7  

在每个过程之后,存储当前消息素数和先前计算的乘法。

MyResource 1 | 2  (processed 2 only)  
MyResource 2 | 70 (processed 2*5*7)

无论何时收到消息,都要验证现有值是否可以按消息ID划分。

70 % 5 == 0 true (already processed)
70 % 3 == 0 false (not processed)

在第二个选项中,您担心空间(MongoDB 15mb限制,插入/查找延迟),为此,您应该考虑节省空间的数据结构,如bloom filter。然而,它是一种概率数据结构,这意味着可能存在误报匹配,但假阴性则不然。你可以试试redis has a nice implementation

 127.0.0.1:6379> BF.ADD resource1 msg1
 (integer) 1
 127.0.0.1:6379> BF.EXISTS resource1 msg1
 (integer) 1
 127.0.0.1:6379> BF.EXISTS resource1 msg2
 (integer) 0

推荐问答