如何在Spring Boot中实现Oracle AQ队列?

问题描述 投票:0回答:5

我已经了解了如何使用 AQ(Streams?)包创建 Oracle 数据库。我还在 Oracle 中(手动)创建了一些队列。 (使用 PL/SQL 和 SQL)。

但是,我很难从 Spring 建立正确的连接。

以下作品(使用

oracle.AQ
java 包):

private final String aqUrl = "jdbc:oracle:thin:@localhost:1521:orcl";
private final String aqUser = "queue_mut";
private final String aqPassword = "******";
private final String aqSchema = "queue_mut";
private final String aqTable = "aq_table1";
private final String aqQueue = "aq_queue1";


@Test
public void testManualAQ() throws ClassNotFoundException, SQLException, AQException {

    Class.forName("oracle.jdbc.driver.OracleDriver");
    Connection connection = DriverManager.getConnection(aqUrl, aqUser, aqPassword);
    connection.setAutoCommit(false);

    Class.forName("oracle.AQ.AQOracleDriver");
    AQSession aqSession = AQDriverManager.createAQSession(connection);
    AQQueueTable q_table = aqSession.createQueueTable(aqSchema, aqTable, new AQQueueTableProperty("RAW"));
    aqSession.createQueue(q_table, aqQueue, new AQQueueProperty());

}

(基于 https://docs.oracle.com/cd/B10501_01/appdev.920/a96587/apexampl.htm

这表明我可以连接到 Oracle 并实现 AQ 功能。

现在,我正在尝试创建 Java 配置 bean 以便使用

JmsTemplate

@Resource
private JmsTemplate jmsTemplate;

@Test
public void testJmsTemplate() {
    String xmlval = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" +
            "<product id=\"10\">\n" +
            " <description>Foo</description>\n" +
            " <price>2.05</price>\n" +
            "</product>";

    jmsTemplate.convertAndSend(aqSchema + ".jms_ws_incoming_queue", xmlval);
}

(是的,队列已经存在;-))

具有以下配置类:

import oracle.jms.AQjmsFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.jms.core.JmsTemplate;

import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.sql.DataSource;

@Configuration
public class OracleAQConfiguration {

    @Bean
    public DataSourceTransactionManager transactionManager(DataSource dataSource) {
        DataSourceTransactionManager manager = new DataSourceTransactionManager();
        manager.setDataSource(dataSource);
        return manager;
    }

    @Bean
    public ConnectionFactory connectionFactory(DataSource dataSource) throws JMSException {
        return AQjmsFactory.getQueueConnectionFactory(dataSource);
    }

    @Bean
    public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory) {
        JmsTemplate jmsTemplate = new JmsTemplate();
        jmsTemplate.setSessionTransacted(true);
        jmsTemplate.setConnectionFactory(connectionFactory);
        return jmsTemplate;
    }
}

并且具有属性 yml:

spring:
  datasource:
    url: jdbc:oracle:thin:@localhost:1521:orcl
    username: queue_mut
    password: ******
    driverClassName: oracle.jdbc.driver.OracleDriver

但是这样我就得到了我无法理解的错误:

2017-04-19 12:11:17,151  INFO my.project.QueueTest: Started QueueTest in 5.305 seconds (JVM running for 6.588)

org.springframework.jms.UncategorizedJmsException: Uncategorized exception occurred during JMS processing; nested exception is oracle.jms.AQjmsException: Error creating the db_connection; nested exception is java.lang.ClassCastException: com.sun.proxy.$Proxy102 cannot be cast to oracle.jdbc.internal.OracleConnection

    at org.springframework.jms.support.JmsUtils.convertJmsAccessException(JmsUtils.java:316)
    at org.springframework.jms.support.JmsAccessor.convertJmsAccessException(JmsAccessor.java:169)
    at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:487)
    at org.springframework.jms.core.JmsTemplate.send(JmsTemplate.java:570)
    at org.springframework.jms.core.JmsTemplate.convertAndSend(JmsTemplate.java:658)
    at my.project.QueueTest.testJmsTemplate(QueueTest.java:51)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75)
    at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86)
    at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:252)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:94)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
    at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:191)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: oracle.jms.AQjmsException: Error creating the db_connection
    at oracle.jms.AQjmsDBConnMgr.getConnection(AQjmsDBConnMgr.java:625)
    at oracle.jms.AQjmsDBConnMgr.<init>(AQjmsDBConnMgr.java:399)
    at oracle.jms.AQjmsConnection.<init>(AQjmsConnection.java:249)
    at oracle.jms.AQjmsConnectionFactory.createConnection(AQjmsConnectionFactory.java:513)
    at org.springframework.jms.support.JmsAccessor.createConnection(JmsAccessor.java:180)
    at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:474)
    ... 36 more
