免费注册 查看新帖 |

Chinaunix

  平台 论坛 博客 文库
最近访问板块 发新帖
查看: 1378 | 回复: 2
打印 上一主题 下一主题

[Veritas NBU] orcale queue [复制链接]

论坛徽章:
0
跳转到指定楼层
1 [收藏(0)] [报告]
发表于 2012-03-16 16:38 |只看该作者 |倒序浏览
orcale queue










为应用系统构建消息通讯之2-----使用oracle高级队列传递消息1

下面给出oracle高级队列传递消息的一些说明,注意没有给出oracle高级队列传的消息传递,以及基于主题消息的通知/监听的相关东西,等我有空再补上。
--=========================================================
1.高级队列操作(AQ)
参考资料
  Oracle9i Application Developer's Guide - Advanced Queuing
Release 1 (9.0.1)
  Oracle9i PL/SQL 从入门到精通
中国水利水电出版社 2002年5月 ISBN 7-5084-1055-6
1.1.创建对列消息的对象类型(Type)
定义一个对象类型。这个对象类型用于充当队列消息中的消息体。

create or replace type WDZAQMSG as object
-- Author : wdz
-- Created : 2003-11-19 13:49:03
-- Purpose : 消息队列类型
-- Attributes
id int,
name varchar2(200),
info varchar2(2000)
)
1.2.创建队列表
参考dbms_aqadm.create_queue_table
1.存储参数storage_clause 可以是 MAXTRANS,LOB等
2.sort_list 可以是PRIORITY,enq_time这2个参数或者其中一个。
具体可以察看对应的 队列表的数据结构。例如下面的例子
--创建队列表,'WDZAQTABLE' 队列表名,'WDZAQMSG'队列对象名(充当消息体的对象名称),以下同
dbms_aqadm.create_queue_table('WDZAQTABLE','WDZAQMSG');
--创建具有排序功能的队列表
dbms_aqadm.create_queue_table('WDZSQRTAQTABLE','WDZAQMSG',sort_list => 'PRIORITY,enq_time');
3。消息分组
参数 message_grouping 可以是 NONE,或 TRANSACTIONAL
-- message grouping
dbms_aqadm.TRANSACTIONAL       CONSTANT BINARY_INTEGER := 1;
dbms_aqadm.NONE             CONSTANT BINARY_INTEGER := 0;
后者表示一个与事务相关的消息分成1组,在提取消息的时候可以当成一组相关消息来提取。
例如:
--创建带带分组功能的消息队列表
dbms_aqadm.create_queue_table('WDZGROUPAQTABLE','WDZAQMSG',
sort_list => 'PRIORITY,enq_time',
  message_grouping =>dbms_aqadm.TRANSACTIONAL/*dbms_aqadm.NONE*/ );
4.multiple_consumers
表示消息接受者是否为多个用户。默认是只有1个接收者。如果要多个用户可以接受消息,需要设置=true
--创建多个接收者的消息队列表
dbms_aqadm.create_queue_table('WDZMUTIAQTABLE','WDZAQMSG',sort_list => 'PRIORITY,enq_time',
  message_grouping =>dbms_aqadm.TRANSACTIONAL/*dbms_aqadm.NONE*/
  ,multiple_consumers => true);

1.3创建队列
参考 dbms_aqadm.create_queue
--创建队列, 'WDZQUEUE' 为队列的名称,'WDZAQTABLE' 队列表名
dbms_aqadm.create_queue('WDZQUEUE','WDZAQTABLE',queue_type => dbms_aqadm.NORMAL_QUEUE);
参数queue_type 表示队列是否为正常队列还是异常队列,
参数值为dbms_aqadm.NORMAL_QUEUE 或者dbms_aqadm.EXCEPTION_QUEUE

--创建多用户队列, 'WDZQUEUE' 为队列的名称,'WDZAQTABLE' 队列表名
dbms_aqadm.create_queue('WDZMUTIQUEUE','WDZMUTIAQTABLE',
queue_type => dbms_aqadm.NORMAL_QUEUE);
--创建分组队列, 'WDZGROUPQUEUE' 为队列的名称,'WDZAQTABLE' 队列表名
dbms_aqadm.create_queue('WDZGROUPQUEUE', 'WDZAQTABLE',
    queue_type => dbms_aqadm.NORMAL_QUEUE);

