Putting a data class into Kafka with io.projectreactor.kafka: reactor-kafka JsonSerializer problem

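I could not find any JSON serializer in reactor-kafka, so I am using

org.springframework.kafka.support.serializer.JsonSerializer
I also tried using
com.fasterxml.jackson.databind.annotation.JsonSerialize
instead, but it did not work. Can anybody help me? I would like to use a standard JSON serializer so that other services, written in other languages, can read the data without serious problems.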

My configuration class

import com.company.project.testservice.model.kafka.MyData;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.serializer.JsonSerializer;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaReactiveConfig{

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;



    public SenderOptions<String, MyData> getSenderOptions(){
        Map<String, Object> producerProps = new HashMap<>();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                JsonSerializer.class);


        producerProps.put(ProducerConfig.ACKS_CONFIG,"1");

        SenderOptions<String, MyData> senderOptions =
                SenderOptions.<String, MyData>create(producerProps).maxInFlight(1024);
        return senderOptions;
    }


    @Bean
    public KafkaSender<String,MyData> getSender(){
        KafkaSender<String, MyData> sender =  KafkaSender.create(getSenderOptions());
        return sender;
    }
}


My sender method

@Component
@AllArgsConstructor
public class KafkaComponent {

    private KafkaSender<String, MyData> kafkaSender;

    public Flux<SenderResult<String>> sendCdsRecord(Flux<MyData> myDataFlux){
        return kafkaSender.send(myDataFlux);
    }
}


My data class


@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY)
@Data
public class MyData {
    private String x;
    private String y;
    private String z;
}

Part of my build.gradle file

plugins {
    id 'org.springframework.boot' version '2.6.1'
    id 'io.spring.dependency-management' version '1.0.11.RELEASE'
    id 'java'
    id 'org.springframework.experimental.aot' version '0.11.0-RC1'
    id 'org.asciidoctor.convert' version '2.4.0'
    id 'io.freefair.lombok' version '6.3.0'
}
configurations {
    compileOnly {
        extendsFrom annotationProcessor
    }
}

repositories {
    maven { url 'https://repo.spring.io/milestone' }
    mavenCentral()
}

ext {
    set('springCloudVersion', "2021.0.0")
    set('snippetsDir', file("build/generated-snippets"))
}
dependencies {
    implementation 'org.springframework.kafka:spring-kafka'
    implementation 'org.springframework.boot:spring-boot-starter-actuator'
    implementation 'org.springframework.boot:spring-boot-starter-webflux'
    implementation 'org.springframework.cloud:spring-cloud-starter-circuitbreaker-reactor-resilience4j'
    implementation 'io.projectreactor.kafka:reactor-kafka:1.3.8'
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.mapstruct:mapstruct-processor:1.4.2.Final'
    implementation 'org.mapstruct:mapstruct:1.4.2.Final'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'io.projectreactor:reactor-test'
}

I get an error like this

org.springframework.core.codec.CodecException: Type definition error: [simple type, class reactor.kafka.sender.internals.Response]; nested exception is com.fasterxml.jackson.databind.exc.InvalidDefinitionException: No serializer found for class reactor.kafka.sender.internals.Response and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS)
    at org.springframework.http.codec.json.AbstractJackson2Encoder.encodeValue(AbstractJackson2Encoder.java:226) ~[spring-web-5.3.13.jar:5.3.13]
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
    *__checkpoint ⇢ Handler MyController#update() [DispatcherHandler]
    *__checkpoint ⇢ org.springframework.boot.actuate.metrics.web.reactive.server.MetricsWebFilter [DefaultWebFilterChain]
    *__checkpoint ⇢ HTTP GET "/update" [ExceptionHandlingWebHandler]
Original Stack Trace:
        at org.springframework.http.codec.json.AbstractJackson2Encoder.encodeValue(AbstractJackson2Encoder.java:226) ~[spring-web-5.3.13.jar:5.3.13]
        at org.springframework.http.codec.ServerSentEventHttpMessageWriter.lambda$encodeEvent$1(ServerSentEventHttpMessageWriter.java:177) ~[spring-web-5.3.13.jar:5.3.13]
        at reactor.core.publisher.FluxDefer.subscribe(FluxDefer.java:46) ~[reactor-core-3.4.12.jar:3.4.12]
        at reactor.core.publisher.Flux.subscribe(Flux.java:8469) ~[reactor-core-3.4.12.jar:3.4.12]
        at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:258) ~[reactor-core-3.4.12.jar:3.4.12]
        at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:78) ~[reactor-core-3.4.12.jar:3.4.12]
        at reactor.core.publisher.Flux.subscribe(Flux.java:8469) ~[reactor-core-3.4.12.jar:3.4.12]
        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:449) ~[reactor-core-3.4.12.jar:3.4.12]
        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onNext(FluxConcatMap.java:251) ~[reactor-core-3.4.12.jar:3.4.12]
        at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120) ~[reactor-core-3.4.12.jar:3.4.12]
        at org.springframework.http.server.reactive.ChannelSendOperator$WriteBarrier.emitCachedSignals(ChannelSendOperator.java:312) ~[spring-web-5.3.13.jar:5.3.13]
        at org.springframework.http.server.reactive.ChannelSendOperator$WriteBarrier.request(ChannelSendOperator.java:283) ~[spring-web-5.3.13.jar:5.3.13]
        at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:162) ~[reactor-core-3.4.12.jar:3.4.12]
        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:236) ~[reactor-core-3.4.12.jar:3.4.12]
        at reactor.core.publisher.FluxMap$MapSubscriber.onSubscribe(FluxMap.java:92) ~[reactor-core-3.4.12.jar:3.4.12]
        at org.springframework.http.server.reactive.ChannelSendOperator$WriteBarrier.subscribe(ChannelSendOperator.java:358) ~[spring-web-5.3.13.jar:5.3.13]
        at reactor.core.publisher.FluxSource.subscribe(FluxSource.java:67) ~[reactor-core-3.4.12.jar:3.4.12]
        at reactor.core.publisher.Flux.subscribe(Flux.java:8469) ~[reactor-core-3.4.12.jar:3.4.12]
        at reactor.netty.channel.MonoSendMany.subscribe(MonoSendMany.java:101) ~[reactor-netty-core-1.0.13.jar:1.0.13]
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:236) ~[reactor-core-3.4.12.jar:3.4.12]
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203) ~[reactor-core-3.4.12.jar:3.4.12]
        at reactor.netty.FutureMono$FutureSubscription.operationComplete(FutureMono.java:196) ~[reactor-netty-core-1.0.13.jar:1.0.13]
        at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578) ~[netty-common-4.1.70.Final.jar:4.1.70.Final]
        at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:571) ~[netty-common-4.1.70.Final.jar:4.1.70.Final]
        at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:550) ~[netty-common-4.1.70.Final.jar:4.1.70.Final]
        at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491) ~[netty-common-4.1.70.Final.jar:4.1.70.Final]
        at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616) ~[netty-common-4.1.70.Final.jar:4.1.70.Final]
        at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:605) ~[netty-common-4.1.70.Final.jar:4.1.70.Final]
        at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104) ~[netty-common-4.1.70.Final.jar:4.1.70.Final]
        at io.netty.util.internal.PromiseNotificationUtil.trySuccess(PromiseNotificationUtil.java:48) ~[netty-common-4.1.70.Final.jar:4.1.70.Final]
        at io.netty.channel.ChannelOutboundBuffer.safeSuccess(ChannelOutboundBuffer.java:717) ~[netty-transport-4.1.70.Final.jar:4.1.70.Final]
        at io.netty.channel.ChannelOutboundBuffer.remove(ChannelOutboundBuffer.java:272) ~[netty-transport-4.1.70.Final.jar:4.1.70.Final]
        at io.netty.channel.ChannelOutboundBuffer.removeBytes(ChannelOutboundBuffer.java:352) ~[netty-transport-4.1.70.Final.jar:4.1.70.Final]
        at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:414) ~[netty-transport-4.1.70.Final.jar:4.1.70.Final]
        at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:949) ~[netty-transport-4.1.70.Final.jar:4.1.70.Final]
        at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:354) ~[netty-transport-4.1.70.Final.jar:4.1.70.Final]
        at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:913) ~[netty-transport-4.1.70.Final.jar:4.1.70.Final]
        at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1372) ~[netty-transport-4.1.70.Final.jar:4.1.70.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750) ~[netty-transport-4.1.70.Final.jar:4.1.70.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:742) ~[netty-transport-4.1.70.Final.jar:4.1.70.Final]
        at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:728) ~[netty-transport-4.1.70.Final.jar:4.1.70.Final]
        at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.flush(CombinedChannelDuplexHandler.java:531) ~[netty-transport-4.1.70.Final.jar:4.1.70.Final]
        at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:125) ~[netty-transport-4.1.70.Final.jar:4.1.70.Final]
        at io.netty.channel.CombinedChannelDuplexHandler.flush(CombinedChannelDuplexHandler.java:356) ~[netty-transport-4.1.70.Final.jar:4.1.70.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750) ~[netty-transport-4.1.70.Final.jar:4.1.70.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:742) ~[netty-transport-4.1.70.Final.jar:4.1.70.Final]
        at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:728) ~[netty-transport-4.1.70.Final.jar:4.1.70.Final]
        at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:127) ~[netty-transport-4.1.70.Final.jar:4.1.70.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750) ~[netty-transport-4.1.70.Final.jar:4.1.70.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765) ~[netty-transport-4.1.70.Final.jar:4.1.70.Final]
        at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071) ~[netty-transport-4.1.70.Final.jar:4.1.70.Final]
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) ~[netty-common-4.1.70.Final.jar:4.1.70.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469) ~[netty-common-4.1.70.Final.jar:4.1.70.Final]
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:497) ~[netty-transport-4.1.70.Final.jar:4.1.70.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) ~[netty-common-4.1.70.Final.jar:4.1.70.Final]
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.70.Final.jar:4.1.70.Final]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.70.Final.jar:4.1.70.Final]
        at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: No serializer found for class reactor.kafka.sender.internals.Response and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS)
    at com.fasterxml.jackson.databind.exc.InvalidDefinitionException.from(InvalidDefinitionException.java:77) ~[jackson-databind-2.13.0.jar:2.13.0]
    at com.fasterxml.jackson.databind.SerializerProvider.reportBadDefinition(SerializerProvider.java:1300) ~[jackson-databind-2.13.0.jar:2.13.0]
    at com.fasterxml.jackson.databind.DatabindContext.reportBadDefinition(DatabindContext.java:400) ~[jackson-databind-2.13.0.jar:2.13.0]
    at com.fasterxml.jackson.databind.ser.impl.UnknownSerializer.failForEmpty(UnknownSerializer.java:46) ~[jackson-databind-2.13.0.jar:2.13.0]
    at com.fasterxml.jackson.databind.ser.impl.UnknownSerializer.serialize(UnknownSerializer.java:29) ~[jackson-databind-2.13.0.jar:2.13.0]
    at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider._serialize(DefaultSerializerProvider.java:480) ~[jackson-databind-2.13.0.jar:2.13.0]
    at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:319) ~[jackson-databind-2.13.0.jar:2.13.0]
    at com.fasterxml.jackson.databind.ObjectWriter$Prefetch.serialize(ObjectWriter.java:1518) ~[jackson-databind-2.13.0.jar:2.13.0]
    at com.fasterxml.jackson.databind.ObjectWriter.writeValue(ObjectWriter.java:1007) ~[jackson-databind-2.13.0.jar:2.13.0]
    at org.springframework.http.codec.json.AbstractJackson2Encoder.encodeValue(AbstractJackson2Encoder.java:222) ~[spring-web-5.3.13.jar:5.3.13]
    at org.springframework.http.codec.ServerSentEventHttpMessageWriter.lambda$encodeEvent$1(ServerSentEventHttpMessageWriter.java:177) ~[spring-web-5.3.13.jar:5.3.13]
    at reactor.core.publisher.FluxDefer.subscribe(FluxDefer.java:46) ~[reactor-core-3.4.12.jar:3.4.12]
    at reactor.core.publisher.Flux.subscribe(Flux.java:8469) ~[reactor-core-3.4.12.jar:3.4.12]
    at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:258) ~[reactor-core-3.4.12.jar:3.4.12]
    at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:78) ~[reactor-core-3.4.12.jar:3.4.12]
    at reactor.core.publisher.Flux.subscribe(Flux.java:8469) ~[reactor-core-3.4.12.jar:3.4.12]
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:449) ~[reactor-core-3.4.12.jar:3.4.12]
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onNext(FluxConcatMap.java:251) ~[reactor-core-3.4.12.jar:3.4.12]
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120) ~[reactor-core-3.4.12.jar:3.4.12]
    at org.springframework.http.server.reactive.ChannelSendOperator$WriteBarrier.emitCachedSignals(ChannelSendOperator.java:312) ~[spring-web-5.3.13.jar:5.3.13]
    at org.springframework.http.server.reactive.ChannelSendOperator$WriteBarrier.request(ChannelSendOperator.java:283) ~[spring-web-5.3.13.jar:5.3.13]
    at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:162) ~[reactor-core-3.4.12.jar:3.4.12]
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:236) ~[reactor-core-3.4.12.jar:3.4.12]
    at reactor.core.publisher.FluxMap$MapSubscriber.onSubscribe(FluxMap.java:92) ~[reactor-core-3.4.12.jar:3.4.12]
    at org.springframework.http.server.reactive.ChannelSendOperator$WriteBarrier.subscribe(ChannelSendOperator.java:358) ~[spring-web-5.3.13.jar:5.3.13]
    at reactor.core.publisher.FluxSource.subscribe(FluxSource.java:67) ~[reactor-core-3.4.12.jar:3.4.12]
    at reactor.core.publisher.Flux.subscribe(Flux.java:8469) ~[reactor-core-3.4.12.jar:3.4.12]
    at reactor.netty.channel.MonoSendMany.subscribe(MonoSendMany.java:101) ~[reactor-netty-core-1.0.13.jar:1.0.13]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:236) ~[reactor-core-3.4.12.jar:3.4.12]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203) ~[reactor-core-3.4.12.jar:3.4.12]
    at reactor.netty.FutureMono$FutureSubscription.operationComplete(FutureMono.java:196) ~[reactor-netty-core-1.0.13.jar:1.0.13]
    at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578) ~[netty-common-4.1.70.Final.jar:4.1.70.Final]
    at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:571) ~[netty-common-4.1.70.Final.jar:4.1.70.Final]
    at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:550) ~[netty-common-4.1.70.Final.jar:4.1.70.Final]
    at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491) ~[netty-common-4.1.70.Final.jar:4.1.70.Final]
    at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616) ~[netty-common-4.1.70.Final.jar:4.1.70.Final]
    at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:605) ~[netty-common-4.1.70.Final.jar:4.1.70.Final]
    at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104) ~[netty-common-4.1.70.Final.jar:4.1.70.Final]
    at io.netty.util.internal.PromiseNotificationUtil.trySuccess(PromiseNotificationUtil.java:48) ~[netty-common-4.1.70.Final.jar:4.1.70.Final]
    at io.netty.channel.ChannelOutboundBuffer.safeSuccess(ChannelOutboundBuffer.java:717) ~[netty-transport-4.1.70.Final.jar:4.1.70.Final]
    at io.netty.channel.ChannelOutboundBuffer.remove(ChannelOutboundBuffer.java:272) ~[netty-transport-4.1.70.Final.jar:4.1.70.Final]
    at io.netty.channel.ChannelOutboundBuffer.removeBytes(ChannelOutboundBuffer.java:352) ~[netty-transport-4.1.70.Final.jar:4.1.70.Final]
    at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:414) ~[netty-transport-4.1.70.Final.jar:4.1.70.Final]
    at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:949) ~[netty-transport-4.1.70.Final.jar:4.1.70.Final]
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:354) ~[netty-transport-4.1.70.Final.jar:4.1.70.Final]
    at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:913) ~[netty-transport-4.1.70.Final.jar:4.1.70.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1372) ~[netty-transport-4.1.70.Final.jar:4.1.70.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750) ~[netty-transport-4.1.70.Final.jar:4.1.70.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:742) ~[netty-transport-4.1.70.Final.jar:4.1.70.Final]
    at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:728) ~[netty-transport-4.1.70.Final.jar:4.1.70.Final]
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.flush(CombinedChannelDuplexHandler.java:531) ~[netty-transport-4.1.70.Final.jar:4.1.70.Final]
    at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:125) ~[netty-transport-4.1.70.Final.jar:4.1.70.Final]
    at io.netty.channel.CombinedChannelDuplexHandler.flush(CombinedChannelDuplexHandler.java:356) ~[netty-transport-4.1.70.Final.jar:4.1.70.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750) ~[netty-transport-4.1.70.Final.jar:4.1.70.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:742) ~[netty-transport-4.1.70.Final.jar:4.1.70.Final]
    at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:728) ~[netty-transport-4.1.70.Final.jar:4.1.70.Final]
    at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:127) ~[netty-transport-4.1.70.Final.jar:4.1.70.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750) ~[netty-transport-4.1.70.Final.jar:4.1.70.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765) ~[netty-transport-4.1.70.Final.jar:4.1.70.Final]
    at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071) ~[netty-transport-4.1.70.Final.jar:4.1.70.Final]
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) ~[netty-common-4.1.70.Final.jar:4.1.70.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469) ~[netty-common-4.1.70.Final.jar:4.1.70.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:497) ~[netty-transport-4.1.70.Final.jar:4.1.70.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) ~[netty-common-4.1.70.Final.jar:4.1.70.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.70.Final.jar:4.1.70.Final]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.70.Final.jar:4.1.70.Final]
    at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]


apache-kafka spring-webflux jsonserializer webflux reactor-kafka
1 Answer

The problem was somewhere else, in the controller. It is solved now.
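
For readers who hit the same trace: the failing frames are AbstractJackson2Encoder and ServerSentEventHttpMessageWriter, which suggests that Jackson was asked to write reactor.kafka.sender.internals.Response (the SenderResult implementation) as the HTTP response body, rather than that the Kafka value serializer failed. The post does not show the controller, so the following is only a sketch of one possible fix, assuming the handler returned the Flux<SenderResult<String>> directly: map each result to a plain object with getters before returning it. SendStatus, toStatus and result are hypothetical names, and the nested SenderResult interface is a stand-in for reactor-kafka's type, reduced to two of its accessors so that the snippet compiles without any dependencies.

```java
public class SendResultMapping {

    // Stand-in for reactor.kafka.sender.SenderResult<T>, reduced to the two
    // accessors used below so this sketch compiles without reactor-kafka.
    interface SenderResult<T> {
        Exception exception();
        T correlationMetadata();
    }

    // Hypothetical response DTO: public getters give a JSON encoder
    // properties to write, unlike the internal Response class in the trace.
    public static final class SendStatus {
        private final String correlationId;
        private final boolean success;
        private final String error;

        SendStatus(String correlationId, boolean success, String error) {
            this.correlationId = correlationId;
            this.success = success;
            this.error = error;
        }

        public String getCorrelationId() { return correlationId; }
        public boolean isSuccess() { return success; }
        public String getError() { return error; }
    }

    // Copies what a client might need out of the send result.
    static SendStatus toStatus(SenderResult<?> result) {
        Exception failure = result.exception();
        return new SendStatus(
                String.valueOf(result.correlationMetadata()),
                failure == null,
                failure == null ? null : failure.getMessage());
    }

    // Helper for the demo only: builds a stand-in result.
    static <T> SenderResult<T> result(T correlationMetadata, Exception exception) {
        return new SenderResult<T>() {
            @Override public Exception exception() { return exception; }
            @Override public T correlationMetadata() { return correlationMetadata; }
        };
    }

    public static void main(String[] args) {
        SendStatus ok = toStatus(result("record-1", null));
        SendStatus failed = toStatus(
                result("record-2", new IllegalStateException("broker unavailable")));
        System.out.println(ok.getCorrelationId() + " success=" + ok.isSuccess());
        System.out.println(failed.getCorrelationId() + " error=" + failed.getError());
    }
}
```

In a WebFlux handler the same mapping would presumably be applied with Flux.map on the publisher returned by KafkaSender.send, so that the endpoint emits SendStatus values instead of SenderResult instances.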
