我们有如下资源,我们正在使用春季云流兔活页夹3.0.1.RELEASE。
@Component
public class Handlers {
private EmitterProcessor<String> sourceGenerator = EmitterProcessor.create();
public void emitData(String str){
sourceGenerator.onNext(str);
}
@Bean
public Supplier<Flux<String>> generate() {
return () -> sourceGenerator;
}
}
application.yml
spring:
profiles: dev
cloud:
stream:
function:
definition: generate
bindings:
generate-out-0: source1
bindingServiceProperties:
defaultBinder: local_rabbit
binders:
local_rabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
虽然正在调用emitData
方法,但在RabbitMQ队列中没有看到数据。我们还观察到消费者约束力正在起作用。我们通过RabbitMQ Admin直接将消息发送到消费者链接队列中进行了检查。但是供应商绑定无效。
[此外,我们观察到没有Supplier
的Flux
在相同的application.yml
配置下也可以正常工作。我们在这里缺少任何配置吗?
即使具有TestChannelBinderConfiguration的测试用例也可以正常工作,如下所示。
@Slf4j
@TestPropertySource(
properties = {"spring.cloud.function.definition = generate"}
)
public class HandlersTest extends AbstractTest {
@Autowired
private OutputDestination outputDestination;
@Test
public void testGeneratorAndProcessor() {
final String testStr = "test";
handlers.emitData(testStr);
Object eventObj;
final Message<byte[]> message = outputDestination.receive(1000);
assertNotNull(message, "processing timeout");
eventObj = message.getPayload();
assertEquals(new String((byte[]) eventObj), testStr);
}
}
当您说we are not seeing data in RabbitMQ queue. . .
时。您在说哪个队列?使用AMQP时,消息将发送到exchanges
,如果这种交换未绑定到任何queue
,则会丢弃该消息,因此是我的问题。您是否实际上将generate-out-0
交换绑定到队列?
无论如何,我只是对其进行了测试,并且一切都按预期进行。这是完整的代码。
@SpringBootApplication
public class SimpleStreamApplication {
public static void main(String[] args) throws Exception {
ApplicationContext context = SpringApplication.run(SimpleStreamApplication.class);
SimpleStreamApplication app = context.getBean(SimpleStreamApplication.class);
app.emitData("Hello");
}
private EmitterProcessor<String> sourceGenerator = EmitterProcessor.create();
public void emitData(String str) {
sourceGenerator.onNext(str);
}
@Bean
public Supplier<Flux<String>> generate() {
return () -> sourceGenerator;
}
}