1.4创建非持久队列
非持久队列 顾名思义就是没有永久保存到数据的队列,队列只存在于系统的内存中。
参考 dbms_aqadm.create_np_queue
--创建队列, 'WDZQUEUE2' 为队列的名称,
dbms_aqadm. create_np_queue ('WDZQUEUE2', multiple_consumers=>false);
参数 multiple_consumers表示接收者是否为多用户接收
1.5启动一个队列
参考dbms_aqadm.start_queue
参数enqueue表示是否可以入队操作
参数dequeue表示是否可以入队操作

-- 启动一个队列,'WDZQUEUE' 为队列的名称
  1. dbms_aqadm.start_queue('WDZQUEUE',enqueue=>true, dequeue=> true);
  2. dbms_aqadm.start_queue('WDZMUTIQUEUE',enqueue=>true, dequeue=> true);
  3. dbms_aqadm.start_queue('WDZGROUPQUEUE',enqueue=>true, dequeue=> true);
复制代码
1.6停止队列
参数的意思参考启动一个队列
  1. dbms_aqadm.stop_queue('WDZQUEUE',enqueue=>false, dequeue=>true);
复制代码
1.7普通的入队/出队操作
1.7.1.一个普通的消息发送/接收例子
1.消息发送
  1. declare
  2. op dbms_aq.enqueue_options_t;
  3. msgop dbms_aq.message_properties_t;
  4. v_msgBody WDZAQMSG;
  5. v_msgid   raw(16);
  6. begin
  7. --入队操作
  8.   -- ‘WDZQUEUE’就是消息发送到的目的队列
  9. -- 参数 enqueue_options 队列操作的可选择参数
  10. --参数 message_properties 队列操作的消息的属性设置(消息头)
  11. --参数 payload队列操作,发送的消息体
  12. v_msgBody :=WDZAQMSG(1,'wdz','test send aq the first info ');
  13. msgop.priority := 1;--消息优先级别
  14. dbms_aq.enqueue('WDZQUEUE',
  15. enqueue_options => op,
  16. message_properties => msgop,
  17. payload => v_msgBody,
  18. msgid => v_msgid);
  19. --提交消息
  20. commit;
  21. end;
复制代码
2.消息接收
  1. declare
  2. dopt               dbms_aq.dequeue_options_t;
  3. mprop               dbms_aq.message_properties_t;
  4. v_msgBody             WDZAQMSG;
  5. v_msgid   raw(16);
  6. begin
  7. dbms_aq.dequeue('WDZQUEUE',dequeue_options => dopt,
  8.         message_properties => mprop ,
  9.         payload => v_msgBody,msgid => v_msgid);
  10. dbms_output.put_line('ok recieve info--msgbody id='||
  11.     to_char(v_msgBody.id)||
  12.     ',info name='||v_msgBody.name||
  13.     ',info ='||v_msgBody.info);
  14. end;
复制代码
1.7.2带有优先级别的消息发送接收例子
1.消息发送
  1. declare
  2. op dbms_aq.enqueue_options_t;
  3. msgop dbms_aq.message_properties_t;
  4. v_msgBody WDZAQMSG;
  5. v_msgid   raw(16);
  6. begin
  7. --入队操作
  8. -- ‘WDZQUEUE’就是消息发送到的目的队列
  9. -- 参数 enqueue_options 队列操作的可选择参数
  10. --参数 message_properties 队列操作的消息的属性设置(消息头)
  11. --参数 payload队列操作,发送的消息体
  12. msgop.priority := 20;
  13. v_msgBody :=WDZAQMSG(1,'wdz','test send aq info priority =20');
  14. dbms_aq.enqueue('WDZQUEUE',enqueue_options => op,message_properties => msgop,payload => v_msgBody,msgid => v_msgid);
  15. msgop.priority :=5;
  16. v_msgBody :=WDZAQMSG(1,'wdz','test send aq info priority =5');
  17. dbms_aq.enqueue('WDZQUEUE',enqueue_options => op,message_properties => msgop,payload => v_msgBody,msgid => v_msgid);
  18. --提交消息
  19. commit;
  20. end;
