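I am trying to stream the results of a postgres query to a frontend application instead of eagerly fetching all of them. The problem is that I can only see the streamed results in my terminal (first in "org.jooq.tools.LoggerListener : Record fetched: ..." and then through stream.get().forEach(s -> debug)), while the class that references this stream yields only null values when the results are viewed in the frontend.
This data may also be used for other tasks (e.g. visualization, downloading/exporting, summary statistics, etc.). I have been going through the documentation and posts about jOOQ, which I am using as my ORM, and have tried the following methods:
ResultQuery.stream()
ResultQuery.fetchStream()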
The following currently works perfectly well, but it returns everything in one giant ResultSet and does not stream the results:
ResultQuery.fetchMaps()
DataController.java
@RestController
@RequestMapping(value = "/v3")
@Validated
public class DataController {
    @Autowired private QueryService queryService;

    @PostMapping(value = "/data", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
    @ApiOperation(value = "Query the data")
    @ResponseStatus(HttpStatus.CREATED)
    public ResponseEntity<QueryResult> getQueryResults(
            @RequestBody @ValidQuery Query query, HttpServletRequest request) {
        QueryResult res = queryService.search(query);
        return ResponseEntity.ok(res);
    }
    // ...
}
QueryResult.java
public class QueryResult {
    private Stream<Record> result;

    public QueryResult(Stream<Record> result) {
        this.result = result;
    }

    // public List<Map<String, Object>> getResult() { return result; }
    @JsonProperty("result")
    public Stream<Record> getResult() { return result; }

    // public void setResult(List<Map<String, Object>> result) { this.result = result; }
    public void setResult(Stream<Record> result) { this.result = result; }
}
QueryService.java
@Service
public class QueryService implements SearchService {
    @Autowired DefaultDSLContext dslContext;

    public QueryResult search(Query query) {
        LinkedHashMap<DataSourceName, List<String>> selections = query.getSelections();
        // Build selected fields
        List<SelectField> selectFields = QueryUtils.getSelectionFields(selections);
        // Current support is for a single query. All others passed will be ignored
        List<Filter> filters = query.getFilters();
        Filter leadingFilter = QueryUtils.getLeadingFilter(filters);
        // Build "where" conditions
        Condition conditionClause = QueryUtils.getConditionClause(leadingFilter);
        // Get "from" statement
        Table<Record> fromClause = QueryUtils.getFromStatement(fromDataSource, query.getJoins());
        /*
        // Works fine, but is not lazy fetching
        List<Map<String, Object>> results =
            dslContext
                .select(selectFields)
                .from(fromClause)
                .where(conditionClause)
                .limit(query.getOffset(), query.getLimit())
                .fetchMaps();
        */
        // Appears to work only once.
        // Cannot see any results returned, but the number of records is correct.
        // Everything in the records is null / undefined in the frontend
        Supplier<Stream<Record>> results = () ->
            dslContext
                .select(selectFields)
                .from(fromClause)
                .where(conditionClause)
                .limit(query.getOffset(), query.getLimit())
                .fetchStream();
        // "stream has already been operated upon or closed" is returned when using a Supplier
        results.get().forEach(s -> logger.debug("Streamed record: \n" + String.valueOf(s)));
        return new QueryResult(results.get());
    }
}
Query.java
public class Query {
    @NotNull(message = "Query must contain selection(s)")
    private LinkedHashMap<DataSourceName, List<String>> selections;
    private List<Filter> filters;
    private List<Join> joins;
    private List<Sort> sorts;
    private long offset;
    private int limit;
    private QueryOptions options;

    @JsonProperty("selections")
    public LinkedHashMap<DataSourceName, List<String>> getSelections() {
        return selections;
    }
    public void setSelections(LinkedHashMap<DataSourceName, List<String>> selections) {
        this.selections = selections;
    }

    @JsonProperty("filters")
    public List<Filter> getFilters() {
        return filters;
    }
    public void setFilters(List<Filter> filters) {
        this.filters = filters;
    }

    @JsonProperty("joins")
    public List<Join> getJoins() {
        return joins;
    }
    public void setJoins(List<Join> joins) {
        this.joins = joins;
    }

    @JsonProperty("sorts")
    public List<Sort> getSorts() {
        return sorts;
    }
    public void setSorts(List<Sort> sorts) {
        this.sorts = sorts;
    }

    @JsonProperty("options")
    public QueryOptions getOptions() {
        return options;
    }
    public void setOptions(QueryOptions options) {
        this.options = options;
    }

    @JsonProperty("offset")
    public long getOffset() {
        return offset;
    }
    public void setOffset(long offset) {
        this.offset = offset;
    }

    @JsonProperty("limit")
    public int getLimit() {
        return limit;
    }
    public void setLimit(int limit) {
        this.limit = limit;
    }

    @Override
    public String toString() {
        return "Query{"
            + "selections=" + selections
            + ", filters=" + filters
            + ", sorts=" + sorts
            + ", offSet=" + offset
            + ", limit=" + limit
            + ", options=" + options
            + '}';
    }
}
DataApi.js
// ...
const dataApi = axios.create({baseURL: `${my_data_url}`,});
// ...
export default dataApi;
Data.jsx
// ...
// This block queries Spring, and it returns the ResponseEntity with the ResultSet
// Streaming returns the right number of records, but every record is null / undefined
try {
  const response = await dataApi.post('/v3/data', query);
} catch (error) {
  // ...
}
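Returned result in console
{data: {…}, status: 200, statusText: "OK", headers: {…}, config: {…}, …}
data:
  result: Array(100)
    0: {}
    1: {}
    2: {}
    3: {}
    ...

The whole point of the Java Stream API is that such a stream can be consumed at most once. It has no buffering feature, nor does it support a push-based streaming model the way reactive stream implementations do.
You could add another API to your stack, such as Reactor (there are others, but since you are already using Spring ...), which supports buffering streams and replaying them to several consumers, but that has nothing to do with jOOQ and will heavily influence your application's architecture.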
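To make the "at most once" point concrete, here is a minimal JDK-only sketch. It does not use jOOQ or Spring; the class name `SingleUseStreamDemo` and both helper methods are hypothetical, and a plain `Stream<String>` stands in for the stream of records. It shows that a second terminal operation on the same `Stream` instance fails with an `IllegalStateException`, and that one way to both log and return the data is to collect the stream once into a list, which of course gives up lazy fetching.

```java
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class SingleUseStreamDemo {

    // Runs two terminal operations on the same Stream instance and reports the outcome.
    static String consumeTwice(Stream<String> stream) {
        stream.forEach(s -> { });       // first terminal operation succeeds
        try {
            stream.forEach(s -> { });   // second one on the same instance is rejected
            return "no error";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    // Consumes the stream exactly once and keeps the rows in memory,
    // so they can be logged and returned without touching the stream again.
    static List<String> materialize(Supplier<Stream<String>> source) {
        try (Stream<String> stream = source.get()) {
            return stream.collect(Collectors.toList());
        }
    }

    public static void main(String[] args) {
        System.out.println(consumeTwice(Stream.of("a", "b", "c")));

        List<String> rows = materialize(() -> Stream.of("a", "b", "c"));
        rows.forEach(row -> System.out.println("Logged: " + row));
        System.out.println("Rows available to return: " + rows.size());
    }
}
```

Note that a `Supplier` which builds a new stream on every `get()` does not make a stream reusable; it simply runs the source again each time, which for a database query would presumably mean executing the query twice.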
Note that jOOQ's ResultQuery extends org.reactivestreams.Publisher and JDK 9's Flow.Publisher for better interoperability with such reactive streams.
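For contrast with a pull-based `Stream`, the following is a rough sketch of the push-based model using only the JDK's `java.util.concurrent.Flow` API and `SubmissionPublisher`. It is not Reactor and not jOOQ; `FlowPublisherDemo`, `CollectingSubscriber` and `publishToTwo` are hypothetical names, and the rows are plain strings. It illustrates one publisher pushing the same items to two subscribers, with items buffered per subscriber. Unlike the replay support mentioned for Reactor, `SubmissionPublisher` does not replay earlier items to subscribers that join late.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class FlowPublisherDemo {

    // Collects every item pushed to it and signals termination through a latch.
    static final class CollectingSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);

        @Override public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(Long.MAX_VALUE); // unbounded demand: no backpressure in this sketch
        }
        @Override public void onNext(String item) { received.add(item); }
        @Override public void onError(Throwable error) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }
    }

    // Waits (with a timeout) until the subscriber has seen onComplete or onError.
    private static void awaitDone(CollectingSubscriber subscriber) {
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Pushes the same rows to two subscribers and returns what each one received.
    static List<List<String>> publishToTwo(List<String> rows) {
        CollectingSubscriber first = new CollectingSubscriber();
        CollectingSubscriber second = new CollectingSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(first);
            publisher.subscribe(second);
            rows.forEach(publisher::submit); // each item is delivered to every current subscriber
        } // close() completes the subscribers once their pending items are delivered
        awaitDone(first);
        awaitDone(second);
        return List.of(List.copyOf(first.received), List.copyOf(second.received));
    }

    public static void main(String[] args) {
        System.out.println(publishToTwo(List.of("row-1", "row-2", "row-3")));
    }
}
```

Both subscribers must be registered before the first `submit` call here, which is exactly the limitation that a replaying operator in a library such as Reactor is meant to remove.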