垃圾值作为kafka主题中的键发布

问题描述 投票:0回答:1

我正在使用Long序列化程序作为键,并使用String序列化程序来作为value,在我们检索消息时将消息发布到kafka主题后,连同键一起将key视为如下所示的垃圾值

^@^@^@^AÏÃ<9a>ò

kafka生产者配置有什么问题吗?

更新:

以下生产者配置

                    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
            configProps.put(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutInMillis);
            configProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, abcompressionType);

@Bean
    public ProducerFactory<Long, String> longProducerFactory() {
        return new DefaultKafkaProducerFactory<>(longProducerConfigs());
    }

@Bean
    public KafkaTemplate<Long, String> longKeyKafkaTemplate() {
        return new KafkaTemplate<>(longProducerFactory());
    }

和下面发送呼叫

longKeyKafkaTemplate.send(topicName, key, message);
spring-boot apache-kafka spring-kafka kafka-producer-api
1个回答
0
投票

[当我在Kafka Tool中看到键值时,kafkatool.com,我发现了那些垃圾值,

不是“垃圾”,它是一个Long值,显示为没有适当反序列化的String。

我不熟悉该工具,也不知道是否可以使用它为键指定反序列化器,但是使用命令行工具kafka-console-consumer.sh可以指定要使用的反序列化器。

$ kafka-console-consumer 
This tool helps to read data from Kafka topics and outputs it to standard output.
Option                                   Description                            
------                                   -----------                            
--bootstrap-server <String: server to    REQUIRED: The server(s) to connect to. 
  connect to>                                                                   
--consumer-property <String:             A mechanism to pass user-defined       
  consumer_prop>                           properties in the form key=value to  
                                           the consumer.                        
--consumer.config <String: config file>  Consumer config properties file. Note  
                                           that [consumer-property] takes       
                                           precedence over this config.         
--enable-systest-events                  Log lifecycle events of the consumer   
                                           in addition to logging consumed      
                                           messages. (This is specific for      
                                           system tests.)                       
--formatter <String: class>              The name of a class to use for         
                                           formatting kafka messages for        
                                           display. (default: kafka.tools.      
                                           DefaultMessageFormatter)             
--from-beginning                         If the consumer does not already have  
                                           an established offset to consume     
                                           from, start with the earliest        
                                           message present in the log rather    
                                           than the latest message.             
--group <String: consumer group id>      The consumer group id of the consumer. 
--help                                   Print usage information.               
--isolation-level <String>               Set to read_committed in order to      
                                           filter out transactional messages    
                                           which are not committed. Set to      
                                           read_uncommitted to read all         
                                           messages. (default: read_uncommitted)
--key-deserializer <String:                                                     
  deserializer for key>                                                         
--max-messages <Integer: num_messages>   The maximum number of messages to      
                                           consume before exiting. If not set,  
                                           consumption is continual.            
--offset <String: consume offset>        The offset id to consume from (a non-  
                                           negative number), or 'earliest'      
                                           which means from beginning, or       
                                           'latest' which means from end        
                                           (default: latest)                    
--partition <Integer: partition>         The partition to consume from.         
                                           Consumption starts from the end of   
                                           the partition unless '--offset' is   
                                           specified.                           
--property <String: prop>                The properties to initialize the       
                                           message formatter. Default           
                                           properties include:                  
                                            print.timestamp=true|false            
                                            print.key=true|false                  
                                            print.value=true|false                
                                            key.separator=<key.separator>         
                                            line.separator=<line.separator>       
                                            key.deserializer=<key.deserializer>   
                                            value.deserializer=<value.            
                                           deserializer>                        
                                         Users can also pass in customized      
                                           properties for their formatter; more 
                                           specifically, users can pass in      
                                           properties keyed with 'key.          
                                           deserializer.' and 'value.           
                                           deserializer.' prefixes to configure 
                                           their deserializers.                 
--skip-message-on-error                  If there is an error when processing a 
                                           message, skip it instead of halt.    
--timeout-ms <Integer: timeout_ms>       If specified, exit if no message is    
                                           available for consumption for the    
                                           specified interval.                  
--topic <String: topic>                  The topic id to consume on.            
--value-deserializer <String:                                                   
  deserializer for values>                                                      
--version                                Display Kafka version.                 
--whitelist <String: whitelist>          Regular expression specifying          
                                           whitelist of topics to include for   
                                           consumption.                         
© www.soinside.com 2019 - 2024. All rights reserved.