复制代码
2.消息接收
  1. declare
  2. dopt               dbms_aq.dequeue_options_t;
  3. mprop               dbms_aq.message_properties_t;
  4. v_msgBody             WDZAQMSG;
  5. v_msgid   raw(16);
  6. begin
  7. dbms_aq.dequeue('WDZQUEUE',dequeue_options => dopt,
  8.         message_properties => mprop ,
  9.         payload => v_msgBody,msgid => v_msgid);
  10. dbms_output.put_line(
  11.     'ok recieve info--msgbody id='||
  12.     to_char(v_msgBody.id)||
  13.     ',info name='||v_msgBody.name||
  14.     ',info ='||v_msgBody.info);
  15.     commit;

  16. end;
复制代码
1.7.3消息浏览
1.消息发送
  1. declare
  2. op dbms_aq.enqueue_options_t;
  3. msgop dbms_aq.message_properties_t;
  4. v_msgBody WDZAQMSG;
  5. v_msgid   raw(16);
  6. begin
  7. --入队操作
  8. -- ‘WDZQUEUE’就是消息发送到的目的队列
  9. -- 参数 enqueue_options 队列操作的可选择参数
  10. --参数 message_properties 队列操作的消息的属性设置(消息头)
  11. --参数 payload队列操作,发送的消息体
  12. msgop.priority := 20;
  13. v_msgBody :=WDZAQMSG(99,'wdz','test send aq info priority =20');
  14. dbms_aq.enqueue('WDZQUEUE',enqueue_options => op,message_properties => msgop,payload => v_msgBody,msgid => v_msgid);
  15. msgop.priority :=5;
  16. v_msgBody :=WDZAQMSG(88,'wdz','test send aq info priority =5');
  17. dbms_aq.enqueue('WDZQUEUE',enqueue_options => op,message_properties => msgop,payload => v_msgBody,msgid => v_msgid);
  18. --提交消息
  19. commit;
  20. end;
复制代码
2.消息接收

-- 目的,先浏览所有消息,找到指定消息后把这个消息删除
  1. declare
  2. dopt       dbms_aq.dequeue_options_t;
  3. mprop     dbms_aq.message_properties_t;
  4. v_msgBody   WDZAQMSG;
  5. v_msgid     raw(16);
  6. v_havefound boolean ;
  7. begin
  8. v_havefound :=false;
  9. dopt.dequeue_mode := dbms_aq.browse;
  10. loop
  11.   exit when v_havefound;
  12.   dbms_aq.dequeue('WDZQUEUE',
  13.               dequeue_options => dopt,
  14.               message_properties => mprop,
  15.               payload => v_msgBody,
  16.               msgid => v_msgid);
  17.   dbms_output.put_line('msg priority=' || to_char(mprop.priority) ||
  18.                 ',ok recieve info--msgbody id=' ||
  19.                 to_char(v_msgBody.id) || ',info name=' ||
  20.                 v_msgBody.name || ',info =' || v_msgBody.info);
  21.   if (v_msgBody.id = 88) then
  22.     v_havefound := true;
  23.   end if;
  24. end loop;
  25. dopt.dequeue_mode := dbms_aq.remove;
  26. dopt.msgid := v_msgid;
  27.     dbms_aq.dequeue('WDZQUEUE',
  28.               dequeue_options => dopt,
  29.               message_properties => mprop,
  30.               payload => v_msgBody,
  31.               msgid => v_msgid);
  32.   dbms_output.put_line('删除消息' ||
  33.                 ',ok recieve info--msgbody id=' ||
  34.                 to_char(v_msgBody.id) || ',info name=' ||
  35.                 v_msgBody.name || ',info =' || v_msgBody.info);
  36. commit;                 
  37. end;
