环境。Oracle 12c
我目前有一个Oracle高级队列系统,设置如下。
NAME QUEUE_TABLE QID QUEUE_TYPE MAX_RETRIES RETRY_DELAY RETENTION
--------------- --------------- ------- --------------- ----------- ----------- ---------
MY_WORK_Q MY_WORK_QT 2518333 NORMAL_QUEUE 100 0 0
MY_WORK_QT_E MY_WORK_QT 2518332 EXCEPTION_QUEUE 0 0 0
我还注册了一个调用PLSQL包过程的回调。
由于某些原因,我似乎出现了消息丢失或消息没有被去队列的情况。
基于此,请大家协助解决以下问题。
1) Have I setup my actual queue - MY_WORK_Q with MAX_RETRIES and other info correctly?
2) Is there a way to check messages that have been enqueued?
3) Is there a way to check messages that have been de-queued?
4) Is there a means of checking the EXCEPTION_QUEUE/Exception table to see if lost messages have reached there?
我只是不明白为什么在我的队列系统中会丢失消息 我可以检查一下是什么原因导致了这个问题。
我也把MAX_RETRIES增加到100,但似乎还是有问题。
更新:
我好像得到了 error ORA-25263: no message in queue
.
我不确定这是否与dbms_aq.enqueue或dbms_aq.dequeue调用的时间问题有关,但在dbms_aq.dequeue中我设置了这个。
l_dequeue_options.wait := dbms_aq.no_wait;
我是否需要10秒的等待而不是no_wait? 不知道这是否可能,也不知道等待是否需要在enqueue或dequeue步骤中?
这是一个简单的例子,它的工作原理。 也许你可以用它来确定你的问题?
-- DROP TYPE some_type_t FORCE;
CREATE OR REPLACE TYPE some_type_t AS OBJECT (
a NUMBER,
b VARCHAR(100),
c DATE
);
CREATE OR REPLACE PROCEDURE some_type_callback(CONTEXT IN RAW,
reginfo IN sys.aq$_reg_info,
descr IN sys.aq$_descriptor,
payload IN RAW,
payloadl IN NUMBER) AS
-- Local variables
v_dequeue_options dbms_aq.dequeue_options_t;
v_message_properties dbms_aq.message_properties_t;
v_message_handle RAW(26);
v_some_type some_type_t;
BEGIN
-- Set the dequeue options from the descriptor
v_dequeue_options.consumer_name := descr.consumer_name;
v_dequeue_options.msgid := descr.msg_id;
-- Dequeue the message
dbms_aq.dequeue(queue_name => descr.queue_name,
dequeue_options => v_dequeue_options,
message_properties => v_message_properties,
payload => v_some_type,
msgid => v_message_handle);
END some_type_callback;
/
SELECT *
FROM user_errors e
WHERE e.name = 'SOME_TYPE_CALLBACK';
BEGIN
-- dbms_aqadm.drop_queue_table(queue_table => 'some_type_qt',
-- force => TRUE);
dbms_aqadm.create_queue_table(queue_table => 'some_type_qt',
queue_payload_type => 'some_type_t',
multiple_consumers => TRUE);
dbms_aqadm.create_queue(queue_name => 'some_type_q',
queue_table => 'some_type_qt',
retention_time => 86400); -- 1 day
dbms_aqadm.start_queue(queue_name => 'some_type_q');
dbms_aqadm.add_subscriber(queue_name => 'some_type_q',
subscriber => sys.aq$_agent(NAME => 'some_type_qs',
address => NULL,
protocol => NULL));
dbms_aq.register(sys.aq$_reg_info_list(sys.aq$_reg_info('some_type_q:some_type_qs',
dbms_aq.namespace_aq,
'plsql://some_type_callback',
hextoraw('FF'))),
1);
END;
/
SELECT *
FROM aq$some_type_qt;
-- nothing
DECLARE
v_some_type some_type_t;
eopt dbms_aq.enqueue_options_t;
mprop dbms_aq.message_properties_t;
enq_msgid RAW(16);
BEGIN
v_some_type := some_type_t(a => 42,
b => 'forty-two',
c => to_date('1/1/2942',
'mm/dd/yyyy'));
dbms_aq.enqueue(queue_name => 'some_type_q',
enqueue_options => eopt,
message_properties => mprop,
payload => v_some_type,
msgid => enq_msgid);
END;
/
SELECT *
FROM aq$some_type_qt;
-- msg_state = READY => PROCESSED
有一件事肯定会对你有所帮助,那就是在你的队列表上设置 retention_time。 然后您可以使用 aq$ 引用来查看队列表的消息。 msg_state列会显示READY代表一个新鲜的消息,PROCESSED代表一个被消耗的消息。 还有一些其他的列会有帮助:例如retry_count。
如果你得到的是ORA-25263,似乎不是处理你在回调中被激起的一条消息,而是试图读取不同的消息,并与另一个开始消耗队列的作业发生冲突。 在调用dequeue之前,我在回调中加入了两行文字来解决这个问题。
如果你需要在消息到达时触发,然后保留消息顺序,你需要在你的回调中增加一些锁定和额外的复杂性。 你可以看看Metalink 225810.1中的例子来了解如何做到这一点。