ksqlDB Java 客户端无法显示连接异常

问题描述 投票:0回答:1

我正在使用

implementation ('io.confluent.ksql:ksqldb-api-client:7.3.1')

我遇到了在下面的代码中抛出连接丢失异常的问题

@Override
public <T> Flux<T> select(Query query, Class<T> entityClass) throws DataAccessException {
    String tableName = Objects.requireNonNull(mappingContext.getPersistentEntity(entityClass)).getTableName().toString();
    logger.info("Executing ksqlDB push query: SELECT * FROM" + tableName + " EMIT CHANGES;");
    return Flux.create((FluxSink<io.confluent.ksql.api.client.Row> sink) ->
                    client.streamQuery("select * from " + tableName + " " + emitChangesString(entityClass) + ";")
                            .thenAccept(streamedQueryResult -> streamedQueryResult.subscribe(RowSubscriber.fromSink(sink)))
            )
            .flatMap(row -> {
                try {
                    return Flux.just(objectMapper.readValue(row.asObject().toJsonString(), entityClass));
                } catch (JsonProcessingException e) {
                    logger.error("ksqlDB Exception occurred while object mapping record from table:" + e);
                    return Flux.empty();
                }
            })
            .doOnError(throwable -> logger.error("Exception occurred during ksqlDB push query processing"));
}

我希望当我断开互联网连接时 .doOnError() 会被击中,但实际上什么也没有发生,也没有抛出任何错误。

我是否在这里遗漏了一些东西来确定连接何时丢失?

java spring-webflux ksqldb vertex
1个回答
0
投票

您必须通过

onDispose
中的
onCancel
FluxSink
检查连接丢失。

我想这些都应该适合你。尝试测试一下。

将您的代码更新为:

@Override
public <T> Flux<T> select(Query query, Class<T> entityClass) throws DataAccessException {
    String tableName = Objects.requireNonNull(mappingContext.getPersistentEntity(entityClass)).getTableName().toString();
    logger.info("Executing ksqlDB push query: SELECT * FROM" + tableName + " EMIT CHANGES;");
    return Flux.create((FluxSink<io.confluent.ksql.api.client.Row> sink) ->
        sink.onCancel(() -> {
            throw new RuntimeException("Client connection issue");
        });
        sink.onDispose(() -> {
            throw new RuntimeException("Client connection issue");
        });
       client.streamQuery("select * from " + tableName + " " + emitChangesString(entityClass) + ";")
                            .thenAccept(streamedQueryResult -> streamedQueryResult.subscribe(RowSubscriber.fromSink(sink)))
            )
            .flatMap(row -> {
                try {
                    return Flux.just(objectMapper.readValue(row.asObject().toJsonString(), entityClass));
                } catch (JsonProcessingException e) {
                    logger.error("ksqlDB Exception occurred while object mapping record from table:" + e);
                    return Flux.empty();
                }
            })
            .doOnError(throwable -> logger.error("Exception occurred during ksqlDB push query processing"));
}
© www.soinside.com 2019 - 2024. All rights reserved.