复制代码
  1. 1.8消息选项参数dbms_aq.dequeue_options_t
  2. type dequeue_options_t is record (
  3.   ---consumer_name 消息接收方名称
  4.   consumer_name varchar2(30) default null,
  5.   ---dequeue_mode dequeue_mode =BROWSE 读取消息不加锁,相当于只读操作
  6.   --dequeue_mode =LOCKED读取时候加锁,别的不能进行操作,直到事务提交或者回滚
  7.   --dequeue_mode =REMOVE 默认值,读取的时候可以修改或者删除,消息最后删除否,
  8.     --取决于消息属性列表 retention 的设置
  9.   --dequeue_mode =REMOVE_NODATA ,标记消息是否被修改或者删除,消息最后删除否,
  10.     --取决于消息属性列表 retention 的设置
  11.   dequeue_mode   binary_integer default REMOVE,
  12.   ---navigation = NEXT_MESSAGE 这为默认设置, 根据当前位置获取下一个消息
  13.   ---navigation = NEXT_TRANSACTION 跳过当前事务组,获取下一个事务组的第1条消息
  14.   ---navigation = FIRST_MESSAGE 获取当前队列的符合条件的第八个五年计划条消息,   
  15.   navigation   binary_integer default NEXT_MESSAGE,   
  16.   ---visibility指定消息入队时的事务行为,visibility=ON_COMMIT 说明生成的消息是
  17.   --当前事务的一部分
  18.   --visibility=IMMEDIATE 消息立即进入队列,如果 操作的非持久队列,该参数必须等于这个值
  19.   visibility binary_integer default ON_COMMIT ,
  20.   --wait制定没有消息匹配的确时候等待时间
  21.   --wait=FOREVER 永远等待
  22.   --wait=NO_WAIT 不等待   
  23.   --wait=number 等待指定的时间,单位为秒      
  24.   wait binary_integer default FOREVER,
  25.   --msgid 消息标识符号
  26.   msgid raw(16) default null,
  27.   --correlation 提取消息的标识符号,他在消息进入队列前应用程序设置,例如:可以在用于通讯中握手时身份识别
  28.   correlation varchar2(128) default null,
  29.   --消息的提取条件
  30.   deq_condiction varchar2(4000) null,
  31.   ---transformation 指明消息进入队列前使用的转换函数
  32.   transformation varchar2(60) default null
  33. );
复制代码
1.9消息属性参数

