据我了解,我已经搭建了一个带有 AQ(Streams?)包的 Oracle 数据库。我还在 Oracle 中手动(使用 PL/SQL 和 SQL)创建了一些队列。
但是,我很难从 Spring 建立正确的连接。
以下代码可以正常工作(使用
oracle.AQ
java 包):
// Connection settings and AQ object names used by the manual Oracle AQ test.
// JDBC URL handed to DriverManager.getConnection.
private final String aqUrl = "jdbc:oracle:thin:@localhost:1521:orcl";
// Database user for the JDBC connection.
private final String aqUser = "queue_mut";
// Password for aqUser (redacted placeholder).
private final String aqPassword = "******";
// Schema that owns the queue table; also used as the prefix of the JMS destination name.
private final String aqSchema = "queue_mut";
// Name of the queue table to create.
private final String aqTable = "aq_table1";
// Name of the queue to create inside the queue table.
private final String aqQueue = "aq_queue1";
/**
 * Connects to Oracle over plain JDBC and uses the {@code oracle.AQ} API to create a
 * RAW-payload queue table and a queue in it.
 *
 * <p>The JDBC connection is opened in a try-with-resources block so it is released even
 * when one of the AQ calls fails.
 *
 * @throws ClassNotFoundException if the JDBC or AQ driver class is not on the classpath
 * @throws SQLException if the connection cannot be opened, configured or closed
 * @throws AQException if the AQ session, queue table or queue cannot be created
 */
@Test
public void testManualAQ() throws ClassNotFoundException, SQLException, AQException {
    // Load both drivers up front so a missing class fails before a connection is opened.
    Class.forName("oracle.jdbc.driver.OracleDriver");
    Class.forName("oracle.AQ.AQOracleDriver");
    try (Connection connection = DriverManager.getConnection(aqUrl, aqUser, aqPassword)) {
        connection.setAutoCommit(false);
        AQSession aqSession = AQDriverManager.createAQSession(connection);
        AQQueueTable queueTable = aqSession.createQueueTable(aqSchema, aqTable, new AQQueueTableProperty("RAW"));
        aqSession.createQueue(queueTable, aqQueue, new AQQueueProperty());
    }
}
(基于 https://docs.oracle.com/cd/B10501_01/appdev.920/a96587/apexampl.htm)
这表明我可以连接到 Oracle 并实现 AQ 功能。
现在,我正在尝试创建 Java 配置 bean 以便使用
JmsTemplate
。
// JmsTemplate injected by the container; used by testJmsTemplate to send to the queue.
@Resource
private JmsTemplate jmsTemplate;
/**
 * Sends a small XML document to the {@code jms_ws_incoming_queue} destination in the
 * AQ schema through the injected {@code JmsTemplate}.
 */
@Test
public void testJmsTemplate() {
    final String destination = aqSchema + ".jms_ws_incoming_queue";
    // Lines are joined with '\n'; there is no trailing newline after the closing tag.
    final String payload = String.join("\n",
            "<?xml version=\"1.0\" encoding=\"UTF-8\"?>",
            "<product id=\"10\">",
            " <description>Foo</description>",
            " <price>2.05</price>",
            "</product>");
    jmsTemplate.convertAndSend(destination, payload);
}
(是的,队列已经存在;-))
具有以下配置类:
import oracle.jms.AQjmsFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.jms.core.JmsTemplate;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.sql.DataSource;
/**
 * Spring configuration that wires a {@link JmsTemplate} to Oracle AQ using the
 * application's {@link DataSource}.
 */
@Configuration
public class OracleAQConfiguration {

    /**
     * Exposes a JDBC transaction manager bound to the given data source.
     *
     * @param dataSource the data source whose transactions are managed
     * @return the configured transaction manager
     */
    @Bean
    public DataSourceTransactionManager transactionManager(DataSource dataSource) {
        final DataSourceTransactionManager txManager = new DataSourceTransactionManager();
        txManager.setDataSource(dataSource);
        return txManager;
    }

    /**
     * Builds the JMS connection factory from the data source via
     * {@code AQjmsFactory.getQueueConnectionFactory}.
     *
     * @param dataSource the data source AQ obtains its database connections from
     * @return the AQ queue connection factory
     * @throws JMSException if the factory cannot be created
     */
    @Bean
    public ConnectionFactory connectionFactory(DataSource dataSource) throws JMSException {
        final ConnectionFactory aqFactory = AQjmsFactory.getQueueConnectionFactory(dataSource);
        return aqFactory;
    }

