I am using
implementation ('io.confluent.ksql:ksqldb-api-client:7.3.1')
and I cannot get a connection-lost exception to be thrown in the code below:
@Override
public <T> Flux<T> select(Query query, Class<T> entityClass) throws DataAccessException {
    String tableName = Objects.requireNonNull(mappingContext.getPersistentEntity(entityClass)).getTableName().toString();
    logger.info("Executing ksqlDB push query: SELECT * FROM" + tableName + " EMIT CHANGES;");
    return Flux.create((FluxSink<io.confluent.ksql.api.client.Row> sink) ->
                    client.streamQuery("select * from " + tableName + " " + emitChangesString(entityClass) + ";")
                            .thenAccept(streamedQueryResult -> streamedQueryResult.subscribe(RowSubscriber.fromSink(sink)))
            )
            .flatMap(row -> {
                try {
                    return Flux.just(objectMapper.readValue(row.asObject().toJsonString(), entityClass));
                } catch (JsonProcessingException e) {
                    logger.error("ksqlDB Exception occurred while object mapping record from table:" + e);
                    return Flux.empty();
                }
            })
            .doOnError(throwable -> logger.error("Exception occurred during ksqlDB push query processing"));
}
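I expected .doOnError() to be hit when I disconnect from the internet, but nothing happens and no error is thrown.
Am I missing something here for detecting when the connection is lost?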
You have to check for connection loss through the onCancel and onDispose callbacks of FluxSink.
I think these should work for you. Try testing it.
Update your code to:
@Override
public <T> Flux<T> select(Query query, Class<T> entityClass) throws DataAccessException {
    String tableName = Objects.requireNonNull(mappingContext.getPersistentEntity(entityClass)).getTableName().toString();
    logger.info("Executing ksqlDB push query: SELECT * FROM " + tableName + " EMIT CHANGES;");
    // The lambda body needs braces because it now contains several statements.
    return Flux.create((FluxSink<io.confluent.ksql.api.client.Row> sink) -> {
                // Runs when the downstream subscriber cancels the subscription.
                sink.onCancel(() -> {
                    throw new RuntimeException("Client connection issue");
                });
                // Runs when the sink is disposed (after completion, error, or cancellation).
                sink.onDispose(() -> {
                    throw new RuntimeException("Client connection issue");
                });
                client.streamQuery("select * from " + tableName + " " + emitChangesString(entityClass) + ";")
                        .thenAccept(streamedQueryResult -> streamedQueryResult.subscribe(RowSubscriber.fromSink(sink)));
            })
            .flatMap(row -> {
                try {
                    return Flux.just(objectMapper.readValue(row.asObject().toJsonString(), entityClass));
                } catch (JsonProcessingException e) {
                    logger.error("ksqlDB Exception occurred while object mapping record from table:" + e);
                    return Flux.empty();
                }
            })
            .doOnError(throwable -> logger.error("Exception occurred during ksqlDB push query processing"));
}
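
One more thing that may be worth checking, separate from the callbacks: client.streamQuery(...) returns a CompletableFuture, and the posted code only attaches thenAccept to it. If that future completes exceptionally, nothing tells the sink, so doOnError has nothing to react to. The sketch below illustrates that mechanism with the JDK alone. RecordingSink is a hypothetical stand-in for FluxSink (it is not a Reactor class), and failedQuery() only simulates a query that failed; neither comes from the original code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class FutureErrorForwarding {

    /** Hypothetical stand-in for FluxSink: it only records what it receives. */
    static final class RecordingSink {
        final List<String> items = new ArrayList<>();
        Throwable error;

        void next(String item) { items.add(item); }
        void error(Throwable t) { error = t; }
    }

    /** Simulates a query whose future completes exceptionally (an assumption for the demo). */
    static CompletableFuture<String> failedQuery() {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(new IllegalStateException("connection lost"));
        return future;
    }

    /** Same shape as the posted code: only the success path is wired up. */
    static void successOnly(CompletableFuture<String> query, RecordingSink sink) {
        query.thenAccept(sink::next); // never runs if the future failed
    }

    /** Forwards a failed future to the sink in addition to the success path. */
    static void withErrorForwarding(CompletableFuture<String> query, RecordingSink sink) {
        query.whenComplete((row, failure) -> {
            if (failure != null) {
                sink.error(failure);
            } else {
                sink.next(row);
            }
        });
    }

    public static void main(String[] args) {
        RecordingSink silent = new RecordingSink();
        successOnly(failedQuery(), silent);
        System.out.println("success-only sink saw error: " + (silent.error != null));

        RecordingSink informed = new RecordingSink();
        withErrorForwarding(failedQuery(), informed);
        System.out.println("forwarding sink saw error: " + (informed.error != null));
    }
}
```

In the Reactor code, the equivalent would be to chain something like .exceptionally(t -> { sink.error(t); return null; }) after thenAccept. This only helps when the client actually reports a failure. If the network drops silently and no error is ever raised, a timeout (for example Reactor's timeout operator) may be needed instead; I have not verified how the ksqlDB client behaves in that case.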