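I am trying to push data handled in an Apache Camel processor to a listener class. In the processor instance, I try to register the listener while the Camel context is being instantiated, but for some reason this fails. Maybe I am making a fundamental mistake here and this is simply not possible. If so, please tell me.
I have an Apache Camel route that fetches JSON messages from an ActiveMQ server and pushes them to a custom processor class defined in the Camel-Spring XML: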
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://camel.apache.org/schema/spring
http://camel.apache.org/schema/spring/camel-spring.xsd">
<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="url to ActiveMQ" />
<property name="clientID" value="clientID" />
<property name="userName" value="theUser" />
<property name="password" value="thePassword" />
</bean>
<bean id="pooledConnectionFactory" class="org.apache.activemq.jms.pool.PooledConnectionFactory"
init-method="start" destroy-method="stop">
<property name="maxConnections" value="8" />
<property name="connectionFactory" ref="jmsConnectionFactory" />
</bean>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="pooledConnectionFactory" />
</bean>
<bean id="customProcessor" class="...CustomProcessorClass" />
<camelContext id="matrixProfileContext" xmlns="http://camel.apache.org/schema/spring">
<route id="matrixProfileRoute" autoStartup="false">
<from uri="activemq:queue:queuename" />
<log message="${body}" />
<to uri="customProcessor" />
</route>
</camelContext>
</beans>
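The idea is that the class CustomProcessor unmarshals the JSON content delivered through the route and pushes the resulting POJO to a listener class that implements a listener interface: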
public interface ProcessorListenerIF {
public void doOnDataProcessed(POJO processedData);
}
I test the whole setup with a unit test:
public class TestProcessor extends TestCase {
@Test
public void testRoute() throws Exception {
ActiveMQConnector camelContext = new ActiveMQConnector(new TestListener());
try {
camelContext.startConnections();
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
} finally {
camelContext.stopConnection();
}
}
private class TestListener implements ProcessorListenerIF {
@Override
public void doOnDataProcessed(POJO data) {
System.out.println(data);
}
}
}
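As a side note on the test above: the fixed Thread.sleep(1000) only gives the route time to run and the test never checks that the listener was called. A minimal sketch of a listener that lets the test wait for the first callback could look like this. It assumes nothing about Camel; Listener is a generic stand-in for ProcessorListenerIF, and LatchListener and the producer thread are hypothetical names used only for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class LatchListenerSketch {

    /** Stand-in for ProcessorListenerIF from the post, made generic so the sketch compiles on its own. */
    interface Listener<T> {
        void doOnDataProcessed(T processedData);
    }

    /** Test listener that stores the first payload and releases the waiting test thread. */
    static final class LatchListener<T> implements Listener<T> {
        private final CountDownLatch latch = new CountDownLatch(1);
        private final AtomicReference<T> received = new AtomicReference<>();

        @Override
        public void doOnDataProcessed(T processedData) {
            received.compareAndSet(null, processedData); // keep only the first payload
            latch.countDown();
        }

        /** Waits up to the timeout; returns the payload, or null if nothing arrived in time. */
        T await(long timeout, TimeUnit unit) {
            try {
                return latch.await(timeout, unit) ? received.get() : null;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
    }

    public static void main(String[] args) {
        LatchListener<String> listener = new LatchListener<>();
        // Hypothetical producer thread standing in for the Camel route.
        new Thread(() -> listener.doOnDataProcessed("payload")).start();
        System.out.println(listener.await(2, TimeUnit.SECONDS));
    }
}
```

In the real test, the listener would be passed to the connector and the test would assert that await(...) returns a non-null POJO before the connections are stopped.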
The Camel processor has two methods:
public void addListener(ProcessorListenerIF listener) {
_processorListeners.add(listener);
}
@Override
public void process(Exchange exchange) throws Exception {
// Pseudocode: POJO data = unmarshal_by_JSON-JAVA(exchange)
_processorListeners.parallelStream().forEach(listener -> {
listener.doOnDataProcessed(data);
});
}
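The two methods above are a plain observer registration plus a notification loop. Here is a minimal sketch of that pattern on its own, under the assumption that listeners may be added while messages are already being processed (hence the CopyOnWriteArrayList) and that one failing listener should not stop the others. Registry and publish are hypothetical names, not part of Camel or of the code above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class ListenerRegistrySketch {

    /** Stand-in for ProcessorListenerIF from the post. */
    interface Listener<T> {
        void doOnDataProcessed(T processedData);
    }

    /** Holds the listeners and hands each processed item to all of them. */
    static final class Registry<T> {
        // Safe to iterate while another thread calls addListener.
        private final List<Listener<T>> listeners = new CopyOnWriteArrayList<>();

        void addListener(Listener<T> listener) {
            listeners.add(listener);
        }

        /** Notifies every listener; returns how many completed without throwing. */
        int publish(T data) {
            int delivered = 0;
            for (Listener<T> listener : listeners) {
                try {
                    listener.doOnDataProcessed(data);
                    delivered++;
                } catch (RuntimeException e) {
                    // Isolate the failure so the remaining listeners are still notified.
                    System.err.println("listener failed: " + e);
                }
            }
            return delivered;
        }
    }

    public static void main(String[] args) {
        Registry<String> registry = new Registry<>();
        List<String> seen = new ArrayList<>();
        registry.addListener(seen::add);
        registry.publish("first message");
        System.out.println(seen);
    }
}
```

In a processor, the data object would be built once from the exchange and then passed to publish, so every listener sees the same unmarshalled result.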
I register the listener in the constructor of ActiveMQConnector:
public class ActiveMQConnector {
private SpringCamelContext _camelContext = null;
public ActiveMQConnector(ProcessorListenerIF listener) {
ApplicationContext appContext = new ClassPathXmlApplicationContext("camelContext.xml");
_camelContext = new SpringCamelContext(appContext);
// ---- highlighted: the following line fails ----
((CustomProcessor) _camelContext.getProcessor("customProcessor")).addListener(listener);
// ------------------------------------------------
}
public void startConnections() throws Exception {
try {
_camelContext.start();
} catch (Exception e) {
// exception handling
}
}
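// ... more methods
}
The highlighted line above, ((CustomProcessor) ...), fails: the call to _camelContext.getProcessor finds nothing, and the _camelContext instance contains no routes.
How can I push the processed data from the processor to some observer?
I found another solution that is purely Java-based, without Spring XML.
The unit test stays the same as above. Instead of defining the ActiveMQ endpoint via Spring XML, I created a new class: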
public class MyActiveMQConnection {
public static ActiveMQConnectionFactory createActiveMQConnectionFactory() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL("tcp://<activemq-url>:<port>");
connectionFactory.setUserName("myUsername");
// connection factory configuration:
connectionFactory.setUseAsyncSend(false);
connectionFactory.setClientID(UUID.randomUUID().toString());
connectionFactory.setConnectResponseTimeout(300);
// ... whatever ...
return connectionFactory;
}
}
In addition, I changed the constructor of the class ActiveMQConnector:
public ActiveMQConnector(ProcessorListenerIF listener) throws Exception {
_camelContext = new DefaultCamelContext();
_camelContext.addComponent("activemqEndpoint",
JmsComponent.jmsComponent(MyActiveMQConnection.createActiveMQConnectionFactory()));
_camelContext.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
MyCustomProcessor processor = MyCustomProcessor.getInstance();
processor.addListener(listener);
from("activemqEndpoint:queue:matrixprofile") //
.process(processor) //
.to("stream:out");
}
});
}
I implemented the processor as a singleton; for completeness, it looks like this:
public class MyCustomProcessor implements Processor {
private final Set<ProcessorListenerIF> _processorListeners = new HashSet<>();
private static volatile MyCustomProcessor _myInstance = null;
private static final Object _token = new Object();
private MyCustomProcessor() {
}
public static MyCustomProcessor getInstance() {
MyCustomProcessor result = _myInstance;
if (result == null) {
synchronized (_token) {
result = _myInstance;
if (result == null)
_myInstance = result = new MyCustomProcessor();
}
}
return result;
}
public void addListener(ProcessorListenerIF listener) {
_processorListeners.add(listener);
}
/**
* I assume the JSON has the following structure:
* {timestamp: long, data: double[]}
**/
@Override
public void process(Exchange exchange) throws Exception {
_processorListeners.parallelStream().forEach(listener -> {
// convert incoming message body to json object assuming data structure above
JSONObject jsonObject = new JSONObject(exchange.getMessage().getBody().toString());
MyPOJO myPojo = new MyPOJO();
try {
myPojo.setTimestamp(jsonObject.getLong("timestamp"));
} catch (Exception e) {
// ...
}
try {
JSONArray dataArray = jsonObject.getJSONArray("data");
double[] data = new double[dataArray.length()];
for (int i = 0; i < dataArray.length(); i++) {
data[i] = dataArray.getDouble(i);
}
myPojo.setData(data);
} catch (Exception e) {
// ...
}
listener.doOnDataProcessed(myPojo);
});
}
}
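The double-checked locking in getInstance() above works because _myInstance is volatile, but the same lazy, thread-safe behaviour can be had with less code. A minimal sketch using the initialization-on-demand holder idiom follows. Processor is a hypothetical stand-in for MyCustomProcessor, stripped of the Camel interface so that the sketch compiles on its own.

```java
public class HolderSingletonSketch {

    /** Stand-in for MyCustomProcessor; only the singleton mechanics are shown. */
    static final class Processor {

        private Processor() {
            // private: instances are only created through the holder below
        }

        /** The JVM initializes this class, and thus INSTANCE, on first access only. */
        private static final class Holder {
            static final Processor INSTANCE = new Processor();
        }

        static Processor getInstance() {
            return Holder.INSTANCE;
        }
    }

    public static void main(String[] args) {
        // Both calls return the same object.
        System.out.println(Processor.getInstance() == Processor.getInstance()); // prints true
    }
}
```

Class initialization is guaranteed by the JVM to happen once and to be visible to all threads, so no volatile field, lock object, or explicit synchronization is needed.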