使用Apache camel将MyObject传递给Bean路由的可能性?

问题描述 投票:0回答:3

我正在使用apache camel JAVA DSLs从Apache kafka消耗消息.我正在写一个对象,通过将它转换为 byte[] 当我消耗它时,我收到一个消息,回馈给我的是 byte[]. 我把它反序列化,得到一个对象。

我检查它是否是一个 MyObject 然后需要使用java DSL将其传递给Bean。.to(). 我的代码如下。

public class KafkaRouter extends RouteBuilder {
    
    private MessageBean msgBean;
    
    @Override
    public void configure() throws Exception {
        
        from("{{kafka.cons.uri}}").process(new Processor() {
            
            
            
            @Override
            public void process(Exchange exchange) throws Exception {
                Object obj = SerializationUtils.deserialize(exchange.getIn().getBody(byte[].class));                //TODO cast to specific class as returned after deserialization.
                
                if(obj !=null && obj instanceof MessageBean){
                    
                    msgBean = (MessageBean)obj;
                    
                }
                else {
                    
                    throw new PTFException("Invalid Message read in Kafka Consumer");
                }
                
            }
            
            
        }).bean(PTFTransformerService.class,"callTransformerService(msgBean)"); ;
    }

现在的问题是我只想 MyObject 而我不想在相应的调用方法中使用 TypeConvertors. 我不想在方法中得到Exchangebody,我将在过程中处理我的流,如果读到无效的消息就抛出异常,而不转发到bean。

我在另一端的方法将是

private void callTransformerService(MessageBean msgObj){
    // Got my object here ;-)   
        
    }
java apache-camel
3个回答
3
投票

添加 @Body 函数参数前 MessageBean msgObj:

import org.apache.camel.Body;

private void callTransformerService(@Body MessageBean msgObj){

}

1
投票

你可能需要写一个自定义的回退类型转换器,可以从kafka字节[]转换到你的POJOs。

那么你可以在bean中定义pojo类型即可,Camel将使用回落类型转换器尝试转换为pojo类型。


0
投票

你可以在处理器中设置交换体,如下所示。

msgBean = (MessageBean)obj;
exchange.getIn().setBody(msgBean, MessageBean.class);
void callTransformerService(@Body MessageBean msgObj) {}
© www.soinside.com 2019 - 2024. All rights reserved.