Caused by: java.lang.ClassCastException: com.sun.proxy.$Proxy102 cannot be cast to oracle.jdbc.internal.OracleConnection
    at oracle.jms.AQjmsGeneralDBConnection.getProviderKey(AQjmsGeneralDBConnection.java:98)
    at oracle.jms.AQjmsGeneralDBConnection.<init>(AQjmsGeneralDBConnection.java:67)
    at oracle.jms.AQjmsDBConnMgr.getConnection(AQjmsDBConnMgr.java:566)
    ... 41 more

我相信发生Cast异常是因为它是一个

ProxyConnection[PooledConnection[oracle.jdbc.driver.T4CConnection@40016ce1]]
。但我不知道如何解决这个问题。

java spring jms oracle-aq advanced-queuing
5个回答
3
投票

更改

jdbc
库,就我而言,这修复了它(如果没有,请尝试使用其他版本):

<dependency>
    <groupId>com.oracle</groupId>
    <artifactId>ojdbc7</artifactId>
    <version>12.1.0.2.0</version>
</dependency>

2
投票

当尝试从 Spring boot 访问 Oracle AQ 时,我们遇到了同样的异常。研究表明,抛出此异常是因为数据库连接池库不允许访问oracle AQ库所需的底层连接。(dbcp和tomcat连接池库都抛出异常,不一样但相似)

当我们从依赖项中删除数据库连接池库时,此异常消失了,这导致整个应用程序没有数据库连接池的不良状态。

我们注意到,如果我们使用以下方法,则不会抛出异常

AQjmsFactory.getQueueConnectionFactory(url, info);

也许解决方案也缺少连接池,但这仅限于从 AQ 读取的组件。应用程序中的其他组件将受益于连接池

这是 Bean 定义的 java 配置:

@Bean
public QueueConnectionFactory connectionFactory() throws Exception {
    OracleServiceInfo serviceInfo = (OracleServiceInfo) this.cloud().getServiceInfo(NAME_PRIMARY_DS);
    Properties info = new Properties();
    String url = serviceInfo.getJdbcUrl();
    info.put("driver-name", "oracle.jdbc.OracleDriver");
    info.put("user", serviceInfo.getUserName());
    info.put("password", serviceInfo.getPassword());
    return oracle.jms.AQjmsFactory.getQueueConnectionFactory(url, info);
}

@Bean
public JmsTemplate jmsTemplate() throws Exception {
    JmsTemplate jmsTemplate = new JmsTemplate();
    jmsTemplate.setConnectionFactory(connectionFactory());
    return jmsTemplate;
}

我还不确定这是否是一个好的解决方案。但这绝对是摆脱问题中讨论的异常的一种方法。


1
投票

嗨,我也花了相当长的时间才使连接正常工作,但最终成功了,方法如下:

首先确保 Oracle AQ 队列表的有效负载未设置为 RAW,但最好设置为文本:SYS.AQ$_JMS_TEXT_MESSAGE

接下来使用类似于以下的 OracleAQConfiguration:

import oracle.jdbc.pool.OracleDataSource;
import oracle.jms.AQjmsFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.jms.JMSException;
import javax.jms.QueueConnectionFactory;
import javax.sql.DataSource;
import java.sql.SQLException;

@Configuration
public class OracleAQConfiguration {

    // Values are retrieved from custom added props in Spring application.properties

    @Value("${myapplication.datasource.user}")
    private String user;

    @Value("${myapplication.datasource.password}")
    private String password;

    @Value("${myapplication.datasource.connectionstring}")
    private String connectionstring;

    @Bean
    /**
     * Spring bean with the configuration details of where the Oracle database is containing the QUEUES
     */
    public DataSource dataSource() throws SQLException {
        OracleDataSource ds = new OracleDataSource();
        ds.setUser(user);
        ds.setPassword(password);
        ds.setURL(connectionstring);
        ds.setImplicitCachingEnabled(true);
        ds.setFastConnectionFailoverEnabled(true);
        return ds;
    }

    @Bean
    /**
     * The KEY component effectively connecting to the Oracle AQ system using the datasource input
     */
    public QueueConnectionFactory connectionFactory(DataSource dataSource) throws JMSException {
        return AQjmsFactory.getQueueConnectionFactory(dataSource);
    }

}

