- 论坛徽章:
- 0
|
orcale queue
为应用系统构建消息通讯之2-----使用oracle高级队列传递消息1
下面给出oracle高级队列传递消息的一些说明,注意没有给出oracle高级队列传的消息传递,以及基于主题消息的通知/监听的相关东西,等我有空再补上。
--=========================================================
1.高级队列操作(AQ)
参考资料
Oracle9i Application Developer's Guide - Advanced Queuing
Release 1 (9.0.1)
Oracle9i PL/SQL 从入门到精通
中国水利水电出版社 2002年5月 ISBN 7-5084-1055-6
1.1.创建对列消息的对象类型(Type)
定义一个对象类型。这个对象类型用于充当队列消息中的消息体。
create or replace type WDZAQMSG as object
-- Author : wdz
-- Created : 2003-11-19 13:49:03
-- Purpose : 消息队列类型
-- Attributes
id int,
name varchar2(200),
info varchar2(2000)
)
1.2.创建队列表
参考dbms_aqadm.create_queue_table
1.存储参数storage_clause 可以是 MAXTRANS,LOB等
2.sort_list 可以是PRIORITY,enq_time这2个参数或者其中一个。
具体可以察看对应的 队列表的数据结构。例如下面的例子
--创建队列表,'WDZAQTABLE' 队列表名,'WDZAQMSG'队列对象名(充当消息体的对象名称),以下同
dbms_aqadm.create_queue_table('WDZAQTABLE','WDZAQMSG');
--创建具有排序功能的队列表
dbms_aqadm.create_queue_table('WDZSQRTAQTABLE','WDZAQMSG',sort_list => 'PRIORITY,enq_time');
3。消息分组
参数 message_grouping 可以是 NONE,或 TRANSACTIONAL
-- message grouping
dbms_aqadm.TRANSACTIONAL CONSTANT BINARY_INTEGER := 1;
dbms_aqadm.NONE CONSTANT BINARY_INTEGER := 0;
后者表示一个与事务相关的消息分成1组,在提取消息的时候可以当成一组相关消息来提取。
例如:
--创建带带分组功能的消息队列表
dbms_aqadm.create_queue_table('WDZGROUPAQTABLE','WDZAQMSG',
sort_list => 'PRIORITY,enq_time',
message_grouping =>dbms_aqadm.TRANSACTIONAL/*dbms_aqadm.NONE*/ );
4.multiple_consumers
表示消息接受者是否为多个用户。默认是只有1个接收者。如果要多个用户可以接受消息,需要设置=true
--创建多个接收者的消息队列表
dbms_aqadm.create_queue_table('WDZMUTIAQTABLE','WDZAQMSG',sort_list => 'PRIORITY,enq_time',
message_grouping =>dbms_aqadm.TRANSACTIONAL/*dbms_aqadm.NONE*/
,multiple_consumers => true);
1.3创建队列
参考 dbms_aqadm.create_queue
--创建队列, 'WDZQUEUE' 为队列的名称,'WDZAQTABLE' 队列表名
dbms_aqadm.create_queue('WDZQUEUE','WDZAQTABLE',queue_type => dbms_aqadm.NORMAL_QUEUE);
参数queue_type 表示队列是否为正常队列还是异常队列,
参数值为dbms_aqadm.NORMAL_QUEUE 或者dbms_aqadm.EXCEPTION_QUEUE
--创建多用户队列, 'WDZQUEUE' 为队列的名称,'WDZAQTABLE' 队列表名
dbms_aqadm.create_queue('WDZMUTIQUEUE','WDZMUTIAQTABLE',
queue_type => dbms_aqadm.NORMAL_QUEUE);
--创建分组队列, 'WDZGROUPQUEUE' 为队列的名称,'WDZAQTABLE' 队列表名
dbms_aqadm.create_queue('WDZGROUPQUEUE', 'WDZAQTABLE',
queue_type => dbms_aqadm.NORMAL_QUEUE);
1.4创建非持久队列
非持久队列 顾名思义就是没有永久保存到数据的队列,队列只存在于系统的内存中。
参考 dbms_aqadm.create_np_queue
--创建队列, 'WDZQUEUE2' 为队列的名称,
dbms_aqadm. create_np_queue ('WDZQUEUE2', multiple_consumers=>false);
参数 multiple_consumers表示接收者是否为多用户接收
1.5启动一个队列
参考dbms_aqadm.start_queue
参数enqueue表示是否可以入队操作
参数dequeue表示是否可以入队操作
-- 启动一个队列,'WDZQUEUE' 为队列的名称- dbms_aqadm.start_queue('WDZQUEUE',enqueue=>true, dequeue=> true);
- dbms_aqadm.start_queue('WDZMUTIQUEUE',enqueue=>true, dequeue=> true);
- dbms_aqadm.start_queue('WDZGROUPQUEUE',enqueue=>true, dequeue=> true);
复制代码 1.6停止队列
参数的意思参考启动一个队列- dbms_aqadm.stop_queue('WDZQUEUE',enqueue=>false, dequeue=>true);
复制代码 1.7普通的入队/出队操作
1.7.1.一个普通的消息发送/接收例子
1.消息发送- declare
- op dbms_aq.enqueue_options_t;
- msgop dbms_aq.message_properties_t;
- v_msgBody WDZAQMSG;
- v_msgid raw(16);
- begin
- --入队操作
- -- ‘WDZQUEUE’就是消息发送到的目的队列
- -- 参数 enqueue_options 队列操作的可选择参数
- --参数 message_properties 队列操作的消息的属性设置(消息头)
- --参数 payload队列操作,发送的消息体
- v_msgBody :=WDZAQMSG(1,'wdz','test send aq the first info ');
- msgop.priority := 1;--消息优先级别
- dbms_aq.enqueue('WDZQUEUE',
- enqueue_options => op,
- message_properties => msgop,
- payload => v_msgBody,
- msgid => v_msgid);
- --提交消息
- commit;
- end;
复制代码 2.消息接收- declare
- dopt dbms_aq.dequeue_options_t;
- mprop dbms_aq.message_properties_t;
- v_msgBody WDZAQMSG;
- v_msgid raw(16);
- begin
- dbms_aq.dequeue('WDZQUEUE',dequeue_options => dopt,
- message_properties => mprop ,
- payload => v_msgBody,msgid => v_msgid);
- dbms_output.put_line('ok recieve info--msgbody id='||
- to_char(v_msgBody.id)||
- ',info name='||v_msgBody.name||
- ',info ='||v_msgBody.info);
- end;
复制代码 1.7.2带有优先级别的消息发送接收例子
1.消息发送- declare
- op dbms_aq.enqueue_options_t;
- msgop dbms_aq.message_properties_t;
- v_msgBody WDZAQMSG;
- v_msgid raw(16);
- begin
- --入队操作
- -- ‘WDZQUEUE’就是消息发送到的目的队列
- -- 参数 enqueue_options 队列操作的可选择参数
- --参数 message_properties 队列操作的消息的属性设置(消息头)
- --参数 payload队列操作,发送的消息体
- msgop.priority := 20;
- v_msgBody :=WDZAQMSG(1,'wdz','test send aq info priority =20');
- dbms_aq.enqueue('WDZQUEUE',enqueue_options => op,message_properties => msgop,payload => v_msgBody,msgid => v_msgid);
- msgop.priority :=5;
- v_msgBody :=WDZAQMSG(1,'wdz','test send aq info priority =5');
- dbms_aq.enqueue('WDZQUEUE',enqueue_options => op,message_properties => msgop,payload => v_msgBody,msgid => v_msgid);
- --提交消息
- commit;
- end;
复制代码 2.消息接收- declare
- dopt dbms_aq.dequeue_options_t;
- mprop dbms_aq.message_properties_t;
- v_msgBody WDZAQMSG;
- v_msgid raw(16);
- begin
- dbms_aq.dequeue('WDZQUEUE',dequeue_options => dopt,
- message_properties => mprop ,
- payload => v_msgBody,msgid => v_msgid);
- dbms_output.put_line(
- 'ok recieve info--msgbody id='||
- to_char(v_msgBody.id)||
- ',info name='||v_msgBody.name||
- ',info ='||v_msgBody.info);
- commit;
- end;
复制代码 1.7.3消息浏览
1.消息发送- declare
- op dbms_aq.enqueue_options_t;
- msgop dbms_aq.message_properties_t;
- v_msgBody WDZAQMSG;
- v_msgid raw(16);
- begin
- --入队操作
- -- ‘WDZQUEUE’就是消息发送到的目的队列
- -- 参数 enqueue_options 队列操作的可选择参数
- --参数 message_properties 队列操作的消息的属性设置(消息头)
- --参数 payload队列操作,发送的消息体
- msgop.priority := 20;
- v_msgBody :=WDZAQMSG(99,'wdz','test send aq info priority =20');
- dbms_aq.enqueue('WDZQUEUE',enqueue_options => op,message_properties => msgop,payload => v_msgBody,msgid => v_msgid);
- msgop.priority :=5;
- v_msgBody :=WDZAQMSG(88,'wdz','test send aq info priority =5');
- dbms_aq.enqueue('WDZQUEUE',enqueue_options => op,message_properties => msgop,payload => v_msgBody,msgid => v_msgid);
- --提交消息
- commit;
- end;
复制代码 2.消息接收
-- 目的,先浏览所有消息,找到指定消息后把这个消息删除- declare
- dopt dbms_aq.dequeue_options_t;
- mprop dbms_aq.message_properties_t;
- v_msgBody WDZAQMSG;
- v_msgid raw(16);
- v_havefound boolean ;
- begin
- v_havefound :=false;
- dopt.dequeue_mode := dbms_aq.browse;
- loop
- exit when v_havefound;
- dbms_aq.dequeue('WDZQUEUE',
- dequeue_options => dopt,
- message_properties => mprop,
- payload => v_msgBody,
- msgid => v_msgid);
- dbms_output.put_line('msg priority=' || to_char(mprop.priority) ||
- ',ok recieve info--msgbody id=' ||
- to_char(v_msgBody.id) || ',info name=' ||
- v_msgBody.name || ',info =' || v_msgBody.info);
- if (v_msgBody.id = 88) then
- v_havefound := true;
- end if;
- end loop;
- dopt.dequeue_mode := dbms_aq.remove;
- dopt.msgid := v_msgid;
- dbms_aq.dequeue('WDZQUEUE',
- dequeue_options => dopt,
- message_properties => mprop,
- payload => v_msgBody,
- msgid => v_msgid);
- dbms_output.put_line('删除消息' ||
- ',ok recieve info--msgbody id=' ||
- to_char(v_msgBody.id) || ',info name=' ||
- v_msgBody.name || ',info =' || v_msgBody.info);
- commit;
- end;
复制代码- 1.8消息选项参数dbms_aq.dequeue_options_t
- type dequeue_options_t is record (
- ---consumer_name 消息接收方名称
- consumer_name varchar2(30) default null,
- ---dequeue_mode dequeue_mode =BROWSE 读取消息不加锁,相当于只读操作
- --dequeue_mode =LOCKED读取时候加锁,别的不能进行操作,直到事务提交或者回滚
- --dequeue_mode =REMOVE 默认值,读取的时候可以修改或者删除,消息最后删除否,
- --取决于消息属性列表 retention 的设置
- --dequeue_mode =REMOVE_NODATA ,标记消息是否被修改或者删除,消息最后删除否,
- --取决于消息属性列表 retention 的设置
- dequeue_mode binary_integer default REMOVE,
- ---navigation = NEXT_MESSAGE 这为默认设置, 根据当前位置获取下一个消息
- ---navigation = NEXT_TRANSACTION 跳过当前事务组,获取下一个事务组的第1条消息
- ---navigation = FIRST_MESSAGE 获取当前队列的符合条件的第八个五年计划条消息,
- navigation binary_integer default NEXT_MESSAGE,
- ---visibility指定消息入队时的事务行为,visibility=ON_COMMIT 说明生成的消息是
- --当前事务的一部分
- --visibility=IMMEDIATE 消息立即进入队列,如果 操作的非持久队列,该参数必须等于这个值
- visibility binary_integer default ON_COMMIT ,
- --wait制定没有消息匹配的确时候等待时间
- --wait=FOREVER 永远等待
- --wait=NO_WAIT 不等待
- --wait=number 等待指定的时间,单位为秒
- wait binary_integer default FOREVER,
- --msgid 消息标识符号
- msgid raw(16) default null,
- --correlation 提取消息的标识符号,他在消息进入队列前应用程序设置,例如:可以在用于通讯中握手时身份识别
- correlation varchar2(128) default null,
- --消息的提取条件
- deq_condiction varchar2(4000) null,
- ---transformation 指明消息进入队列前使用的转换函数
- transformation varchar2(60) default null
- );
复制代码 1.9消息属性参数
type message_properties_t is record (
---消息优先级别 ,可以小于0 ,数字越小优先程度越高
priority binary_integer default 1,
---消息在队列中的延迟时间,过了这个时间消息才可以出队
--delay=NO_DELAY 消息不延迟,否者为延迟时间
delay binary_integer default NO_DELAY,
--消息过期时间,expiration=NEVER,消息永远不过期
expiration binary_integer default NEVER,
---消息发送方设置的标示符号
correlation varchar2(12 default null,
attempts binary_integer,
--消息接收者列表
recipient_list aq$_recipient_list_t,
-- 异常队列名称,当消息不能处理,或者处理出错,消息被转移到这个队列,类似于jsp的错误页面
exception_queue varchar2(51) default null,
--消息进入队列的时间,由系统维护用户不能修改
enqueue_time date,
--发送方标识
sender_id aq$_agent default null,
--AQ消息传播用来说明来源
original_msgid raw(16) default null);
;
1.10消息接收者/订阅者
--消息接收者- type sys.aq$_agent is object (
- name varchar2(30),
- address varchar2(1024),
- protocol number )
复制代码 --消息接收者列表- type aq$_recipient_list_t is table of sys.aq$_agent index by binary_integer
复制代码 --消息订阅者列表类型- type aq$_subscriber_list_t is table of sys.aq$_agent
- --注册消息类型
- type sys.aq$_reg_info is object (
- name varchar2(128),
- namespace number,
- callback varchar2(4000),
- context raw(2000)
- );
复制代码 ---定义一个队列的注册列表- type aq$_reg_info_list as varray (1024) of sys.aq$_reg_info
复制代码 1.11多用户队列的使用(订阅/发布模型)
1.多用户队列的使用,建立订阅者列表
---多用户队列的使用,建立订阅者列表- declare
- subscriber sys.aq$_agent;
- begin
- subscriber := sys.aq$_agent('WDZ',null,null);
- dbms_aqadm.add_subscriber('WDZMUTIQUEUE',subscriber);
- subscriber := sys.aq$_agent('apple',null,null);
- dbms_aqadm.add_subscriber('WDZMUTIQUEUE',subscriber);
- end;
复制代码 2.消息发送
--消息的发送(发布/订阅模型)- declare
- enqueue_options dbms_aq.enqueue_options_t;
- message_propertites dbms_aq.message_properties_t;
- recipients dbms_aq.aq$_recipient_list_t;
- message_handle raw(16);
- message WDZAQMSG;
- begin
- recipients(1) := sys.aq$_agent ('wdz',null,null);
- recipients(2) := sys.aq$_agent ('apple',null,null);
- message_propertites.recipient_list := recipients;
- message := WDZAQMSG(10,'wdz','这是发布订阅模型的第一个消息');
- dbms_aq.enqueue('WDZMUTIQUEUE',enqueue_options,message_propertites,message,message_handle);
- message := WDZAQMSG(10,'wdz','这是发布订阅模型的第2个消息');
- dbms_aq.enqueue('WDZMUTIQUEUE',enqueue_options,message_propertites,message,message_handle);
- commit;
- end;
复制代码 3.消息的接收
-- 发布/订阅模型 ,消息的接收- declare
- dequeue_options dbms_aq.dequeue_options_t;
- message_propertites dbms_aq.message_properties_t;
- recipients dbms_aq.aq$_recipient_list_t;
- message_handle raw(16);
- message WDZAQMSG;
- begin
- dequeue_options.wait := dbms_aq.NO_WAIT;
- --用户 wdz 开始接收消息
- begin
- dequeue_options.consumer_name := 'wdz';
- dequeue_options.navigation := dbms_aq.FIRST_MESSAGE;
- loop
- dbms_aq.dequeue('WDZMUTIQUEUE',dequeue_options,message_propertites,
- message,message_handle);
- dbms_output.put_line('ok recieve info--msgbody id=' ||
- to_char(message.id) || ',info name=' ||
- message.name || ',info =' || message.info);
- dequeue_options.navigation := dbms_aq.NEXT_MESSAGE;
- end loop;
- exception
- when others then
- dbms_output.put_line('ok user=wdz的消息接收完毕!');
- end;
- commit;
复制代码 --用户 apple 开始接收消息- [code]begin
- dequeue_options.consumer_name := 'apple';
- dequeue_options.navigation := dbms_aq.FIRST_MESSAGE;
- loop
- dbms_aq.dequeue('WDZMUTIQUEUE',dequeue_options,message_propertites,
- message,message_handle);
- dbms_output.put_line('ok recieve info--msgbody id=' ||
- to_char(message.id) || ',info name=' ||
- message.name || ',info =' || message.info);
- dequeue_options.navigation := dbms_aq.NEXT_MESSAGE;
- end loop;
- exception
- when others then
- dbms_output.put_line('ok user=apple的消息接收完毕!');
- end;
- commit;
- end;
复制代码 [/code]
1.12消息分组的使用
1.消息分组,消息的发送
--消息分组,消息的发送- declare
- op dbms_aq.enqueue_options_t;
- msgop dbms_aq.message_properties_t;
- v_msgBody WDZAQMSG;
- v_msgid raw(16);
- v_TransNo int;
- v_MsgNo int;
- begin
- ---把消息发送分成4组发送
- for v_TransNo in 1 .. 4 loop
- for v_MsgNo in 1 .. 3 loop
- v_msgBody := WDZAQMSG((v_TransNo-1)*3+v_MsgNo, 'wdz',
- 'test send aq info v_MsgNo ='||
- to_char(v_MsgNo)||',v_TransNo='||to_char(v_TransNo));
- dbms_aq.enqueue('WDZGROUPQUEUE',
- enqueue_options => op,
- message_properties => msgop,
- payload => v_msgBody,
- msgid => v_msgid);
- end loop;
- --提交消息
- commit;
- end loop;
- end;
复制代码 2.消息的接收
---消息分组,消息的接收- declare
- dequeue_options dbms_aq.dequeue_options_t;
- mprop dbms_aq.message_properties_t;
- v_msgBody WDZAQMSG;
- v_msgid raw(16);
- i int := 0;
- j int :=0;
- begin
- dequeue_options.navigation := dbms_aq.FIRST_MESSAGE;
- dequeue_options.wait := dbms_aq.NO_WAIT;
- loop
- begin
- dbms_aq.dequeue('WDZGROUPQUEUE',
- dequeue_options => dequeue_options,
- message_properties => mprop,
- payload => v_msgBody,
- msgid => v_msgid);
- dbms_output.put_line('ok recieve info--msgbody id=' ||
- to_char(v_msgBody.id) || ',info name=' ||
- v_msgBody.name || ',info =' || v_msgBody.info);
- dequeue_options.navigation := dbms_aq.NEXT_MESSAGE;
- i := 0;
- exception
- when others then
- begin
- if (i >= 1) then
- dbms_output.put_line('所有消息接收完成');
- exit;
- end if;
-
- -- dbms_output.put_line('本消息组,消息接收完成');
- -- i := 1;
- -- dequeue_options.navigation := dbms_aq.NEXT_TRANSACTION;
- j := j+1;
- commit;
- end;
- end;
- end loop;
- dbms_output.put_line('j='||to_char(j-1));
- end;
复制代码 1.7消息选项参数dbms_aq.dequeue_options_t
由于包dbms_aq是加密的,所以看不到dbms_aq. dequeue_options_t,现给出参数说明.
type dequeue_options_t is record {
---visibility指定消息入队时的事务行为,visibility=ON_COMMIT 说明生成的消息是
--当前事务的一部分
--visibility=IMMEDIATE 消息立即进入队列,如果 操作的非持久队列,该参数必须等于这个值
visibility binary_integer default ON_COMMIT ,
---relative_msgid 消息的标识和 参数 seqquence_deviation 配合使用,
--而且在 seqquence_deviation = BEFORE 才有效
relative_msgid raw(16) default null,
--seqquence_deviation 说明当消息进入队列的放置方式
-- seqquence_deviation=null 放置到队尾
--seqquence_deviation=TOP 放置到队头
--seqquence_deviation=TOP 放置到指定消息 relative_msgid 的前面
seqquence_deviation binary_integer default null,
---transformation 指明消息进入队列前使用的转换函数
transformation varchar2(60) default null
}; |
|