type message_properties_t is record (
---消息优先级别 ,可以小于0 ,数字越小优先程度越高
  priority binary_integer default 1,
  ---消息在队列中的延迟时间,过了这个时间消息才可以出队
  --delay=NO_DELAY 消息不延迟,否者为延迟时间
  delay   binary_integer default NO_DELAY,
  --消息过期时间,expiration=NEVER,消息永远不过期
  expiration binary_integer default NEVER,
  ---消息发送方设置的标示符号
  correlation varchar2(12 default null,
  attempts binary_integer,
  --消息接收者列表
  recipient_list aq$_recipient_list_t,
  -- 异常队列名称,当消息不能处理,或者处理出错,消息被转移到这个队列,类似于jsp的错误页面
  exception_queue varchar2(51) default null,
  --消息进入队列的时间,由系统维护用户不能修改
  enqueue_time date,
  --发送方标识
  sender_id aq$_agent default null,
  --AQ消息传播用来说明来源
  original_msgid raw(16) default null);   
;
1.10消息接收者/订阅者
--消息接收者
  1. type sys.aq$_agent is object (
  2.   name varchar2(30),
  3.   address varchar2(1024),
  4.   protocol number )
复制代码
--消息接收者列表
  1. type aq$_recipient_list_t is table of sys.aq$_agent index by binary_integer
复制代码
--消息订阅者列表类型
  1. type aq$_subscriber_list_t is table of sys.aq$_agent
  2. --注册消息类型
  3. type sys.aq$_reg_info is object (
  4. name varchar2(128),
  5. namespace number,
  6. callback varchar2(4000),
  7. context raw(2000)
  8. );
复制代码
---定义一个队列的注册列表
  1. type aq$_reg_info_list as varray (1024) of sys.aq$_reg_info
复制代码
1.11多用户队列的使用(订阅/发布模型)
1.多用户队列的使用,建立订阅者列表
---多用户队列的使用,建立订阅者列表
  1. declare
  2. subscriber sys.aq$_agent;
  3. begin
  4. subscriber := sys.aq$_agent('WDZ',null,null);
  5. dbms_aqadm.add_subscriber('WDZMUTIQUEUE',subscriber);
  6. subscriber := sys.aq$_agent('apple',null,null);
  7. dbms_aqadm.add_subscriber('WDZMUTIQUEUE',subscriber);
  8. end;
复制代码
2.消息发送
--消息的发送(发布/订阅模型)
  1. declare
  2. enqueue_options dbms_aq.enqueue_options_t;
  3. message_propertites dbms_aq.message_properties_t;
  4. recipients dbms_aq.aq$_recipient_list_t;
  5. message_handle raw(16);
  6. message WDZAQMSG;
  7. begin
  8. recipients(1) := sys.aq$_agent ('wdz',null,null);
  9. recipients(2) := sys.aq$_agent ('apple',null,null);
  10. message_propertites.recipient_list := recipients;
  11. message := WDZAQMSG(10,'wdz','这是发布订阅模型的第一个消息');
  12. dbms_aq.enqueue('WDZMUTIQUEUE',enqueue_options,message_propertites,message,message_handle);
  13. message := WDZAQMSG(10,'wdz','这是发布订阅模型的第2个消息');
  14. dbms_aq.enqueue('WDZMUTIQUEUE',enqueue_options,message_propertites,message,message_handle);
  15. commit;
  16. end;
复制代码
3.消息的接收
-- 发布/订阅模型 ,消息的接收
  1. declare
  2. dequeue_options dbms_aq.dequeue_options_t;
  3. message_propertites dbms_aq.message_properties_t;
  4. recipients dbms_aq.aq$_recipient_list_t;
  5. message_handle raw(16);
  6. message WDZAQMSG;
  7. begin
  8. dequeue_options.wait := dbms_aq.NO_WAIT;
  9. --用户 wdz 开始接收消息
  10. begin
  11.   dequeue_options.consumer_name := 'wdz';
  12.   dequeue_options.navigation := dbms_aq.FIRST_MESSAGE;
  13.   loop
  14.     dbms_aq.dequeue('WDZMUTIQUEUE',dequeue_options,message_propertites,
  15. message,message_handle);
  16.     dbms_output.put_line('ok recieve info--msgbody id=' ||
  17.                 to_char(message.id) || ',info name=' ||
  18.                 message.name || ',info =' || message.info);
  19.     dequeue_options.navigation := dbms_aq.NEXT_MESSAGE;            
  20.   end loop;
  21.   exception
  22.   when others then
  23.     dbms_output.put_line('ok user=wdz的消息接收完毕!');
  24. end;
  25. commit;
复制代码
--用户 apple 开始接收消息
  1. [code]begin
  2.   dequeue_options.consumer_name := 'apple';
  3.   dequeue_options.navigation := dbms_aq.FIRST_MESSAGE;
  4.   loop
  5.     dbms_aq.dequeue('WDZMUTIQUEUE',dequeue_options,message_propertites,
  6. message,message_handle);
  7.     dbms_output.put_line('ok recieve info--msgbody id=' ||
  8.                 to_char(message.id) || ',info name=' ||
  9.                 message.name || ',info =' || message.info);
  10.     dequeue_options.navigation := dbms_aq.NEXT_MESSAGE;            
  11.   end loop;
  12.     exception
  13.   when others then
  14.     dbms_output.put_line('ok user=apple的消息接收完毕!');
  15. end;
  16. commit;
  17. end;
复制代码
[/code]

1.12消息分组的使用
1.消息分组,消息的发送
--消息分组,消息的发送
  1. declare
  2. op       dbms_aq.enqueue_options_t;
  3. msgop     dbms_aq.message_properties_t;
  4. v_msgBody WDZAQMSG;
  5. v_msgid   raw(16);
  6. v_TransNo int;
  7. v_MsgNo   int;
  8. begin
  9. ---把消息发送分成4组发送
  10. for v_TransNo in 1 .. 4 loop
  11.   for v_MsgNo in 1 .. 3 loop
  12.     v_msgBody := WDZAQMSG((v_TransNo-1)*3+v_MsgNo, 'wdz',
  13.               'test send aq info v_MsgNo ='||
  14.               to_char(v_MsgNo)||',v_TransNo='||to_char(v_TransNo));
  15.     dbms_aq.enqueue('WDZGROUPQUEUE',
  16.               enqueue_options => op,
  17.               message_properties => msgop,
  18.               payload => v_msgBody,
  19.               msgid => v_msgid);
  20.   end loop;
  21.   --提交消息
  22.   commit;
  23. end loop;
  24. end;
复制代码
2.消息的接收
---消息分组,消息的接收
  1. declare
  2. dequeue_options dbms_aq.dequeue_options_t;
  3. mprop         dbms_aq.message_properties_t;
  4. v_msgBody     WDZAQMSG;
  5. v_msgid       raw(16);
  6. i           int := 0;
  7. j           int :=0;
  8. begin

  9. dequeue_options.navigation := dbms_aq.FIRST_MESSAGE;
  10. dequeue_options.wait     := dbms_aq.NO_WAIT;
  11. loop
  12.   begin
  13.     dbms_aq.dequeue('WDZGROUPQUEUE',
  14.               dequeue_options => dequeue_options,
  15.               message_properties => mprop,
  16.               payload => v_msgBody,
  17.               msgid => v_msgid);
  18.     dbms_output.put_line('ok recieve info--msgbody id=' ||
  19.                   to_char(v_msgBody.id) || ',info name=' ||
  20.                   v_msgBody.name || ',info =' || v_msgBody.info);
  21.     dequeue_options.navigation := dbms_aq.NEXT_MESSAGE;
  22.     i := 0;
  23.   exception
  24.     when others then
  25.       begin
  26.       if (i >= 1) then
  27.         dbms_output.put_line('所有消息接收完成');
  28.         exit;
  29.       end if;
  30.       
  31.       -- dbms_output.put_line('本消息组,消息接收完成');
  32.       -- i := 1;
  33.       -- dequeue_options.navigation := dbms_aq.NEXT_TRANSACTION;
  34.       j := j+1;
  35.       commit;
  36.       end;
  37.   end;
  38. end loop;
  39.   dbms_output.put_line('j='||to_char(j-1));
  40. end;
复制代码
1.7消息选项参数dbms_aq.dequeue_options_t
由于包dbms_aq是加密的,所以看不到dbms_aq. dequeue_options_t,现给出参数说明.
type dequeue_options_t is record {
  ---visibility指定消息入队时的事务行为,visibility=ON_COMMIT 说明生成的消息是
--当前事务的一部分
  --visibility=IMMEDIATE 消息立即进入队列,如果 操作的非持久队列,该参数必须等于这个值
  visibility binary_integer default ON_COMMIT ,
  ---relative_msgid 消息的标识和 参数 seqquence_deviation 配合使用,
--而且在 seqquence_deviation = BEFORE 才有效
  relative_msgid raw(16) default null,
  --seqquence_deviation 说明当消息进入队列的放置方式
  -- seqquence_deviation=null 放置到队尾
  --seqquence_deviation=TOP 放置到队头
  --seqquence_deviation=TOP 放置到指定消息 relative_msgid 的前面   
  seqquence_deviation binary_integer default null,
  ---transformation 指明消息进入队列前使用的转换函数
  transformation varchar2(60) default null
};

论坛徽章:
0
2 [报告]
发表于 2012-03-16 16:38 |只看该作者
谢谢分享

论坛徽章:
0
3 [报告]
发表于 2012-03-16 16:42 |只看该作者
提示: 作者被禁止或删除 内容自动屏蔽
您需要登录后才可以回帖 登录 | 注册

本版积分规则 发表回复

  

北京盛拓优讯信息技术有限公司. 版权所有 京ICP备16024965号-6 北京市公安局海淀分局网监中心备案编号:11010802020122 niuxiaotong@pcpop.com 17352615567
未成年举报专区
中国互联网协会会员  联系我们:huangweiwei@itpub.net
感谢所有关心和支持过ChinaUnix的朋友们 转载本站内容请注明原作者名及出处

清除 Cookies - ChinaUnix - Archiver - WAP - TOP