接下来使用类似于下面的 JMSConfiguration。 在这里,我读取和写入同一个队列,这在真实的应用程序集成场景中是不可能的。但测试没问题

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.DefaultMessageListenerContainer;

import javax.jms.ConnectionFactory;
import javax.sql.DataSource;

@Configuration
public class JMSConfiguration {
    private static final String QUEUENAME_WRITE = "MYQUEUE";
    private static final String QUEUENAME_READ = "MYQUEUE";

    @Autowired
    private JMSReceiver jmsReceiver;

    @Bean
    /**
     * Spring bean to WRITE/SEND/ENQUEUE messages on a queue with a certain name
     */
    public JmsTemplate jmsTemplate(ConnectionFactory conFactory) {
        JmsTemplate jmsTemplate = new JmsTemplate();
        jmsTemplate.setDefaultDestinationName(QUEUENAME_WRITE);
        jmsTemplate.setSessionTransacted(true);
        jmsTemplate.setConnectionFactory(conFactory);

        return jmsTemplate;
    }

    /**
     * Spring bean to READ/RECEIVE/DEQUEUE messages of a queue with a certain name
     * All of this happens under a code managed transaction
     * to commit the change on Oracle (remove of the message from the queue table)
     * Reference the application custom code handling the message here
     */
    @Bean
    public DefaultMessageListenerContainer messageListenerContainer(ConnectionFactory conFactory, DataSource dataSource) {
        DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer();
        dmlc.setDestinationName(QUEUENAME_READ);
        dmlc.setSessionTransacted(true);
        dmlc.setConnectionFactory(conFactory);

        DataSourceTransactionManager manager = new DataSourceTransactionManager();
        manager.setDataSource(dataSource);
        dmlc.setTransactionManager(manager);

        // Add here our self-written JMS Receiver
        dmlc.setMessageListener(jmsReceiver);
        return dmlc;
    }

}

最后,为了处理传入的 JMS 消息,请使用以下内容:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.listener.SessionAwareMessageListener;
import org.springframework.stereotype.Component;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

@Component
public class JMSReceiver implements SessionAwareMessageListener {
    private static final Logger logger = LoggerFactory.getLogger(JMSReceiver.class);

    @Override
    public void onMessage(Message message, Session session) throws JMSException {
        // We know/assume the Queue Payload type was set to 'TextMessage'
        TextMessage txtMessage = (TextMessage) message;
        logger.info("JMS Text Message received: " + txtMessage.getText());

        // ... further implementation
    }

}

0
投票

问题是 AQ 代码需要一个 OracleConnection,但在池化时连接被包装,因此失败


0
投票
 @Bean
    public AQjmsConnectionFactory connectionFactory(@Autowired DataSource dataSource) throws Exception{
        AQjmsConnectionFactory connectionFactory=new AQjmsConnectionFactory();
        connectionFactory.setDatasource(dataSource);
        return connectionFactory;
    }
    @Bean
    public JmsTemplate jmsTemplate(@Autowired AQjmsConnectionFactory connectionFactory) throws Exception{
        JmsTemplate jmsTemplate=new JmsTemplate();
        jmsTemplate.setConnectionFactory(connectionFactory);
        return jmsTemplate;
    }

POM File:

<dependency>
            <groupId>com.oracle.database.jdbc</groupId>
            <artifactId>ojdbc8</artifactId>
        </dependency>
<dependency>
            <groupId>com.oracle.database.messaging</groupId>
            <artifactId>aqapi-jakarta</artifactId>
            <version>23.2.1.0</version>
</dependency>
<dependency>
    <groupId>jakarta.jms</groupId>
    <artifactId>jakarta.jms-api</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/jakarta.transaction/jakarta.transaction-api -->
<dependency>
    <groupId>jakarta.transaction</groupId>
    <artifactId>jakarta.transaction-api</artifactId>
</dependency>

//使用 Spring JMS 模板向 Oracle AQ JMS 队列发送消息的代码

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.Session;

@Service
public class JmsUtil {
    @Autowired
    JmsTemplate jmsTemplate;
    //Send JMS message to Oracle AQ JMS queue
    public void sendMessageToJmsQueue(YourPojoClass yourPojoClass) {
        try {
            ObjectMapper mapper=new ObjectMapper();
            String pojoAsJson=mapper.writeValueAsString(yourPojoClass);
            jmsTemplate.send("YourQueueName", new MessageCreator() {
                public Message createMessage(Session session) throws JMSException{
                    return session.createTextMessage(pojoAsJson);
                }
            });
        }catch(Exception e) {
            e.printStackTrace();
        }
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.