我们正在通过在 Quarkus 中使用 io.vertx.mutiny 实现它来构建一个反应式系统。 这时测试了一个单值,确认返回。但是,预期返回多条记录的变量不会完成处理,最终会被取消。我该如何处理?
如果您知道解决方案,可以享受它,我将不胜感激。
导入块
import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; import io.vertx.mutiny.pgclient.PgPool; import io.vertx.mutiny.sqlclient.Row; import io.vertx.mutiny.sqlclient.RowSet; import io.vertx.mutiny.sqlclient.SqlClient; import io.vertx.mutiny.sqlclient.Tuple;`
private final PgPool client;
成功法
private final String FETCH_BY_ID = "SELECT p.*, c.name as category_name FROM products p INNER JOIN categories c ON p.category_id = c.id WHERE p.id = $1";
@Override
public Uni<Product> searchProduct(UUID productId) {
return this.executeFetchById(productId, this.client);
}
private Uni<Product> executeFetchById(UUID productId, SqlClient client) {
return client
.preparedQuery(FETCH_BY_ID)
.execute(Tuple.of(productId))
.onItem().transform(RowSet::iterator)
.onItem().transform(iterator -> iterator.hasNext() ? toProduct(iterator.next()) : null)
.onItem().transform(entity -> ObjectUtils.isNotEmpty(entity) ? productTranslator.toDomain(entity) : null);
}
失败
@Override
public Multi<Product> searchProducts(String sortKey, String order, Integer limit, Integer offset) {
final String query = generateSearchProductsQuery(sortKey, order);
return client
.preparedQuery(query)
.execute(selectTuple(limit, offset))
.onItem().transformToMulti(rows -> Multi.createFrom().iterable(rows))
.onItem().transform(this::toProduct)
.onItem().transform(entity -> ObjectUtils.isNotEmpty(entity) ? productTranslator.toDomain(entity) : null);
}
private Tuple selectTuple(Integer limit, Integer offset) {
return Tuple.wrap(Arrays.asList(
Objects.isNull(limit) ? 10 : limit,
Objects.isNull(offset) ? 0 : offset));
}
private String generateSearchProductsQuery(String sortKey, String order) {
String baseQuery = "SELECT p.*, c.name as category_name FROM products p INNER JOIN categories c ON p.category_id = c.id ORDER BY %s %s LIMIT $1 OFFSET $2";
String finalSortKey = Objects.isNull(sortKey) ? "p.id" : sortKey;
String finalOrder = Objects.isNull(order) ? "ASC" : order;
return String.format(baseQuery, finalSortKey, finalOrder);
}
return client
.preparedQuery(query)
.execute(selectTuple(limit, offset))
.onItem().transformToMulti(rows -> Multi.createFrom().iterable(rows))
.onItem().transform(this::toProduct)
.log("TEST A")
.onItem().transform(entity -> ObjectUtils.isNotEmpty(entity) ? productTranslator.toDomain(entity) : null);
显示“TEST A”,但该过程从未完成。
如果你在某个地方有一个完整的复制器项目会更容易,任何错误日志/堆栈跟踪也会有所帮助。
话虽如此,我看到了一些非常可疑的东西:
.onItem().transform(entity -> ObjectUtils.isNotEmpty(entity) ? productTranslator.toDomain(entity) : null)
这可能会在
null
流中插入 Multi
值,而这在 Multi
实现的 Reactive Streams规范中是不允许的。 当
transform
处理程序返回 null
时,流就会出错。
如下所示,只单独做OrderBy就可以处理,但原因不明。抱歉给您带来不便。
@Override
public Multi<Product> searchProducts(String sortKey, String order, Integer limit, Integer offset) {
final String query = QueryHelper.setOrderInQuery(ProductCommand.FECTH, order);
return client
.preparedQuery(query)
.execute(selectTuple(sortKey, limit, offset))
.onItem().transformToMulti(rows -> Multi.createFrom().iterable(rows))
.onItem().transform(this::toProduct)
.onItem().transform(entity -> ObjectUtils.isNotEmpty(entity) ? productTranslator.toDomain(entity) : null);
}
private Tuple selectTuple(String key, Integer limit, Integer offset) {
return Tuple.wrap(Arrays.asList(
Objects.isNull(key) ? " p.id" : key,
Objects.isNull(limit) ? 10 : limit,
Objects.isNull(offset) ? 0 : offset));