    /**
     * Creates a session-transacted {@link JmsTemplate} on top of the given connection factory.
     *
     * @param connectionFactory the JMS connection factory to send through
     * @return the configured template
     */
    @Bean
    public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory) {
        final JmsTemplate template = new JmsTemplate();
        template.setConnectionFactory(connectionFactory);
        template.setSessionTransacted(true);
        return template;
    }
}
并且具有属性 yml:
# Spring Boot data source pointing at the Oracle instance that hosts the queues.
spring:
  datasource:
    url: jdbc:oracle:thin:@localhost:1521:orcl
    username: queue_mut
    # Redacted placeholder; quoted because a bare leading '*' is YAML alias syntax.
    password: "******"
    driverClassName: oracle.jdbc.driver.OracleDriver
但是这样我就得到了我无法理解的错误:
2017-04-19 12:11:17,151 INFO my.project.QueueTest: Started QueueTest in 5.305 seconds (JVM running for 6.588)
org.springframework.jms.UncategorizedJmsException: Uncategorized exception occurred during JMS processing; nested exception is oracle.jms.AQjmsException: Error creating the db_connection; nested exception is java.lang.ClassCastException: com.sun.proxy.$Proxy102 cannot be cast to oracle.jdbc.internal.OracleConnection
at org.springframework.jms.support.JmsUtils.convertJmsAccessException(JmsUtils.java:316)
at org.springframework.jms.support.JmsAccessor.convertJmsAccessException(JmsAccessor.java:169)
at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:487)
at org.springframework.jms.core.JmsTemplate.send(JmsTemplate.java:570)
at org.springframework.jms.core.JmsTemplate.convertAndSend(JmsTemplate.java:658)
at my.project.QueueTest.testJmsTemplate(QueueTest.java:51)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75)
at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86)
at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:252)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:94)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:191)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: oracle.jms.AQjmsException: Error creating the db_connection
at oracle.jms.AQjmsDBConnMgr.getConnection(AQjmsDBConnMgr.java:625)
at oracle.jms.AQjmsDBConnMgr.<init>(AQjmsDBConnMgr.java:399)
at oracle.jms.AQjmsConnection.<init>(AQjmsConnection.java:249)
at oracle.jms.AQjmsConnectionFactory.createConnection(AQjmsConnectionFactory.java:513)
at org.springframework.jms.support.JmsAccessor.createConnection(JmsAccessor.java:180)
at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:474)
... 36 more
Caused by: java.lang.ClassCastException: com.sun.proxy.$Proxy102 cannot be cast to oracle.jdbc.internal.OracleConnection
at oracle.jms.AQjmsGeneralDBConnection.getProviderKey(AQjmsGeneralDBConnection.java:98)
at oracle.jms.AQjmsGeneralDBConnection.<init>(AQjmsGeneralDBConnection.java:67)
at oracle.jms.AQjmsDBConnMgr.getConnection(AQjmsDBConnMgr.java:566)
... 41 more
我相信发生Cast异常是因为它是一个
ProxyConnection[PooledConnection[oracle.jdbc.driver.T4CConnection@40016ce1]]
。但我不知道如何解决这个问题。
更改
jdbc
库,就我而言,这修复了它(如果没有,请尝试使用其他版本):
<dependency>
<groupId>com.oracle</groupId>
<artifactId>ojdbc7</artifactId>
<version>12.1.0.2.0</version>
</dependency>
当尝试从 Spring boot 访问 Oracle AQ 时,我们遇到了同样的异常。研究表明,抛出此异常是因为数据库连接池库不允许访问oracle AQ库所需的底层连接。(dbcp和tomcat连接池库都抛出异常,不一样但相似)
当我们从依赖项中删除数据库连接池库时,此异常消失了,这导致整个应用程序没有数据库连接池的不良状态。
我们注意到,如果我们使用以下方法,则不会抛出异常
AQjmsFactory.getQueueConnectionFactory(url, info);
也许解决方案也缺少连接池,但这仅限于从 AQ 读取的组件。应用程序中的其他组件将受益于连接池
这是 Bean 定义的 java 配置:
/**
 * Builds the AQ queue connection factory straight from a JDBC URL and credentials
 * taken from the primary data source's service info, without going through a
 * {@code DataSource}.
 *
 * @return the AQ queue connection factory
 * @throws Exception if the service info lookup or factory creation fails
 */
