Java:实现多线程供应商/消费者管道,每种任务都有并行限制

问题描述 投票:2回答:2

我们需要异步处理不同类型的对象。使用API​​密钥处理每种类型的对象。

每个API密钥对并发使用都有自己的限制(对于一个API密钥,不超过5个并行会话)。

我们对工作线程数(CPU限制)有全局限制。

我们希望在工作线程限制内尽可能多地进行API调用。

可能的解决方案:

2 tasks with KEY1 (max 2 session) -\  total 3 workers
5 tasks with KEY2 (max 3 session) -/

是:

1. worker1: KEY2, worker2: KEY2, worker3: KEY2 (in queue: 2x KEY1, 2x KEY2)
2. worker1: KEY1, worker2: KEY2, worker3: KEY2 (in queue: 1x KEY1, 3x KEY2)
3. worker1: KEY1, worker2: KEY1, worker3: KEY2 (in queue: 4x KEY2)

可能的解决方案:

3 tasks with KEY1 (max 1 session) & 3 workers

是:

1. worker1: KEY1, worker2: IDLE, worker3: IDLE, (in queue 2x KEY1)

执行顺序并不重要(但我们希望先听到先到先出的政策),最大吞吐量是最重要的。

目前尚不清楚选择哪种实施策略。

任何队列的ThreadExecutor都不够,因为您需要知道ThreadExecutor当前使用的是哪些API密钥。

java threadpool producer-consumer
2个回答
1
投票

我不确定我的问题是否正确,但您需要的是每个API密钥的Semaphore

Semaphore key1Semaphore = new Semaphore(2);
Semaphore key2Semaphore = new Semaphore(3);

您可以检查key1Semaphore是否有许可证,并通过致电key1Semaphore.tryAcquire()获取一个。这是非阻塞的,因此如果失败并返回false,您可以尝试从其他API密钥获取信号量并从中提交任务。

重要的是,在任务结束时使用其中一个API密钥,信号量许可证将被释放回来。

您可能需要一个额外的对象来与wait()notify()同步,以便在任务完成时通知调度任务的主线程再次检查信号量。

基本上你得到的是你的任务调度员会向你的3个工人的ExecutorService提交5个任务,然后在其中一个信号量许可被释放之前它将无法再提交。

当任务完成并且许可证被释放时,调度程序会收到通知,因此它会从等待中解锁,并再次按顺序检查信号量并将任务提交给ExecutorService

这个解决方案有点偏向于第一个API密钥,但您可以通过检查每个密钥的任务长度并更公平地分配它们来进一步优化它。您甚至可以旋转索引,以便每个循环都将索引递增1,这样第一次从API KEY 1开始,第二次从API KEY 2开始等。


1
投票

我可能会创建一个服务维护

  • 包含任务和相应密钥的条目的单个Queue
  • 带钥匙的Map和该钥匙已经运行的线程(Map<String,AtomicInteger>),以及
  • 带有全局允许线程数的ThreadPoolExecutor

如果全局线程计数已满并且已提交任务,则将其放入队列末尾。

如果全局线程计数未满,则检查与请求密钥对应的映射值的密钥线程限制;如果到达,则将任务放回队列,否则提交给执行程序服务。

“提交到执行程序服务”不会直接提交任务,而是增加关键线程数,并将任务包装到Runnable中,另外1.减少映射中的关键线程数,然后触发重新评估队列如果适用,则提交新任务。

也可以在BlockingQueue中创建“每个键的活动计数”逻辑,该逻辑将作为first()返回包含未达到最大计数的键的任务的下一个元素,并将其作为管理队列传递给ThreadPoolExecutor构造函数;但我确信这会破坏队列合同而不是完全安全的使用。

© www.soinside.com 2019 - 2024. All rights reserved.