我有以下代码。我想要的结果是,当创建新记录时,客户端将自动使用新信息进行更新。而是,下面的代码仅从findAll
获取记录。如何更改代码,以便客户端自动更新?
@RestController
public class CommentController
{
//...
@GetMapping(path = "/comment/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Comment> comments()
{
return commentRepository.findAll()
.publishOn(Schedulers.parallel())
.log()
.delayElements(Duration.ofMillis(100));
}
}
要保存我使用的评论(效果很好):
@PostMapping("/comment/save")
@ResponseBody
public Mono<Comment> save(@RequestBody Comment comment)
{
return this.commentRepository.save(comment);
}
有了(效果很好):
curl -X POST localhost:8080/comment/save -H 'Content-type:application/json' -d '{"author":"author","message":"Message","timestamp":"Timestamp"}'
客户端javascript(似乎也可以正常工作::
function loadComments () {
this.source = null;
this.start = function () {
var commentTable = document.getElementById("comments");
this.source = new EventSource("/comment/stream");
this.source.addEventListener("message", function (event) {
// These events are JSON, so parsing and DOM fiddling are needed
var comment = JSON.parse(event.data);
var row = commentTable.getElementsByTagName("tbody")[0].insertRow(0);
var cell0 = row.insertCell(0);
var cell1 = row.insertCell(1);
var cell2 = row.insertCell(2);
cell0.className = "author-style";
cell0.innerHTML = comment.author;
cell1.className = "text";
cell1.innerHTML = comment.message;
cell2.className = "date";
cell2.innerHTML = comment.timestamp;
});
this.source.onerror = function () {
this.close();
};
};
this.stop = function() {
this.source.close();
}
}
comment = new loadComments();
/*
* Register callbacks for starting and stopping the SSE controller.
*/
window.onload = function() {
comment.start();
};
window.onbeforeunload = function() {
comment.stop();
}
要查看完整的代码:https://github.com/douma/reactor-sse
我创建了一个解决方案:
@GetMapping(path = "/comment/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Object> comments()
{
return Flux.create(fluxSink -> {
Flux.interval(Duration.ofSeconds(1))
.map(s -> {
this.commentRepository.findByTimestampGreaterThanOrderByTimestampDesc(lastChecked).take(5).doOnEach(new Consumer<Signal<Comment>>() {
@Override
public void accept(Signal<Comment> commentSignal) {
fluxSink.next(commentSignal.get());
}
}).subscribe();
lastChecked = Calendar.getInstance().getTime();
lastChecked.setTime(lastChecked.getTime() - 5000);
return s;
})
.subscribe();
})
.publishOn(Schedulers.parallel())
.onBackpressureBuffer();
}