Java中的并发和可伸缩数据结构来处理任务?

问题描述 投票:6回答:3

对于我目前的开发我有很多线程(Producers)创建Tasks和许多线程消耗这些Tasksconsumers

每个Producers都有一个独特的名称; Tasks由以下部分组成:

  • 它的名字Producers
  • 一个名字
  • 数据

我的问题涉及(Producers)和(consumers)使用的数据结构。

Concurrent Queue?

天真地,我们可以想象ProducersTasks填充并发队列,并且(consumers)读取/消耗存储在并发队列中的Tasks

我认为这个解决方案相当规模,但是一个案例是有问题的:如果Producers非常快地创建了两个具有相同名称但不相同数据的Tasks(任务T1和T2具有相同的名称,但T1具有数据D1和T2有数据D2),理论上可能它们按T2然后T1的顺序消耗!

Task Map + Queue?

现在,我想基于Map + Queue创建我自己的数据结构(比方说MyQueue)。比如队列,它会有pop()push()方法。

  • pop()方法非常简单
  • push()方法将: 检查Task中是否还没有插入现有的MyQueue(在地图中执行find()) 如果找到:存储在要插入的Task中的数据将与存储在找到的Task中的数据合并 如果没有找到:将在地图中插入Task,并在队列中添加一个条目

当然,我必须安全地进行并发访问......这肯定是我的问题;我几乎可以肯定这个解决方案不会扩展。

So What?

所以我现在的问题是,为了满足我的要求,我必须使用哪种最佳数据结构

java multithreading data-structures concurrency scalability
3个回答
2
投票

你可以试试Heinz Kabutz的Striped Executor Service可能的候选人。

这个神奇的线程池将确保具有相同stripeClass的所有Runnables将按照它们提交的顺序执行,但具有不同stripedClasses的StripedRunners仍然可以独立执行。


0
投票

为什么不选择并发并进行并行,而不是使数据结构对并发访问安全?

诸如MapReduce之类的功能编程模型是解决此类问题的一种非常可扩展的方法。

据我所知,D1D2既可以一起分析,也可以单独分析,唯一的限制是它们不应该以错误的顺序进行分析。 (在这里做一些假设)但是如果真正的问题只是结果的组合方式,那么可能有一个简单的解决方案。

您可以一起删除约束,允许它们单独分析,然后具有能够以合理的方式将它们重新组合在一起的reduce函数。

在这种情况下,你的第一步是map,第二步是reduce

即使计算在单次操作中更有效,但扩展的很大一部分,特别是扩展是由denormalization完成的。


0
投票

如果消费者并行运行,我怀疑是否有办法让他们按顺序执行具有相同名称的任务。在您的示例中(来自评论):

如果生产者“P1”添加带有数据D1的第一个任务“T”并且快速添加带有数据D2的第二个任务“T”,则BlockingQueue确实是一个问题(不幸的是)。在这种情况下,第一个任务可以由一个线程处理,第二个任务可以由另一个线程处理;如果处理第一个任务的线程被中断,则处理第二个任务的线程可以先完成

如果P1没有那么快地提交D2,则没有区别。消费者1可能仍然太慢,因此消费者2将能够首先完成。以下是此类方案的示例:

  1. P1:提交D1
  2. C1:读D1
  3. P2:提交D2
  4. C2:读D2
  5. C2:过程D2
  6. C1:过程D1

要解决这个问题,你必须引入某种完成检测,我认为这会使事情过于复杂。


如果您有足够的负载并且可以按顺序处理具有不同名称的某些任务,则可以为每个使用者使用一个队列,并将相同的命名任务放入同一队列。

public class ParallelQueue {

    private final BlockingQueue<Task>[] queues;
    private final int consumersCount;

    public ParallelQueue(int consumersCount) {
        this.consumersCount = consumersCount;

        queues = new BlockingQueue[consumersCount];
        for (int i = 0; i < consumersCount; i++) {
            queues[i] = new LinkedBlockingQueue<>();
        }
    }

    public void push(Task<?> task) {
        int index = task.name.hashCode() % consumersCount;
        queues[index].add(task);
    }

    public Task<?> pop(int consumerId) throws InterruptedException {
        int index = consumerId % consumersCount;
        return queues[index].take();
    }

    private final static class Task<T> {
        private final String name;
        private final T data;

        private Task(String name, T data) {
            this.name = name;
            this.data = data;
        }
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.