免费注册 查看新帖 |

Chinaunix

  平台 论坛 博客 文库
最近访问板块 发新帖
查看: 1055 | 回复: 0
打印 上一主题 下一主题

结合Spring2.0和ActiveMQ进行异步消息调用 [复制链接]

论坛徽章:
0
跳转到指定楼层
1 [收藏(0)] [报告]
发表于 2006-11-21 10:19 |只看该作者 |倒序浏览
在Spring 2.0之前,Spring的JMS的作用局限于产生消息。这个功能(封装在 JmsTemplate 类中)当然是很好的, 但是,它没有描述完整的JMS堆栈,比如像消息的 异步 产生和消耗。JMS堆栈缺少的这一部分已经被添加,Spring 2.0现在提供对消息异步消耗的完整支持。
      让我们从一个例子开始。
      首先我们打开ActiveMQ。从ActiveMQ的安装路径上的bin目录,那里有一个ActiveMQ.bat,双击执行即可。不过要注意必须先设置java_home环境变量。ActiveMQ默认的服务端口是61616。
      然后我们开始配置Spring配置文件。我起名为spring-jms.xml
  • 首先要配置一个ConnectionFactory代码如下
    Code:

            
    [Ctrl+A Select All]
      
    这里用到的ConnectionFactory是ActiveMQ提供的工厂,为了能使用这个工厂,我们必须在项目中添加以下几个jar文件:
    geronimo-jms_1.1_spec-1.0.jar,
    activeio-core-3.0-beta3.jar,
    activemq-core-4.0.1.jar,
    backport-util-concurrent-2.1.jar,
    commons-logging-1.0.4.jar,
    geronimo-j2ee-management_1.0_spec-1.0.jar
    以上这些Jar文件都存在于ActiveMQ安装目录的lib目录下,这些可是我一个一个试验出来的,累个半死。。
  • 然后应该配置一个Queue(我使用的是点对点方式),不过ActiveMQ只要提供一个名字就可以自动创建队列,因此这一步省了,呵呵
  • 下面就轮到Spring的支持类了,首先是JmsTemplate。这个类提供了大量的方法简化我们对JMS的操作。常用的有两个,org.springframework.jms.core.JmsTemplate102和org.springframework.jms.core.JmsTemplate,这两个类分别支持JMS的1.02版本和1.1版本。现在比较常用的还是1.02版本。配置如下
    Code:
       
            
            
            
            
            
       
    [Ctrl+A Select All]
    上面的配置中用到了第一步配置的connectionFactory以及一个消息转换的类messageConverter,这个类实现了org.springframework.jms.support.converter.MessageConverter接口,可以在消息发送之前和接受之后进行消息类型转换。具体的看最后的实例代码。配置代码如下:
    Code:
       
          
       
            
                
            
       
    [Ctrl+A Select All]
    这里还配置了发送的消息的存在时间timeToLive,目标Queue的名字defaultDestinationName,接受消息超时时间receiveTimeout
  • 配置发送代码
    Code:
         
       
            
       
    [Ctrl+A Select All]
  • 接着配置监听器,这是Spring2.0新增的功能,配置如下:
    Code:

       
       
            
                
            
            
            
       
       
       
            
            
            
            
       
    [Ctrl+A Select All]
          Spring配置监听器有很多种选择,在这里我选择这回种MessageListenerAdapter方法主要是因为这个方法比较灵活。实现他只要一个很普通的java类即可,和JMS以及Spring的耦合度最低。其中方法onMessage可以随便修改方法名,只要在配置文件中对应的修改就好了。
         MessageListenerAdapter还有一个功能就是如果处理方法(我这里是onMessage)返回一个非空值,它将自动返回一个响应消息。这个消息会返回给JMS Reply-To属性定义的目的地(如果存在),或者是MessageListenerAdapter设置(如果配置了)的缺省目的地;如果没有定义目的地,那么将产生一个InvalidDestinationException异常(此异常将不会只被捕获而不处理,它将沿着调用堆栈上传)。
         这样我们的配置就都完成了。接下来我们来实现对应的Java文件
    先是接口文件发送消息接口IApmgtMessageProducer.java
    Code:

    public interface IApmgtMessageProducer {
        public abstract void sendMessage(ApmgtMessageData messageData);
    }
    [Ctrl+A Select All]
    接受消息接口IApmgtMessageListener.java
    Code:
    public interface IApmgtMessageListener {
        public void onMessage(ApmgtMessageData message);
    }
    [Ctrl+A Select All]
    发消息的文件DefaultApmgtMessageProducer.java
    Code:
    public class DefaultApmgtMessageProducer implements IApmgtMessageProducer {
       
        private JmsTemplate jmsTemplate;
        public void setJmsTemplate(JmsTemplate jmsTemplate) {
            this.jmsTemplate = jmsTemplate;
        }
        public void sendMessage(ApmgtMessageData messageData) {
            this.jmsTemplate.convertAndSend(messageData);
        }
    }
    [Ctrl+A Select All]
    收消息文件DefaultApmgtMessageListener.java
    Code:

    public class DefaultApmgtMessageListener implements IApmgtMessageListener {
        public void onMessage(ApmgtMessageData message) {
            System.out.println("监听到消息:"+message);
        }
    }
    [Ctrl+A Select All]
    消息转换类ApmgtMessageConverter.java
    Code:
    public class ApmgtMessageConverter implements MessageConverter {
        private Log log = LogFactory.getLog(ApmgtMessageConverter.class);
        private SimpleMessageConverter converter;
        public void setConverter(SimpleMessageConverter converter) {
            this.converter = converter;
        }
        public Object fromMessage(Message message) throws JMSException, MessageConversionException {
            if (message instanceof ObjectMessage) {
                ObjectMessage o_message = (ObjectMessage)message;
                MessageHeader header = new MessageHeader();
                header.setId(message.getLongProperty("id"));
                header.setReceiver(message.getIntProperty("receiver"));
                header.setSender(message.getIntProperty("sender"));
                header.setSendPerson(message.getStringProperty("sendPerson"));
                header.setType(message.getIntProperty("type"));
                Serializable messageContent = o_message.getObject();
                ApmgtMessageData messageData = new ApmgtMessageData();
                messageData.setMessageContent(messageContent);
                messageData.setMessageHeader(header);
                return messageData;
            }
            return null;
        }
        public Message toMessage(Object object, Session session) throws JMSException, MessageConversionException {
            if (object instanceof ApmgtMessageData) {
                ApmgtMessageData data = (ApmgtMessageData) object;
                Message message = converter.toMessage(data.getMessageContent(), session);
                message.setLongProperty("id", data.getMessageHeader().getId());
                message.setIntProperty("receiver", data.getMessageHeader().getReceiver());
                message.setIntProperty("sender", data.getMessageHeader().getSender());
                message.setIntProperty("type", data.getMessageHeader().getType());
                message.setStringProperty("sendPerson", data.getMessageHeader().getSendPerson());
                log.info("发送消息[MessageSender]:\n" + message);
                 return message;
            } else {
                return null;
            }
        }
    }
    [Ctrl+A Select All]
    消息类文件  消息父类:ApmgtMessageData.java
    Code:
    public class ApmgtMessageData{
        protected T messageContent;
       
        protected MessageHeader messageHeader;
        public T getMessageContent() {
            return this.messageContent;
        }
        public MessageHeader getMessageHeader() {
            return this.messageHeader;
        }
        public void setMessageContent(T messageContent) {
            this.messageContent = messageContent;
        }
        public void setMessageHeader(MessageHeader messageHeader) {
            this.messageHeader = messageHeader;
        }
    }
    [Ctrl+A Select All]
    消息属性的一个类MessageHeader.java
    Code:
    public class MessageHeader {
        /**
         * 消息ID
         */
        private long id;
        /**
         * 消息类型
         */
        private int type;
        /**
         * 消息发送方,发送消息的模块
         */
        private int sender;
        /**
         * 消息接收方,接收消息的模块
         */
        private int receiver;
        /**
         * 消息发送者,具体的用户
         */
        private String sendPerson;
        public MessageHeader(){
            this.id = System.currentTimeMillis() ;
        }
       
        public long getId() {
            return id;
        }
        public void setId(long id) {
            this.id = id;
        }
        public String getSendPerson() {
            return sendPerson;
        }
        public void setSendPerson(String sendPerson) {
            this.sendPerson = sendPerson;
        }
        public int getReceiver() {
            return receiver;
        }
        public void setReceiver(int receiver) {
            this.receiver = receiver;
        }
        public int getSender() {
            return sender;
        }
        public void setSender(int sender) {
            this.sender = sender;
        }
        public int getType() {
            return type;
        }
        public void setType(int type) {
            this.type = type;
        }
    }
    [Ctrl+A Select All]
    消息体的类ModPasswordRequest.java
    Code:
    public class ModPasswordRequest implements Serializable{
        private static final long serialVersionUID = 1L;
        /**
         * 旧密码
         */
        private String oldPassword;
       
        /**
         * 新密码
         */
        private String newPassword;
        public String getNewPassword() {
            return newPassword;
        }
        public void setNewPassword(String newPassword) {
            this.newPassword = newPassword;
        }
        public String getOldPassword() {
            return oldPassword;
        }
        public void setOldPassword(String oldPassword) {
            this.oldPassword = oldPassword;
        }
    }
    [Ctrl+A Select All]
    消息类:ApmgtModPasswordRequest.java
    Code:
    public class ApmgtModPasswordRequest extends ApmgtMessageData {
       
       
       
        private static final int REQ_MODPASSWORD = 0;
        private static final int INTF = 1;
        private static final int APMGT = 2;
        public void init(){
            messageHeader = new MessageHeader();
            messageContent = new ModPasswordRequest();
            messageHeader.setType(REQ_MODPASSWORD);
            messageHeader.setSender(INTF);
            messageHeader.setReceiver(APMGT);
            messageContent.setNewPassword("123456");
            messageContent.setOldPassword("654321");
        }
       
    }
    [Ctrl+A Select All]
    最后是测试类Main.java
    Code:
        public class Main {
        public static void main(final String[] args) throws Exception {
            
            PropertyConfigurator.configure("log4j.properties");
            
            AbstractApplicationContext ctx = new ClassPathXmlApplicationContext(new String[] { "spring-jms.xml" });
    //        ctx.registerShutdownHook();
            
            IApmgtMessageProducer producer = (IApmgtMessageProducer)ctx.getBean("producer");
            
            ApmgtModPasswordRequest messageData = new ApmgtModPasswordRequest();
            messageData.setMessageHeader(new MessageHeader());
            messageData.setMessageContent(new ModPasswordRequest());
            messageData.init();
            
            producer.sendMessage(messageData);
        }
    }
    [Ctrl+A Select All]
    还有两个配置文件,第一个spring-jms.xml
    Code:
    [/url]
       
            
                
                    apmgt.properties
                
            
       
       
       
       
       
       
            
       
       
       
       
       
            
            
            
            
            
       
       
       
            
                
            
       
       
       
            
       
       
       
            
                
            
            
            
       
       
       
            
            
            
            
       
    [Ctrl+A Select All]
    apmgt.properties
    Code:
    #jms properties
    jms.brokerURL=tcp://localhost:61616
    jms.receiveTimeout=3000
    jms.destinationName.cmpp=cmpp
    jms.messageSelector=receiver=2
    #one day is 86400000 ms. 0 is means that it lives forever.
    jms.timeToLive=86400000
    [Ctrl+A Select All]
    最后后还有源代码,希望对大家有帮助,写了2个小时,真累
    ActiveMQ和Tomcat结合使用,网上有很多文章了,这里就不说了,over

    转至:http://blog.iecn.net/blog/html/do-showone-tid-892.html

    本文来自ChinaUnix博客,如果查看原文请点:http://blog.chinaunix.net/u/26076/showart_204057.html
  • 您需要登录后才可以回帖 登录 | 注册

    本版积分规则 发表回复

      

    北京盛拓优讯信息技术有限公司. 版权所有 京ICP备16024965号-6 北京市公安局海淀分局网监中心备案编号:11010802020122 niuxiaotong@pcpop.com 17352615567
    未成年举报专区
    中国互联网协会会员  联系我们:huangweiwei@itpub.net
    感谢所有关心和支持过ChinaUnix的朋友们 转载本站内容请注明原作者名及出处

    清除 Cookies - ChinaUnix - Archiver - WAP - TOP