供应商绑定不适用于Spring Cloud Stream Rabbit

问题描述 投票:0回答:1

我们有如下资源,我们正在使用春季云流兔活页夹3.0.1.RELEASE。

@Component
public class Handlers {

  private EmitterProcessor<String> sourceGenerator = EmitterProcessor.create();

  public void emitData(String str){
    sourceGenerator.onNext(str);
  }

  @Bean
  public Supplier<Flux<String>> generate() {
    return () -> sourceGenerator;
  }

}

application.yml

spring:
  profiles: dev
  cloud:
    stream:
      function:
        definition: generate
        bindings:
          generate-out-0: source1

        bindingServiceProperties:
          defaultBinder: local_rabbit

      binders:
        local_rabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
                virtual-host: / 

虽然正在调用emitData方法,但在RabbitMQ队列中没有看到数据。我们还观察到消费者约束力正在起作用。我们通过RabbitMQ Admin直接将消息发送到消费者链接队列中进行了检查。但是供应商绑定无效。

[此外,我们观察到没有SupplierFlux在相同的application.yml配置下也可以正常工作。我们在这里缺少任何配置吗?

即使具有TestChannelBinderConfiguration的测试用例也可以正常工作,如下所示。

@Slf4j
@TestPropertySource(
        properties = {"spring.cloud.function.definition = generate"}
)
public class HandlersTest extends AbstractTest {
  @Autowired
  private OutputDestination outputDestination;

  @Test
  public void testGeneratorAndProcessor() {
      final String testStr = "test"; 
      handlers.emitData(testStr);

      Object eventObj;
      final Message<byte[]> message = outputDestination.receive(1000);

      assertNotNull(message, "processing timeout");
      eventObj = message.getPayload();

      assertEquals(new String((byte[]) eventObj), testStr);
  }
}
spring-cloud-stream spring-rabbitmq
1个回答
0
投票

当您说we are not seeing data in RabbitMQ queue. . .时。您在说哪个队列?使用AMQP时,消息将发送到exchanges,如果这种交换未绑定到任何queue,则会丢弃该消息,因此是我的问题。您是否实际上将generate-out-0交换绑定到队列?

无论如何,我只是对其进行了测试,并且一切都按预期进行。这是完整的代码。

@SpringBootApplication
public class SimpleStreamApplication {

    public static void main(String[] args) throws Exception {
        ApplicationContext context = SpringApplication.run(SimpleStreamApplication.class);
        SimpleStreamApplication app = context.getBean(SimpleStreamApplication.class);
        app.emitData("Hello");
    }

    private EmitterProcessor<String> sourceGenerator = EmitterProcessor.create();

    public void emitData(String str) {
        sourceGenerator.onNext(str);
    }

    @Bean
    public Supplier<Flux<String>> generate() {
        return () -> sourceGenerator;
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.