Streams fetched from Postgres with jOOQ do not return results from the class


Question

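I am trying to stream the results of a Postgres query to a frontend application rather than eagerly fetching all of them. The problem is that I can only see the streamed results in my terminal (first in "org.jooq.tools.LoggerListener : Record fetched: ...", then via stream.get().forEach(s -> debug)), while the class that references this stream yields only null values when it is used to build the ResponseEntity.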

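This data may also be used for other tasks (e.g. visualization, downloading/exporting, summary statistics). I have been going through the documentation and posts about jOOQ, which I am using as my ORM, and I am trying to use ResultQuery.stream().

Fetching with ResultQuery.fetchMaps() instead (the commented-out block in the QueryService class below) works perfectly well, but it returns everything in one giant ResponseEntity and does not stream the results.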


Current classes

DataController.java

@RestController
@RequestMapping(value = "/v3")
@Validated
public class DataController {

  @Autowired private QueryService queryService;

  @PostMapping(value = "/data", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
  @ApiOperation(value = "Query the data")
  @ResponseStatus(HttpStatus.CREATED)
  public ResponseEntity<QueryResult> getQueryResults(
      @RequestBody @ValidQuery Query query, HttpServletRequest request) {

    QueryResult res = queryService.search(query);
    return ResponseEntity.ok(res);
  }
// ...
}

QueryResult.java

public class QueryResult {

  private Stream<Record> result;

  public QueryResult(Stream<Record> result) {
    this.result = result;
  }

//  public List<Map<String, Object>> getResult() { return result; }
  @JsonProperty("result")
  public Stream<Record> getResult() { return result; }

//  public void setResult(List<Map<String, Object>> result) { this.result = result; }
  public void setResult(Stream<Record> result) { this.result = result; }

}

QueryService.java

@Service
public class QueryService implements SearchService{
  @Autowired DefaultDSLContext dslContext;

  public QueryResult search(Query query) {

    LinkedHashMap<DataSourceName, List<String>> selections = query.getSelections();

    // Build selected fields
    List<SelectField> selectFields = QueryUtils.getSelectionFields(selections);

    // Current support is for a single query. All others passed will be ignored
    List<Filter> filters = query.getFilters();
    Filter leadingFilter = QueryUtils.getLeadingFilter(filters);

    // Build "where" conditions
    Condition conditionClause = QueryUtils.getConditionClause(leadingFilter);

    // Get "from" statement
    Table<Record> fromClause = QueryUtils.getFromStatement(fromDataSource,query.getJoins());

    /*
    // Works fine, but is not lazy fetching
    List<Map<String, Object>> results =
        dslContext
            .select(selectFields)
            .from(fromClause)
            .where(conditionClause)
            .limit(query.getOffset(), query.getLimit())
            .fetchMaps();
    */

    // Appears to work only once.
    // Cannot see any results returned, but the number of records is correct.
    // Everything in the records is null / undefined in the frontend
    Supplier<Stream<Record>> results = () ->
        dslContext
            .select(selectFields)
            .from(fromClause)
            .where(conditionClause)
            .limit(query.getOffset(), query.getLimit())
            .fetchStream();

    // "stream has already been operated upon or closed" is returned when using a Supplier
    results.get().forEach(s -> logger.debug("Streamed record: \n" + String.valueOf(s)));

    return new QueryResult(results.get());

  }
}

Query.java

public class Query {
  @NotNull(message = "Query must contain selection(s)")
  private LinkedHashMap<DataSourceName, List<String>> selections;
  private List<Filter> filters;
  private List<Join> joins;
  private List<Sort> sorts;
  private long offset;
  private int limit;

  private QueryOptions options;

  @JsonProperty("selections")
  public LinkedHashMap<DataSourceName, List<String>> getSelections() {
    return selections;
  }

  public void setSelections(LinkedHashMap<DataSourceName, List<String>> selections) {
    this.selections = selections;
  }

  @JsonProperty("filters")
  public List<Filter> getFilters() {
    return filters;
  }

  public void setFilters(List<Filter> filters) {
    this.filters = filters;
  }

  @JsonProperty("joins")
  public List<Join> getJoins() {
    return joins;
  }

  public void setJoins(List<Join> joins) {
    this.joins = joins;
  }

  @JsonProperty("sorts")
  public List<Sort> getSorts() {
    return sorts;
  }

  public void setSorts(List<Sort> sorts) {
    this.sorts = sorts;
  }

  @JsonProperty("options")
  public QueryOptions getOptions() {
    return options;
  }

  public void setOptions(QueryOptions options) {
    this.options = options;
  }

  @JsonProperty("offset")
  public long getOffset() {
    return offset;
  }

  public void setOffset(long offset) {
    this.offset = offset;
  }

  @JsonProperty("limit")
  public int getLimit() {
    return limit;
  }

  public void setLimit(int limit) {
    this.limit = limit;
  }

  @Override
  public String toString() {
    return "Query{"
        + "selections=" + selections
        + ", filters="  + filters
        + ", sorts="    + sorts
        + ", offSet="   + offset
        + ", limit="    + limit
        + ", options="  + options
        + '}';
  }
}

DataApi.js

// ...
const dataApi = axios.create({baseURL: `${my_data_url}`,});
// ...
export default dataApi;

Stack:

  • Docker: 19.03.5
  • Spring Boot: v2.1.8.RELEASE
  • Node: v12.13.1
  • React: 16.9.0
  • OpenJDK: 12.0.2
  • jOOQ: 3.12.3
  • postgres: 10.7
java spring postgresql stream jooq
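Data.jsx

// ...
// This block queries Spring, and it returns the ResponseEntity with the ResultSet
// Streaming returns the right number of records, but every record is null / undefined
try {
  const response = await dataApi.post('/v3/data', query);
} catch (error) {
  // ...
}
// ...

Result returned in the console

{data: {…}, status: 200, statusText: "OK", headers: {…}, config: {…}, …}
  data:
    result: Array(100)
      0: {}
      1: {}
      2: {}
      3: {}
      ...

1 Answer

0 votes

The whole point of the Java Stream API is that such a stream can be consumed at most once. It has no buffering feature, nor does it support a push-based streaming model the way reactive stream implementations do.

You could add another API to your stack, such as Reactor (there are others, but since you are already using Spring ...), which supports buffering and replaying streams to multiple consumers. That has nothing to do with jOOQ, though, and it will heavily influence your application's architecture.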
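To make the single-use behaviour concrete, here is a minimal sketch that uses only the JDK. It assumes a hypothetical fetchRows() method standing in for the jOOQ fetchStream() call from the question; the class name, the row data and the counter are illustrative and not part of the original code. It shows three things: a second terminal operation on the same stream fails, a Supplier that wraps the call runs the "query" once per get(), and buffering into a List is one simple way to serve several consumers, at the cost of giving up lazy fetching.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class SingleUseStreamDemo {

    // Counts how often the hypothetical "query" below is executed.
    static final AtomicInteger executions = new AtomicInteger();

    // Hypothetical stand-in for dslContext...fetchStream(): every call starts a new stream.
    static Stream<Map<String, Object>> fetchRows() {
        executions.incrementAndGet();
        return Stream.of(
                Map.<String, Object>of("id", 1, "name", "a"),
                Map.<String, Object>of("id", 2, "name", "b"));
    }

    // Returns true if a second terminal operation on the same stream is rejected.
    static boolean secondUseFails() {
        Stream<Map<String, Object>> rows = fetchRows();
        rows.forEach(r -> { });          // first terminal operation consumes the stream
        try {
            rows.forEach(r -> { });      // the same stream cannot be traversed again
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    // A Supplier avoids the exception, but only because each get() runs the query again.
    static int executionsViaSupplier() {
        executions.set(0);
        Supplier<Stream<Map<String, Object>>> results = SingleUseStreamDemo::fetchRows;
        results.get().forEach(r -> { }); // e.g. debug logging
        results.get().forEach(r -> { }); // e.g. building the response
        return executions.get();
    }

    // Buffers the rows once so that several consumers can read the same data.
    static List<Map<String, Object>> bufferOnce() {
        try (Stream<Map<String, Object>> rows = fetchRows()) {
            return rows.collect(Collectors.toList());
        }
    }

    public static void main(String[] args) {
        System.out.println("second use fails: " + secondUseFails());
        System.out.println("executions via Supplier: " + executionsViaSupplier());
        List<Map<String, Object>> buffered = bufferOnce();
        buffered.forEach(r -> System.out.println("logged: " + r));
        System.out.println("rows available for the response: " + buffered.size());
    }
}
```

Whether buffering is acceptable depends on the result size; for large results a reactive library, as suggested above, is the more natural fit.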

Note that jOOQ's ResultQuery extends the reactive-streams Publisher and JDK 9's Flow.Publisher, for better interoperability with such reactive streams.
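As a rough illustration of the push-based model that a Flow.Publisher offers, the following sketch uses only the JDK's SubmissionPublisher, which is one Flow.Publisher implementation. It does not use jOOQ or Reactor; the class name, method name and the string rows are assumptions made for the example. Two consumers subscribe before any item is submitted, and each of them receives every item.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;

public class FlowPublisherDemo {

    // Pushes every row to two independent consumers and returns what each one received.
    static List<List<String>> publishToTwoConsumers(List<String> rows) {
        List<String> logged = new CopyOnWriteArrayList<>();
        List<String> exported = new CopyOnWriteArrayList<>();
        CompletableFuture<Void> logDone;
        CompletableFuture<Void> exportDone;

        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            // consume() subscribes a consumer and returns a future that
            // completes once the publisher signals onComplete.
            logDone = publisher.consume(logged::add);
            exportDone = publisher.consume(exported::add);

            // Each submitted item is delivered to all current subscribers.
            rows.forEach(publisher::submit);
        } // close() signals onComplete to the subscribers

        // Delivery is asynchronous, so wait until both consumers have finished.
        logDone.join();
        exportDone.join();
        return List.of(logged, exported);
    }

    public static void main(String[] args) {
        List<List<String>> received = publishToTwoConsumers(List.of("row-1", "row-2", "row-3"));
        System.out.println("logger received:   " + received.get(0));
        System.out.println("exporter received: " + received.get(1));
    }
}
```

Unlike a java.util.stream.Stream, the publisher here drives delivery, which is why more than one subscriber can observe the same items.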
