Flux: emitting elements based on a date-time attribute

Question (votes: 1, answers: 1)

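I have a situation where I want to process events according to a date-time attribute carried by each event.

My attempt so far, which does not work: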

class Data {
    private Integer id;
    private LocalDateTime sendAt;
}

List<Data> data = Arrays.asList(
        new Data(1, LocalDateTime.now().plusSeconds(1)),
        new Data(2, LocalDateTime.now().plusSeconds(2)),
        new Data(3, LocalDateTime.now().plusSeconds(3)),
        new Data(4, LocalDateTime.now().plusSeconds(5)),
        new Data(5, LocalDateTime.now().plusSeconds(8)),
        new Data(6, LocalDateTime.now().plusSeconds(13)),
        new Data(7, LocalDateTime.now().plusSeconds(21)),
        new Data(8, LocalDateTime.now().plusSeconds(34)),
        new Data(9, LocalDateTime.now().plusSeconds(55)));

Flux<Data> dataFlux = Flux.fromIterable(data);

dataFlux.takeWhile(d -> d.sendAt.isAfter( LocalDateTime.now() ))
        .subscribe(x -> System.out.println(x));

I want the events to be printed after 1, 2, 3, 5, 8, 13, 21, ... seconds respectively.

Is this possible with Spring WebFlux / Reactor?

java spring-webflux reactor
1 answer
0 votes

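This can be done by combining delayUntil with Mono.delay(<custom_time>), and by streaming the HTTP response either as JSON with MediaType.APPLICATION_STREAM_JSON_VALUE (deprecated since Spring Framework 5.3 in favor of MediaType.APPLICATION_NDJSON_VALUE) or as SSE with MediaType.TEXT_EVENT_STREAM_VALUE: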

import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;

@RestController
public class FluxDateTime {
    @GetMapping(value = "/time", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
    public Flux<Data> getData() {
        List<Data> data = Arrays.asList(
                new Data(1, 1),
                new Data(2, 2),
                new Data(3, 3),
                new Data(4, 5),
                new Data(5, 8),
                new Data(6, 13),
                new Data(7, 21),
                new Data(8, 34),
                new Data(9, 55));

        return Flux.fromIterable(data)
                .delayUntil(d -> Mono.delay(Duration.ofSeconds(d.getDelay())));
    }

    /* The no-arg constructor and the getters/setters exist
       only for JSON serialization. */
    private static final class Data {
        private int id;
        private int delay;

        public Data() {}

        public Data(int id, int delay) {
            this.id = id;
            this.delay = delay;
        }

        public int getId() {
            return id;
        }

        public void setId(int id) {
            this.id = id;
        }

        public int getDelay() {
            return delay;
        }

        public void setDelay(int delay) {
            this.delay = delay;
        }
    }
}

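The Flux holds back each element until the Mono created for that element terminates. Each Mono delays for the number of seconds given by the element's delay field. Elements are handled one at a time, so each delay starts counting once the previous element has been emitted.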
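The question stores an absolute sendAt timestamp rather than a delay in seconds, so that timestamp has to be turned into a Duration before it can be handed to Mono.delay. Below is a minimal plain-Java sketch of that conversion. The class name SendAtDelay and the method untilSendAt are hypothetical helpers, not part of Reactor or of the original answer, and clamping past timestamps to zero is an assumption about the desired behavior.

```java
import java.time.Duration;
import java.time.LocalDateTime;

public class SendAtDelay {

    /**
     * Returns the time remaining from 'now' until 'sendAt'.
     * A sendAt that is already in the past yields Duration.ZERO
     * (assumption: overdue events should be sent immediately).
     */
    public static Duration untilSendAt(LocalDateTime now, LocalDateTime sendAt) {
        Duration remaining = Duration.between(now, sendAt);
        return remaining.isNegative() ? Duration.ZERO : remaining;
    }

    public static void main(String[] args) {
        // Fixed reference time so the result does not depend on the clock
        LocalDateTime now = LocalDateTime.of(2024, 1, 1, 12, 0, 0);
        System.out.println(untilSendAt(now, now.plusSeconds(5)));  // PT5S
        System.out.println(untilSendAt(now, now.minusSeconds(5))); // PT0S
    }
}
```

One possible way to use it in the pipeline would be delayUntil(d -> Mono.delay(SendAtDelay.untilSendAt(LocalDateTime.now(), d.getSendAt()))), where getSendAt is an assumed getter on the question's Data class. Because the remaining time is recomputed when each element is processed, every element should then be released at roughly its own sendAt instant.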

As a result, we get a streaming HTTP response with the desired custom delays:

$ curl http://localhost:8080/time
{"id":1,"delay":1} # after 1 sec
{"id":2,"delay":2} # 2 sec after the previous line
{"id":3,"delay":3} # and so on
{"id":4,"delay":5}
{"id":5,"delay":8}
{"id":6,"delay":13}
{"id":7,"delay":21}
{"id":8,"delay":34}
{"id":9,"delay":55}
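Assuming delayUntil handles the elements strictly one after another, the moment each line arrives, measured from the start of the request, is the running sum of the delay values rather than the delay value itself. The plain-Java sketch below only does that arithmetic; the class name CumulativeDelays and the method arrivalSeconds are hypothetical and do not call Reactor.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CumulativeDelays {

    /** Running sum of per-element delays, i.e. seconds elapsed when each element arrives. */
    public static List<Integer> arrivalSeconds(List<Integer> delays) {
        List<Integer> arrivals = new ArrayList<>();
        int elapsed = 0;
        for (int delay : delays) {
            elapsed += delay;      // each delay starts after the previous element
            arrivals.add(elapsed);
        }
        return arrivals;
    }

    public static void main(String[] args) {
        List<Integer> delays = Arrays.asList(1, 2, 3, 5, 8, 13, 21, 34, 55);
        System.out.println(arrivalSeconds(delays));
        // [1, 3, 6, 11, 19, 32, 53, 87, 142]
    }
}
```

If the elements must instead appear at exactly 1, 2, 3, 5, 8, ... seconds after the start, the delay for each element would need to be derived from an absolute target time instead of a fixed per-element value.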