Spring Boot Redis and Kafka serialization error

Problem description

We use Spring Cloud Stream to produce events to Kafka, and everything worked fine until we introduced a Redis cache.

It appears that, for some reason, the combination of Redis and Kafka breaks serialization.

At this stage the Redis cache seems to work correctly, but producing an event to Kafka fails with the Redis exception below.

Error while producing SampleEvent: org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'sampleOutput'; 
nested exception is org.springframework.data.redis.serializer.SerializationException: Could not write JSON: Not an array: 
{"type":"record","name":"SampleEvent","namespace":"com.sample.redisandkafka","fields":[{"name":"field","type":"string","default":"null"}]} (through reference chain: org.springframework.cloud.schema.registry.ParsedSchema["schema"]->org.apache.avro.Schema$RecordSchema["elementType"]); nested exception is com.fasterxml.jackson.databind.JsonMappingException: 
Not an array: {"type":"record","name":"SampleEvent","namespace":"com.sample.redisandkafka","fields":[{"name":"field","type":"string","default":"null"}]} (through reference chain: org.springframework.cloud.schema.registry.ParsedSchema["schema"]->org.apache.avro.Schema$RecordSchema["elementType"]), failedMessage=GenericMessage [payload={"field": "value"}, headers={id=16108acf-6ca1-f1b2-2ed3-44eb857daa6b, contentType=application/*+avro, timestamp=1712741537549}]

Spring Boot: 2.4.5

Spring Cloud: 2020.0.2

This GitHub repository contains a reproducible example:

https://github.com/krishgokul494/redisandkafka.git

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.sample</groupId>
    <artifactId>redisandkafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>redisandkafka</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <spring-cloud.version>2020.0.2</spring-cloud.version>
        <spring-boot.version>2.4.5</spring-boot.version>
        <avro.version>1.8.2</avro.version>
        <confluent.version>4.0.0</confluent.version>
        <jacoco.version>0.8.5</jacoco.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-schema-registry-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-cache</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.hibernate</groupId>
                    <artifactId>hibernate-validator</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>

        <dependency>
            <groupId>org.codehaus.janino</groupId>
            <artifactId>janino</artifactId>
        </dependency>
        <dependency>
            <groupId>org.codehaus.janino</groupId>
            <artifactId>commons-compiler</artifactId>
        </dependency>
        <dependency>
            <groupId>org.codehaus.jackson</groupId>
            <artifactId>jackson-mapper-asl</artifactId>
            <version>1.9.13</version>
        </dependency>

        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>${confluent.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-api</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-schema-registry-client</artifactId>
            <version>${confluent.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-parent</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>${spring-boot.version}</version>
                <configuration>
                    <mainClass>com.sample.redisandkafka.Application</mainClass>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>${avro.version}</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                            <goal>protocol</goal>
                            <goal>idl-protocol</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/resources/avro</sourceDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.jacoco</groupId>
                <artifactId>jacoco-maven-plugin</artifactId>
                <version>${jacoco.version}</version>
                <configuration>
                    <excludes>
                        <exclude>**/avro/**/*</exclude>
                    </excludes>
                </configuration>
                <executions>
                    <execution>
                        <id>default-prepare-agent</id>
                        <goals>
                            <goal>prepare-agent</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>default-report</id>
                        <phase>prepare-package</phase>
                        <goals>
                            <goal>report</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <profiles>
        <profile>
            <id>kafka-binder</id>
            <activation>
                <activeByDefault>true</activeByDefault>
            </activation>
            <dependencies>
                <dependency>
                    <groupId>org.springframework.cloud</groupId>
                    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
                </dependency>
            </dependencies>
        </profile>

        <profile>
            <id>repo</id>
            <activation>
                <activeByDefault>true</activeByDefault>
            </activation>
            <repositories>
                <repository>
                    <id>spring-libs-milestones</id>
                    <name>Spring Milestones</name>
                    <url>https://repo.spring.io/milestone</url>
                    <snapshots>
                        <enabled>false</enabled>
                    </snapshots>
                </repository>
                <repository>
                    <id>spring-milestones</id>
                    <name>Spring libs-Milestones</name>
                    <url>https://repo.spring.io/libs-milestone/</url>
                    <snapshots>
                        <enabled>false</enabled>
                    </snapshots>
                </repository>
                <repository>
                    <id>confluent</id>
                    <url>http://packages.confluent.io/maven/</url>
                    <snapshots>
                        <enabled>false</enabled>
                    </snapshots>
                </repository>
            </repositories>
        </profile>
    </profiles>
    <pluginRepositories>
        <pluginRepository>
            <id>repository.spring.release</id>
            <name>Spring GA Repository</name>
            <url>https://repo.spring.io/plugins-release/</url>
        </pluginRepository>
    </pluginRepositories>

</project>

SampleKafkaSourceProducer.java

package com.sample.redisandkafka.kafka;

import com.sample.redisandkafka.SampleEvent;
import com.sample.redisandkafka.config.SampleOutputSource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.support.MessageBuilder;

@EnableBinding(SampleOutputSource.class)
@Slf4j
public class SampleKafkaSourceProducer {

  @Autowired
  private SampleOutputSource sampleOutputSource;

  public boolean produceSampleEvent() {
    SampleEvent sampleEvent = SampleEvent.newBuilder().setField("value").build();

    try {
      sampleOutputSource.sampleMessageChannel().send(MessageBuilder.withPayload(sampleEvent).build());
    } catch(Exception ex) {
      log.error("Error while producing SampleEvent: {}", ex.toString());
      return false;
    }
    return true;
  }

}
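
For completeness, a minimal way to trigger this producer, for example from a test endpoint, could look like the following sketch (the controller class and mapping below are hypothetical, not part of the repository):

package com.sample.redisandkafka.web;

import com.sample.redisandkafka.kafka.SampleKafkaSourceProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

// Hypothetical trigger endpoint, for illustration only.
@RestController
public class SampleProducerController {

  @Autowired
  private SampleKafkaSourceProducer producer;

  @PostMapping("/produce")
  public String produce() {
    return producer.produceSampleEvent() ? "sent" : "failed";
  }
}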

KafkaConfig.java

package com.sample.redisandkafka.config;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.schema.registry.client.ConfluentSchemaRegistryClient;
import org.springframework.cloud.schema.registry.client.SchemaRegistryClient;
import org.springframework.cloud.stream.binder.kafka.BinderHeaderMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.KafkaHeaderMapper;

@Configuration
public class KafkaConfig {

  @Value("${spring.cloud.stream.schemaRegistryClient.endpoint}")
  private String endpoint;

  @Bean
  public SchemaRegistryClient confluentSchemaRegistryClient() {
    ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
    client.setEndpoint(endpoint);
    return client;
  }


  @Bean("kafkaBinderHeaderMapper")
  public KafkaHeaderMapper kafkaBinderHeaderMapper() {
    BinderHeaderMapper mapper = new BinderHeaderMapper();
    mapper.setEncodeStrings(true);
    return mapper;
  }
}

RedisCacheConfig.java

package com.sample.redisandkafka.config;

import lombok.Data;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.cache.RedisCacheConfiguration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;


@EnableCaching
@Configuration
@ConditionalOnProperty(name = "spring.cache.enabled", havingValue = "true")
public class RedisCacheConfig extends CachingConfigurerSupport {

  @Autowired
  private CacheConfigurationProperties cacheConfigurationProperties = null;

  private RedisCacheConfiguration createCacheConfiguration(long timeoutInHours) {
    return RedisCacheConfiguration.defaultCacheConfig().entryTtl(Duration.ofHours(timeoutInHours))
     .serializeValuesWith(RedisSerializationContext.SerializationPair
     .fromSerializer(new GenericJackson2JsonRedisSerializer()));
  }

  @Bean
  public CacheManager cacheManager(LettuceConnectionFactory redisConnectionFactory) {
    Map<String, RedisCacheConfiguration> cacheConfigurations = new HashMap<>();

    if (Objects.nonNull(cacheConfigurationProperties.getCachesTTL())) {
      for (Entry<String, Long> cacheNameAndTimeout : cacheConfigurationProperties.getCachesTTL()
          .entrySet()) {
        cacheConfigurations.put(cacheNameAndTimeout.getKey(),
            createCacheConfiguration(cacheNameAndTimeout.getValue()));
      }
    }

    return RedisCacheManager.builder(redisConnectionFactory)
        .cacheDefaults(createCacheConfiguration(cacheConfigurationProperties.getDefaultTTL()))
        .withInitialCacheConfigurations(cacheConfigurations).build();
  }

  @Bean
  public LettuceConnectionFactory redisConnectionFactory() {
    RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration(
        cacheConfigurationProperties.getHost(), cacheConfigurationProperties.getPort());

    return new LettuceConnectionFactory(redisStandaloneConfiguration);
  }

  @Configuration
  @ConfigurationProperties(prefix = "spring.cache")
  @Data
  class CacheConfigurationProperties {
    private String host;
    private int port;
    private Long defaultTTL;
    private Map<String, Long> cachesTTL;
  }
}
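
A cache configured this way is exercised through Spring's @Cacheable abstraction; as a sketch, a hypothetical service (not part of the repository) whose results would be written to Redis by the GenericJackson2JsonRedisSerializer configured above:

package com.sample.redisandkafka.service;

import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Service;

// Hypothetical service, for illustration only.
@Service
public class SampleLookupService {

  // Results land in "sample-cache" (TTL configured in application.yml)
  // and are serialized to JSON by GenericJackson2JsonRedisSerializer.
  @Cacheable("sample-cache")
  public String expensiveLookup(String key) {
    // Stand-in for a slow computation or remote call.
    return "value-for-" + key;
  }
}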

SampleInputSink.java

package com.sample.redisandkafka.config;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.MessageChannel;

public interface SampleInputSink {

  String SAMPLE = "sampleInput";

  @Input(SAMPLE)
  MessageChannel sampleMessageChannel();
}

SampleOutputSource.java

package com.sample.redisandkafka.config;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface SampleOutputSource {
  String SAMPLE = "sampleOutput";

  @Output(SAMPLE)
  MessageChannel sampleMessageChannel();
}

application.yml

spring:
  application:
    name: redisandkafka
  main:
    allow-bean-definition-overriding: true

  cache:
    cache-names: sample-cache
    default-ttl: 1 # TTL in hours
    enabled: true
    type: redis
    host: localhost
    port: 6379
    caches-ttl: # TTL in hours
      sample-cache: 1

  cloud:
    stream:
      schemaRegistryClient:
        endpoint: http://localhost:9081
      kafka:
        binder:
          brokers: PLAINTEXT_HOST://localhost:30092
          min-partition-count: 1
          replication-factor: 1
          useNativeDecoding: true
        bindings:
          sampleInput:
            producer:
              configuration:
                key.serializer: org.apache.kafka.common.serialization.StringSerializer
                value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
            consumer:
              configuration:
                schema.registry.url: http://localhost:9081
                specific.avro.reader: true
                key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
                value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
                max:
                  poll:
                    records: 100
                    interval.ms: 900000
      bindings:
        sampleInput:
          contentType: application/*+avro
          destination: redisandkafka.local.sample_event
          group: sample.local.sample_event
        sampleOutput:
          contentType: application/*+avro
          destination: redisandkafka.local.sample_event_output
          group: sample.local.sample_event
      streams:
        binder:
          configuration:
            default:
              key:
                serde: org.apache.kafka.common.serialization.StringSerializer
              value:
                serde: io.confluent.kafka.serializers.KafkaAvroSerializer

security.basic.enable: false
management.security.enabled: false
security.ignored: /**

sample.avsc

{
  "namespace": "com.sample.redisandkafka",
  "type": "record",
  "name": "SampleEvent",
  "fields": [
    {"name": "field", "type": "string", "default": "null"}
  ]
}

Update 1: Using the same sample application, we removed the Redis-related dependency (shown below) and the RedisCacheConfig class, making the application Kafka-only, and it worked fine.

pom.xml

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

application.yml

  cache:
    cache-names: sample-cache
    default-ttl: 1 # TTL in hours
    enabled: true
    type: redis
    host: localhost
    port: 6379
    caches-ttl: # TTL in hours
      sample-cache: 1

Tags: java, spring-boot, serialization, apache-kafka, redis
1 Answer

Add "useNativeEncoding: true" to your producer binding. This makes the binding bypass the default serialization that Spring Cloud Stream applies on output (the Avro message converter path that, judging by your stack trace, is what collides with the Redis JSON serializer) and hand the payload directly to the Kafka serializer you configure.

Note: all of the configuration changes below are made in application.yml.

Native encoding:

     sampleOutput:
       contentType: application/*+avro
       destination: redisandkafka.local.sample_event_output
       group: sample.local.sample_event
       producer:
         useNativeEncoding: true

In addition, you must configure the key.serializer and value.serializer you need; in your case, KafkaAvroSerializer for the value.

Adding serializers:

      kafka:
        binder:
          producer-properties:
            schema.registry.url: http://localhost:9081
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
            value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer

The serializers above are configured globally, at the binder level. With "useNativeEncoding: true", the producer bypasses the default serialization that Spring Cloud Stream would otherwise apply and uses these configured serializers instead.
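
If you prefer to scope the serializers to a single binding rather than the whole binder, the Kafka binder also accepts per-binding producer configuration (a sketch, reusing the sampleOutput binding and the same pattern your question already uses for sampleInput):

      kafka:
        bindings:
          sampleOutput:
            producer:
              configuration:
                schema.registry.url: http://localhost:9081
                key.serializer: org.apache.kafka.common.serialization.StringSerializer
                value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer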

Similarly, you can do the same for deserializers on the consumer side if needed.

Native decoding:

      bindings:
        sampleInput:
          contentType: application/*+avro
          destination: redisandkafka.local.sample_event
          group: sample.local.sample_event
          consumer:
            useNativeDecoding: true

Adding deserializers:

      kafka:
        binder:
          consumer-properties:
            schema.registry.url: http://localhost:9081
            key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
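
Putting the producer side together, the relevant part of application.yml ends up looking roughly like this (a sketch combining the snippets above with your existing sampleOutput binding):

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: PLAINTEXT_HOST://localhost:30092
          producer-properties:
            schema.registry.url: http://localhost:9081
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
            value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      bindings:
        sampleOutput:
          contentType: application/*+avro
          destination: redisandkafka.local.sample_event_output
          producer:
            useNativeEncoding: true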

Let me know if you have any questions.
