注册Apache Camel处理器的监听器

问题描述 投票:0回答:1

我试图将Apache Camel处理器中处理的数据推送到一个监听器类中。在处理器类实例中,我试图在Camel上下文实例化的过程中注册监听器,但不知为何会失败。也许我在这里犯了根本性的错误,这是不可能的。如果是这样的话,请你告诉我。

我有一个Apache Camel路由,从ActiveMQ服务器获取JSON消息,并将这些JSON消息推送到Camel-Spring XML中定义的自定义处理器类。

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd
    http://camel.apache.org/schema/spring
    http://camel.apache.org/schema/spring/camel-spring.xsd">

    <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="url to ActiveMQ" />
        <property name="clientID" value="clientID" />
        <property name="userName" value="theUser" />
        <property name="password" value="thePassword" />
    </bean>
    <bean id="pooledConnectionFactory" class="org.apache.activemq.jms.pool.PooledConnectionFactory"
        init-method="start" destroy-method="stop">
        <property name="maxConnections" value="8" />
        <property name="connectionFactory" ref="jmsConnectionFactory" />
    </bean>
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="pooledConnectionFactory" />
    </bean>
    <bean id="customProcessor" class="...CustomProcessorClass" />
    <camelContext id="matrixProfileContext" xmlns="http://camel.apache.org/schema/spring">  
        <route id="matrixProfileRoute" autoStartup="false">
            <from uri="activemq:queue:queuename" />
            <log message="${body}" />
            <to uri="customProcessor" />
        </route>
    </camelContext>
</beans>

我的想法是,类CustomProcessor解开通过路由传送的JSON内容,并将POJO推送给一个实现监听器接口的监听器类。

public interface ProcessorListenerIF {

    public void doOnDataProcessed(POJO processedData);
}

我通过单元测试来测试整个设置。

public class TestProcessor extends TestCase {

    @Test
    public void testRoute() throws Exception {
        MyActiveMQConnector camelContext = new MyActiveMQConnector(new TestListener());
        try {
            camelContext.startConnections();
            Thread.sleep(1000);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            camelContext.stopConnection();
        }
    }

    private class TestListener implements ProcessorListenerIF {

        @Override
        public void doOnDataProcessed(POJO data) {
            System.out.println(data);
        }
    }
}

骆驼处理器有两个方法:

public void addListener(MatrixProfileProcessorListenerIF listener) {
    _processorListeners.add(listener);
}

@Override
public void process(Exchange exchange) throws Exception {

    Pseudocode: POJO data = unmarshal_by_JSON-JAVA(exchange)

    _processorListeners.parallelStream().forEach(listener -> {
        listener.doOnDataProcessed(data);
    });
}

我在ActiveMQConnector的构造函数中注册监听器。

public class ActiveMQConnector {

    private SpringCamelContext _camelContext = null;

    public ActiveMQConnector(ProcessorListenerIF listener) {
        ApplicationContext appContext = new ClassPathXmlApplicationContext("camelContext.xml");
        _camelContext = new SpringCamelContext(appContext);
-------------------------------------
        ((CustomProcessor) _camelContext.getProcessor("customProcessor")).addListener(listener);
-------------------------------------
    }

    public void startConnections() throws Exception {
        try {
            _camelContext.start();
        } catch (Exception e) {
            exception handling
        }
    }
... more methods

上面高亮显示的那行((CustomProcessor)...)失败了: 语句是 _camelContext.getProcessor 没有找到任何东西,实例中的路由在 _camelContext 是空的。

我如何实现将处理过的数据从处理器推送到某个观察者?

java apache-camel spring-camel
1个回答
0
投票

我找到了另一个解决方案,它是纯粹基于Java的,没有Spring XML。

单元测试仍然像上面一样。我没有通过Spring XML定义ActiveMQEndpoint,而是创建了一个新的类。

public class MyActiveMQConnection {

    public static ActiveMQConnectionFactory createActiveMQConnectionFactory() {

        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL("tcp://<activemq-url>:<port>");
        connectionFactory.setUserName("myUsername");
        // connection factory configuration:
        connectionFactory.setUseAsyncSend(false);
        connectionFactory.setClientID(UUID.randomUUID().toString());
        connectionFactory.setConnectResponseTimeout(300);
        ... whatever ...

        return connectionFactory;
    }
}

此外,我改变了类的构造函数 ActiveMQConnector:

public ActiveMQConnector(ProcessorListenerIF listener) throws Exception {

    _camelContext = new DefaultCamelContext();
    _camelContext.addComponent("activemqEndpoint",
            JmsComponent.jmsComponent(MyActiveMQConnection.createActiveMQConnectionFactory()));
    _camelContext.addRoutes(new RouteBuilder() {

        @Override
        public void configure() throws Exception {

            MyCustomProcessor processor = MyCustomProcessor.getInstance();
            processor.addListener(listener);

            from("activemqEndpoint:queue:matrixprofile") //
                    .process(processor) //
                    .to("stream:out");
        }
    });
}

我把处理器实现为单人,看起来像这样(为了完整)。

public class MyCustomProcessor implements Processor {

    private final Set<ProcessorListenerIF> _processorListeners = new HashSet<>();
    private static volatile MyCustomProcessor      _myInstance         = null;
    private static Object                          _token              = new Object();

    private MyCustomProcessor() {

    }

    public static MyCustomProcessor getInstance() {
        MyCustomProcessor result = _myInstance;
        if (result == null) {
            synchronized (_token) {
                result = _myInstance;
                if (result == null)
                    _myInstance = result = new MyCustomProcessor();
            }
        }
        return result;
    }

    public void addListener(ProcessorListenerIF listener) {
        _processorListeners.add(listener);
    }

    /**
     * I assume the JSON has the following structure:
     * {timestamp: long, data: double[]}
    **/
    @Override
    public void process(Exchange exchange) throws Exception {

        _processorListeners.parallelStream().forEach(listener -> {
            // convert incoming message body to json object assuming data structure above
            JSONObject    jsonObject    = new JSONObject(exchange.getMessage().getBody().toString());
            MyPOJO myPojo = new MyPOJO();

            try {
                myPojo.setTimestamp(jsonObject.getLong("timestamp"));
            } catch (Exception e) {
                ...
            }
            try {
                JSONArray dataArray = jsonObject.getJSONArray("data");
                double[]  data      = new double[dataArray.length()];
                for (int i = 0; i < dataArray.length(); i++) {
                    data[i] = Double.valueOf(dataArray.get(i).toString());
                }
                myPojo.setData(data);
            } catch (Exception e) {
                ...
            }

            listener.doOnDataProcessed(myPojo);
        });
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.