对于我目前的开发我有很多线程(Producers
)创建Tasks
和许多线程消耗这些Tasks
(consumers
)
每个Producers
都有一个独特的名称; Tasks
由以下部分组成:
Producers
我的问题涉及(Producers
)和(consumers
)使用的数据结构。
天真地,我们可以想象Producers
用Tasks
填充并发队列,并且(consumers
)读取/消耗存储在并发队列中的Tasks
。
我认为这个解决方案相当规模,但是一个案例是有问题的:如果Producers
非常快地创建了两个具有相同名称但不相同数据的Tasks
(任务T1和T2具有相同的名称,但T1具有数据D1和T2有数据D2),理论上可能它们按T2然后T1的顺序消耗!
现在,我想基于Map + Queue创建我自己的数据结构(比方说MyQueue
)。比如队列,它会有pop()
和push()
方法。
pop()
方法非常简单push()
方法将:
检查Task
中是否还没有插入现有的MyQueue
(在地图中执行find()
)
如果找到:存储在要插入的Task
中的数据将与存储在找到的Task
中的数据合并
如果没有找到:将在地图中插入Task
,并在队列中添加一个条目当然,我必须安全地进行并发访问......这肯定是我的问题;我几乎可以肯定这个解决方案不会扩展。
所以我现在的问题是,为了满足我的要求,我必须使用哪种最佳数据结构
你可以试试Heinz Kabutz的Striped Executor Service可能的候选人。
这个神奇的线程池将确保具有相同stripeClass的所有Runnables将按照它们提交的顺序执行,但具有不同stripedClasses的StripedRunners仍然可以独立执行。
为什么不选择并发并进行并行,而不是使数据结构对并发访问安全?
诸如MapReduce之类的功能编程模型是解决此类问题的一种非常可扩展的方法。
据我所知,D1
和D2
既可以一起分析,也可以单独分析,唯一的限制是它们不应该以错误的顺序进行分析。 (在这里做一些假设)但是如果真正的问题只是结果的组合方式,那么可能有一个简单的解决方案。
您可以一起删除约束,允许它们单独分析,然后具有能够以合理的方式将它们重新组合在一起的reduce函数。
在这种情况下,你的第一步是map
,第二步是reduce
。
即使计算在单次操作中更有效,但扩展的很大一部分,特别是扩展是由denormalization完成的。
如果消费者并行运行,我怀疑是否有办法让他们按顺序执行具有相同名称的任务。在您的示例中(来自评论):
如果生产者“P1”添加带有数据D1的第一个任务“T”并且快速添加带有数据D2的第二个任务“T”,则BlockingQueue确实是一个问题(不幸的是)。在这种情况下,第一个任务可以由一个线程处理,第二个任务可以由另一个线程处理;如果处理第一个任务的线程被中断,则处理第二个任务的线程可以先完成
如果P1没有那么快地提交D2,则没有区别。消费者1可能仍然太慢,因此消费者2将能够首先完成。以下是此类方案的示例:
要解决这个问题,你必须引入某种完成检测,我认为这会使事情过于复杂。
如果您有足够的负载并且可以按顺序处理具有不同名称的某些任务,则可以为每个使用者使用一个队列,并将相同的命名任务放入同一队列。
public class ParallelQueue {
private final BlockingQueue<Task>[] queues;
private final int consumersCount;
public ParallelQueue(int consumersCount) {
this.consumersCount = consumersCount;
queues = new BlockingQueue[consumersCount];
for (int i = 0; i < consumersCount; i++) {
queues[i] = new LinkedBlockingQueue<>();
}
}
public void push(Task<?> task) {
int index = task.name.hashCode() % consumersCount;
queues[index].add(task);
}
public Task<?> pop(int consumerId) throws InterruptedException {
int index = consumerId % consumersCount;
return queues[index].take();
}
private final static class Task<T> {
private final String name;
private final T data;
private Task(String name, T data) {
this.name = name;
this.data = data;
}
}
}