@Bean
public QueueConnectionFactory connectionFactory() throws Exception {
    final OracleServiceInfo primaryDs = (OracleServiceInfo) this.cloud().getServiceInfo(NAME_PRIMARY_DS);
    final String jdbcUrl = primaryDs.getJdbcUrl();

    final Properties connectionProps = new Properties();
    connectionProps.put("driver-name", "oracle.jdbc.OracleDriver");
    connectionProps.put("user", primaryDs.getUserName());
    connectionProps.put("password", primaryDs.getPassword());

    return oracle.jms.AQjmsFactory.getQueueConnectionFactory(jdbcUrl, connectionProps);
}
/**
 * Creates a {@code JmsTemplate} that sends through the factory returned by
 * {@code connectionFactory()}.
 *
 * @return the configured template
 * @throws Exception if the connection factory cannot be created
 */
@Bean
public JmsTemplate jmsTemplate() throws Exception {
    final JmsTemplate template = new JmsTemplate();
    template.setConnectionFactory(connectionFactory());
    return template;
}
我还不确定这是否是一个好的解决方案。但这绝对是摆脱问题中讨论的异常的一种方法。
嗨,我也花了相当长的时间才使连接正常工作,但最终成功了,方法如下:
首先确保 Oracle AQ 队列表的有效负载未设置为 RAW,但最好设置为文本:SYS.AQ$_JMS_TEXT_MESSAGE
接下来使用类似于以下的 OracleAQConfiguration:
import oracle.jdbc.pool.OracleDataSource;
import oracle.jms.AQjmsFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.jms.JMSException;
import javax.jms.QueueConnectionFactory;
import javax.sql.DataSource;
import java.sql.SQLException;
/**
 * Spring configuration that connects to Oracle AQ through a plain
 * {@link OracleDataSource}.
 */
@Configuration
public class OracleAQConfiguration {

    // Values are retrieved from custom added props in Spring application.properties
    @Value("${myapplication.datasource.user}")
    private String user;

    @Value("${myapplication.datasource.password}")
    private String password;

    @Value("${myapplication.datasource.connectionstring}")
    private String connectionstring;

    /**
     * Data source describing the Oracle database that contains the queues.
     *
     * @return an {@link OracleDataSource} configured from the injected properties
     * @throws SQLException if the data source cannot be created or configured
     */
    @Bean
    public DataSource dataSource() throws SQLException {
        final OracleDataSource oracleDs = new OracleDataSource();
        oracleDs.setURL(connectionstring);
        oracleDs.setUser(user);
        oracleDs.setPassword(password);
        oracleDs.setImplicitCachingEnabled(true);
        oracleDs.setFastConnectionFailoverEnabled(true);
        return oracleDs;
    }

    /**
     * The key component: creates the Oracle AQ connection factory from the data source.
     *
     * @param dataSource the data source AQ obtains its database connections from
     * @return the AQ queue connection factory
     * @throws JMSException if the factory cannot be created
     */
    @Bean
    public QueueConnectionFactory connectionFactory(DataSource dataSource) throws JMSException {
        final QueueConnectionFactory aqFactory = AQjmsFactory.getQueueConnectionFactory(dataSource);
        return aqFactory;
    }
}
接下来使用类似于下面的 JMSConfiguration。在这里,我读取和写入的是同一个队列,这在真实的应用集成场景中不太可能出现,但用于测试没有问题。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import javax.jms.ConnectionFactory;
import javax.sql.DataSource;
/**
 * JMS wiring for sending to and receiving from a queue. Both directions use the
 * same queue name here.
 */
@Configuration
public class JMSConfiguration {
    private static final String QUEUENAME_WRITE = "MYQUEUE";
    private static final String QUEUENAME_READ = "MYQUEUE";

    @Autowired
    private JMSReceiver jmsReceiver;

    /**
     * Template used to write/send/enqueue messages; its default destination is
     * {@code QUEUENAME_WRITE} and its sessions are transacted.
     *
     * @param conFactory the JMS connection factory to send through
     * @return the configured template
     */
    @Bean
    public JmsTemplate jmsTemplate(ConnectionFactory conFactory) {
        final JmsTemplate sender = new JmsTemplate();
        sender.setConnectionFactory(conFactory);
        sender.setSessionTransacted(true);
        sender.setDefaultDestinationName(QUEUENAME_WRITE);
        return sender;
    }

    /**
     * Listener container used to read/receive/dequeue messages from
     * {@code QUEUENAME_READ}. The container is session-transacted and is given a
     * {@link DataSourceTransactionManager} built on the supplied data source;
     * incoming messages are handed to the injected {@code JMSReceiver}.
     *
     * @param conFactory the JMS connection factory to receive through
     * @param dataSource the data source backing the transaction manager
     * @return the configured listener container
     */
    @Bean
    public DefaultMessageListenerContainer messageListenerContainer(ConnectionFactory conFactory, DataSource dataSource) {
        final DataSourceTransactionManager txManager = new DataSourceTransactionManager();
        txManager.setDataSource(dataSource);

        final DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
        container.setConnectionFactory(conFactory);
        container.setDestinationName(QUEUENAME_READ);
        container.setSessionTransacted(true);
        container.setTransactionManager(txManager);
        // Application-specific handler for incoming messages.
        container.setMessageListener(jmsReceiver);
        return container;
    }
}
最后,为了处理传入的 JMS 消息,请使用以下内容:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.listener.SessionAwareMessageListener;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
/**
 * Listener that handles messages delivered from the queue and logs their text payload.
 */
