I have a situation where I want to process events based on a date-time property carried by each event.
My attempt so far does not work:
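class Data {
private Integer id;
private LocalDateTime sendAt;
// constructor used by the list below
Data(Integer id, LocalDateTime sendAt) { this.id = id; this.sendAt = sendAt; }
}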
List<Data> data = Arrays.asList(
new Data(1, LocalDateTime.now().plusSeconds(1)),
new Data(2, LocalDateTime.now().plusSeconds(2)),
new Data(3, LocalDateTime.now().plusSeconds(3)),
new Data(4, LocalDateTime.now().plusSeconds(5)),
new Data(5, LocalDateTime.now().plusSeconds(8)),
new Data(6, LocalDateTime.now().plusSeconds(13)),
new Data(7, LocalDateTime.now().plusSeconds(21)),
new Data(8, LocalDateTime.now().plusSeconds(34)),
new Data(9, LocalDateTime.now().plusSeconds(55)));
Flux<Data> dataFlux = Flux.fromIterable(data);
dataFlux.takeWhile(d -> d.sendAt.isAfter( LocalDateTime.now() ))
.subscribe(x -> System.out.println(x));
I want the events to be printed after 1, 2, 3, 5, 8, 13, 21, ... seconds respectively.
Is this possible with Spring WebFlux / Reactor?
This can be done by combining delayUntil with Mono.delay(<custom_time>), and streaming the result over HTTP with MediaType.APPLICATION_STREAM_JSON_VALUE, or with MediaType.TEXT_EVENT_STREAM_VALUE for SSE:
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
@RestController
public class FluxDateTime {
@GetMapping(value = "/time", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<Data> getData() {
List<Data> data = Arrays.asList(
new Data(1, 1),
new Data(2, 2),
new Data(3, 3),
new Data(4, 5),
new Data(5, 8),
new Data(6, 13),
new Data(7, 21),
new Data(8, 34),
new Data(9, 55));
return Flux.fromIterable(data)
.delayUntil(d -> Mono.delay(Duration.ofSeconds(d.getDelay())));
}
/* The empty constructor and the getters/setters
are there only for JSON serialization */
private static final class Data {
private int id;
private int delay;
public Data() {}
public Data(int id, int delay) {
this.id = id;
this.delay = delay;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public int getDelay() {
return delay;
}
public void setDelay(int delay) {
this.delay = delay;
}
}
}
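The Flux holds back each element until the Mono created for that element terminates. Each of these Monos is delayed by the number of seconds taken from the corresponding input element.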
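The question stores an absolute sendAt timestamp rather than a delay in seconds. A minimal sketch of how that timestamp could be turned into the Duration that Mono.delay expects is shown below. The class name SendAtDelay, the helper remaining and the getSendAt() getter mentioned in the comment are hypothetical names introduced for illustration; they do not appear in the original code.

```java
import java.time.Duration;
import java.time.LocalDateTime;

public class SendAtDelay {

    /**
     * Time left until sendAt, measured from now.
     * Returns Duration.ZERO when sendAt is already in the past,
     * so the result is never negative.
     */
    static Duration remaining(LocalDateTime now, LocalDateTime sendAt) {
        Duration d = Duration.between(now, sendAt);
        return d.isNegative() ? Duration.ZERO : d;
    }

    public static void main(String[] args) {
        LocalDateTime now = LocalDateTime.now();
        System.out.println(remaining(now, now.plusSeconds(5)));   // PT5S
        System.out.println(remaining(now, now.minusSeconds(5)));  // PT0S

        // Assumed wiring into the pipeline (needs reactor-core on the
        // classpath and a hypothetical getSendAt() getter on Data):
        // Flux.fromIterable(data)
        //     .delayUntil(d -> Mono.delay(remaining(LocalDateTime.now(), d.getSendAt())));
    }
}
```

Because the delay is computed from the current time when each element is handled, this variant should release every element at roughly its own sendAt instead of adding the delays on top of each other.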
As a result, we get a streaming HTTP response with the desired custom delays:
$ curl http://localhost:8080/time
{"id":1,"delay":1} # after 1 sec
{"id":2,"delay":2} # after 2 sec
{"id":3,"delay":3} # and so on
{"id":4,"delay":5}
{"id":5,"delay":8}
{"id":6,"delay":13}
{"id":7,"delay":21}
{"id":8,"delay":34}
{"id":9,"delay":55}
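One thing worth checking against your own requirements: as far as I know, Flux.delayUntil is built on concatMap and handles elements one at a time, so each delay starts only after the previous element has been emitted. Under that assumption the delays add up. The plain-Java sketch below (no Reactor involved, the class name CumulativeDelays is made up for illustration) only does the arithmetic for the delay values used above.

```java
import java.util.ArrayList;
import java.util.List;

public class CumulativeDelays {

    /**
     * Running sum of the per-element delays: the offset in seconds,
     * counted from subscription, at which each element would be emitted
     * if the delays are applied strictly one after another.
     */
    static List<Integer> emissionOffsets(int[] delays) {
        List<Integer> offsets = new ArrayList<>();
        int elapsed = 0;
        for (int delay : delays) {
            elapsed += delay;
            offsets.add(elapsed);
        }
        return offsets;
    }

    public static void main(String[] args) {
        int[] delays = {1, 2, 3, 5, 8, 13, 21, 34, 55};
        System.out.println(emissionOffsets(delays));
        // prints [1, 3, 6, 11, 19, 32, 53, 87, 142]
    }
}
```

If the elements should instead arrive at 1, 2, 3, 5, 8, ... seconds after the start, the delay for each element would have to be the difference to the previous target time, or be derived from an absolute timestamp.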