如何用另一个void方法阻塞方法

问题描述 投票:0回答:2

我有一个listenner方法来处理Spring云流实现的消息,如下所示:

@StreamListener(value = MyInterface.INPUT)
public void handleMsg(@Payload Foo foo) {
    // if (concurrentHashMap.containsKey(foo.getId())) concurrentHashMap.remove(foo.getId());
}

这是我的第二种方法,应该被之前的方法阻止:

public Foo getFoo(long fooId) {
    // here I need block method with some mechanism until handleMsg remove received object from map and return this foo from there

    return fooFromStream;
}

我的目标是从服务类调用getFoo方法,如下所示:

// some logic

Foo foo = service.getFoo(fooId);

// some logic which required received foo;

我有想法将getFoo方法中的Foo包装到AsyncResult中,然后调用方法get关于导致块的Future结果,但我不知道如何将foo从流传递到方法getFoo

用例应该是这样的:

我调用方法getFoo将foo发送到消息代理并在map中注册foo,然后执行一些逻辑,然后在命令完成时我在StreamListenner中接收消息,从map中删除foo,接下来我需要从方法getFoo返回foo。

你能告诉我怎么做或解决它的最佳做法是什么?谢谢你的建议。

java spring spring-cloud spring-cloud-stream java-threads
2个回答
1
投票

目前还不完全清楚你要做什么,但是Map<Long, BlockingQueue<Foo>将允许你阻止take(或者,poll超时可能更好),直到听众offers Foo;然后删除地图条目。

请记住,一旦Foo被放入队列,记录将被激活,如果服务器崩溃,它将丢失。


0
投票

您可以使用并发Map of Long和Blocking队列阻塞队列:

ConcurrentMap<Long, BlockingQueue<Foo>> fooMap = new ConcurrentHashMap<>();
...
private BlockingQueue<Foo> getFooQueue(long fooId) {
    return fooMap.computeIfAbsent(fooId, l -> new ArrayBlockingQueue<>(1));
}
...
@StreamListener(value = MyInterface.INPUT)
public void handleMsg(@Payload Foo foo) {
    BlockingQueue<Foo> fq = getFooQueue(foo.getId());
    synchronized(fq) {
        fq.clear();
        fq.add(foo);
    }
}
...
public Foo getFoo(long fooId) throws InterruptedException {
    BlockingQueue<Foo> fq = getFooQueue(fooId);
    synchronized(fq) {
        return fq.take();
    }
}

这两个synchronized块只有在有可能你的handleMsg可以被多次调用时才需要,当一个当前可用的foo应该用新的foo覆盖。

© www.soinside.com 2019 - 2024. All rights reserved.