@Component
public class JMSReceiver implements SessionAwareMessageListener<Message> {
    private static final Logger logger = LoggerFactory.getLogger(JMSReceiver.class);

    /**
     * Logs the text of an incoming message.
     *
     * @param message the received message; expected to be a {@link TextMessage}
     * @param session the JMS session the message was received on
     * @throws JMSException if the message is not a {@link TextMessage} or its text cannot be read
     */
    @Override
    public void onMessage(Message message, Session session) throws JMSException {
        // The queue payload type is assumed to be text; fail with a descriptive error
        // instead of a bare ClassCastException when something else arrives.
        if (!(message instanceof TextMessage)) {
            throw new JMSException("Unsupported JMS message type: "
                    + (message == null ? "null" : message.getClass().getName()));
        }
        TextMessage txtMessage = (TextMessage) message;
        logger.info("JMS Text Message received: {}", txtMessage.getText());
        // ... further implementation
    }
}
问题是 AQ 代码需要一个 OracleConnection,但在池化时连接被包装,因此失败
/**
 * Creates an {@code AQjmsConnectionFactory} and points it at the injected data source.
 *
 * @param dataSource the data source AQ obtains its database connections from
 * @return the configured AQ connection factory
 * @throws Exception if the factory cannot be configured
 */
@Bean
public AQjmsConnectionFactory connectionFactory(@Autowired DataSource dataSource) throws Exception{
    final AQjmsConnectionFactory aqFactory = new AQjmsConnectionFactory();
    aqFactory.setDatasource(dataSource);
    return aqFactory;
}
/**
 * Creates a {@code JmsTemplate} that sends through the given AQ connection factory.
 *
 * @param connectionFactory the AQ connection factory to send through
 * @return the configured template
 * @throws Exception declared by the original signature
 */
@Bean
public JmsTemplate jmsTemplate(@Autowired AQjmsConnectionFactory connectionFactory) throws Exception{
    final JmsTemplate template = new JmsTemplate();
    template.setConnectionFactory(connectionFactory);
    return template;
}
POM File:
<dependency>
<groupId>com.oracle.database.jdbc</groupId>
<artifactId>ojdbc8</artifactId>
</dependency>
<dependency>
<groupId>com.oracle.database.messaging</groupId>
<artifactId>aqapi-jakarta</artifactId>
<version>23.2.1.0</version>
</dependency>
<dependency>
<groupId>jakarta.jms</groupId>
<artifactId>jakarta.jms-api</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/jakarta.transaction/jakarta.transaction-api -->
<dependency>
<groupId>jakarta.transaction</groupId>
<artifactId>jakarta.transaction-api</artifactId>
</dependency>
//使用 Spring JMS 模板向 Oracle AQ JMS 队列发送消息的代码
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.Session;
/**
 * Service that serializes a POJO to JSON and sends it as a text message to an
 * Oracle AQ JMS queue through {@link JmsTemplate}.
 */
@Service
public class JmsUtil {
    // A single shared mapper is reused rather than building a new one per message.
    private static final ObjectMapper MAPPER = new ObjectMapper();
    private static final java.util.logging.Logger LOG =
            java.util.logging.Logger.getLogger(JmsUtil.class.getName());

    @Autowired
    JmsTemplate jmsTemplate;

    /**
     * Sends the given object, serialized as JSON, to the {@code YourQueueName} queue.
     *
     * <p>Sending is best-effort: any failure during serialization or sending is logged
     * and not propagated to the caller.
     *
     * @param yourPojoClass the object to serialize and send
     */
    public void sendMessageToJmsQueue(YourPojoClass yourPojoClass) {
        try {
            final String pojoAsJson = MAPPER.writeValueAsString(yourPojoClass);
            jmsTemplate.send("YourQueueName", session -> session.createTextMessage(pojoAsJson));
        } catch (Exception e) {
            // Kept non-throwing for existing callers; log with the full stack trace
            // instead of printing to stderr.
            LOG.log(java.util.logging.Level.SEVERE,
                    "Failed to send message to JMS queue YourQueueName", e);
        }
    }
}