我正在尝试制作几个反应式微服务: 一位制作人:
@RestController
@RequiredArgsConstructor
public class EventController {
private final Sinks.Many<Event> sink;
@PostMapping()
public void publish(@RequestParam Event event) {
Sinks.EmitResult emitResult = sink.tryEmitNext(event);
if (emitResult.isFailure()) {
throw new RuntimeException("Error while publishing event");
} else System.out.println("Received: " + event);
}
@GetMapping()
public Mono<Event> getEvent() {
return sink.asFlux().next();
}
}
我将 Sink 配置为
@Bean
public Sinks.Many<Event> sink() {
return Sinks.many().multicast().onBackpressureBuffer();
}
我使用事件枚举作为传递的值。在服务生产者中,我试图创建两个端点:
和多个消费者:
@EnableScheduling
@RequiredArgsConstructor
@SpringBootApplication
public class ReactiveEventsSubscriberApplication {
private final WebClient webClient;
public static void main(String[] args) {
SpringApplication.run(ReactiveEventsSubscriberApplication.class, args);
}
@Scheduled(fixedDelay = 3, timeUnit = TimeUnit.SECONDS)
public void subscribe() {
Mono<Event> eventMono = webClient.get()
.retrieve()
.bodyToMono(Event.class);
eventMono.subscribe(System.out::println);
}
}
我希望服务消费者收到最新的事件。 当我尝试创建事件时,没有任何反应,就好像水槽中根本没有对象一样。我做错了什么?
您正在尝试在微服务中实现发布-订阅模式,但是, 在您的消费者服务中,您使用 webClient.get() 来检索事件。这将执行 HTTP GET 请求,该请求可能不适合从生产者服务接收事件。 我认为,消费者应该订阅接收器来接收生产者推送的事件。
你的消费者应该看起来像这样;
@Service
@RequiredArgsConstructor
public class EventSubscriberService {
private final Sinks.Many<Event> sink;
@PostConstruct
public void subscribeToEvents() {
sink.asFlux().subscribe(event -> {
System.out.println("Received event: " + event);
// Process the event as needed
});
}
)
另外,在你的生产者控制器中,我想说你应该使用 @RequestBody 而不是使用 @RequestParam,然后在调用 API 时将事件属性作为请求正文的一部分传递