1. 技术基础简介
1.1 JMS
1.1.1 JMS简介
JMS(Java Messaging Service)是Java平台上有关面向消息中间件的技术规范,它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发,翻译为Java消息服务。
JMS是一种与厂商无关的 API,用来访问消息收发系统。它类似于 JDBC(JavaDatabase Connectivity):这里,JDBC 是可以用来访问许多不同关系数据库的 API,而 JMS 则提供同样与厂商无关的访问方法,以访问消息收发服务。许多厂商目前都支持JMS,包括 IBM 的 MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ,这只是几个例子。 JMS 使您能够通过消息收发服务(有时称为消息中介程序或路由器)从一个 JMS 客户机向另一个 JML 客户机发送消息。消息是 JMS 中的一种类型对象,由两部分组成:报头和消息主体。报头由路由信息以及有关该消息的元数据组成。消息主体则携带着应用程序的数据或有效负载。根据有效负载的类型来划分,可以将消息分为几种类型,它们分别携带:简单文本 (TextMessage)、可序列化的对象 (ObjectMessage)、属性集合 (MapMessage)、字节流 (BytesMessage)、原始值流 (StreamMessage),还有无有效负载的消息 (Message)。
1.1.2 JMS体系架构
JMS有以下元素组成。
JMS提供者
连接面向消息中间件的,JMS接口的一个实现。提供者可以是Java平台的JMS实现,也可以是非Java平台的面向消息中间件的适配器。
JMS客户
生产或消费消息的基于Java的应用程序或对象。
JMS生产者
创建并发送消息的JMS客户。
JMS消费者
接收消息的JMS客户。
JMS消息
包括可以在JMS客户之间传递的数据的对象
JMS队列
一个容纳那些被发送的等待阅读的消息的区域。队列暗示,这些消息将按照顺序发送。一旦一个消息被阅读,该消息将被从队列中移走。
JMS主题
一种支持发送消息给多个订阅者的机制。
1.1.3 JMS模型
Java消息服务应用程序结构支持两种模型:
点对点或队列模型
发布者/订阅者模型
在点对点或队列模型下,一个生产者向一个特定的队列发布消息,一个消费者从该队列中读取消息。这里,生产者知道消费者的队列,并直接将消息发送到消费者的队列。这种模式被概括为:
只有一个消费者将获得消息
生产者不需要在接收者消费该消息期间处于运行状态,接收者也同样不需要在消息发送时处于运行状态。
每一个成功处理的消息都由接收者签收
发布者/订阅者模型支持向一个特定的消息主题发布消息。0或多个订阅者可能对接收来自特定消息主题的消息感兴趣。在这种模型下,发布者和订阅者彼此不知道对方。这种模式好比是匿名公告板。这种模式被概括为:
多个消费者可以获得消息
在发布者和订阅者之间存在时间依赖性。发布者需要建立一个订阅(subscription),以便客户能够购订阅。订阅者必须保持持续的活动状态以接收消息,除非订阅者建立了持久的订阅。在那种情况下,在订阅者未连接时发布的消息将在订阅者重新连接时重新发布。
使用Java语言,JMS提供了将应用与提供数据的传输层相分离的方式。同一组Java类可以通过JNDI中关于提供者的信息,连接不同的JMS提供者。这一组类首先使用一个连接工厂以连接到队列或主题,然后发送或发布消息。在接收端,客户接收或订阅这些消息。
1.1.4 传递消息方式
JMS现在有两种传递消息的方式。标记为NON_PERSISTENT的消息最多投递一次,而标记为PERSISTENT的消息将使用暂存后再转送的机理投递。如果一个JMS服务离线,那么持久性消息不会丢失但是得等到这个服务恢复联机时才会被传递。所以默认的消息传递方式是非持久性的。即使使用非持久性消息可能降低内务和需要的存储器,并且这种传递方式只有当你不需要接收所有的消息时才使用。
虽然JMS规范并不需要JMS供应商实现消息的优先级路线,但是它需要递送加快的消息优先于普通级别的消息。JMS定义了从0到9的优先级路线级别,0是最低的优先级而9则是最高的。更特殊的是0到4是正常优先级的变化幅度,而5到9是加快的优先级的变化幅度。举例来说:topicPublisher.publish (message, DeliveryMode.PERSISTENT, 8, 10000); //Pub-Sub 或queueSender.send(message,DeliveryMode.PERSISTENT, 8, 10000);//P2P 这个代码片断,有两种消息模型,映射递送方式是持久的,优先级为加快型,生存周期是10000 (以毫秒度量 )。如果生存周期设置为零,这则消息将永远不会过期。当消息需要时间限制否则将使其无效时,设置生存周期是有用的。
JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。
· StreamMessage -- Java原始值的数据流
· MapMessage--一套名称-值对
· TextMessage--一个字符串对象
· ObjectMessage--一个序列化的 Java对象
· BytesMessage--一个未解释字节的数据流
1.1.5 JMS应用程序接口
ConnectionFactory接口(连接工厂)
用户用来创建到JMS提供者的连接的被管对象。JMS客户通过可移植的接口访问连接,这样当下层的实现改变时,代码不需要进行修改。 管理员在JNDI名字空间中配置连接工厂,这样,JMS客户才能够查找到它们。根据消息类型的不同,用户将使用队列连接工厂,或者主题连接工厂。
Connection接口(连接)
连接代表了应用程序和消息服务器之间的通信链路。在获得了连接工厂后,就可以创建一个与JMS提供者的连接。根据不同的连接类型,连接允许用户创建会话,以发送和接收队列和主题到目标。
Destination接口(目标)
目标是一个包装了消息目标标识符的被管对象,消息目标是指消息发布和接收的地点,或者是队列,或者是主题。JMS管理员创建这些对象,然后用户通过JNDI发现它们。和连接工厂一样,管理员可以创建两种类型的目标,点对点模型的队列,以及发布者/订阅者模型的主题。
MessageConsumer接口(消息消费者)
由会话创建的对象,用于接收发送到目标的消息。消费者可以同步地(阻塞模式),或异步(非阻塞)接收队列和主题类型的消息。
MessageProducer接口(消息生产者)
由会话创建的对象,用于发送消息到目标。用户可以创建某个目标的发送者,也可以创建一个通用的发送者,在发送消息时指定目标。
Message接口(消息)
是在消费者和生产者之间传送的对象,也就是说从一个应用程序创送到另一个应用程序。一个消息有三个主要部分:
消息头(必须):包含用于识别和为消息寻找路由的操作设置。
一组消息属性(可选):包含额外的属性,支持其他提供者和用户的兼容。可以创建定制的字段和过滤器(消息选择器)。
一个消息体(可选):允许用户创建五种类型的消息(文本消息,映射消息,字节消息,流消息和对象消息)。
消息接口非常灵活,并提供了许多方式来定制消息的内容。
Session接口(会话)
表示一个单线程的上下文,用于发送和接收消息。由于会话是单线程的,所以消息是连续的,就是说消息是按照发送的顺序一个一个接收的。会话的好处是它支持事务。如果用户选择了事务支持,会话上下文将保存一组消息,直到事务被提交才发送这些消息。在提交事务之前,用户可以使用回滚操作取消这些消息。一个会话允许用户创建消息生产者来发送消息,创建消息消费者来接收消息。
1.1.6 JMS提供者实现
JMS流程图
要使用Java消息服务,你必须要有一个JMS提供者,管理会话和队列。现在既有开源的提供者也有专有的提供者。
开源的提供者包括:
Apache ActiveMQ
JBoss 社区所研发的 HornetQ
Joram
Coridan的MantaRay
The OpenJMS Group的OpenJMS
专有的提供者包括:
BEA的BEA WebLogicServer JMS
TIBCO Software的EMS
GigaSpaces Technologies的GigaSpaces
Softwired 2006的iBus
IONA Technologies的IONA JMS
SeeBeyond的IQManager(2005年8月被Sun Microsystems并购)
webMethods的JMS+ -
my-channels的Nirvana
Sonic Software的SonicMQ
SwiftMQ的SwiftMQ
IBM的WebSphere MQ
下面使用spring进行jms开发
一、配置文件
[html]view plaincopyprint?
- <?xml version="1.0" encoding="GBK"?>
- <beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns:context="http://www.springframework.org/schema/context"
- xmlns:aop="http://www.springframework.org/schema/aop"
- xmlns:tx="http://www.springframework.org/schema/tx"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
- http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.0.xsd
- http://www.springframework.org/schema/context
- http://www.springframework.org/schema/context/spring-context.xsd
- http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.0.xsd"
- default-autowire="byName" default-lazy-init="true">
- <context:annotation-config/>
- <context:component-scan base-package="com.jms"></context:component-scan>
- <!-- 支持 @AspectJ 标记 -->
- <aop:aspectj-autoproxy />
- <!-- JNDI上下文模板 -->
- <bean id="jndiTemplate" class="org.springframework.jndi.JndiTemplate">
- <property name="environment">
- <props>
- <!-- your weblogic url -->
- <prop key="java.naming.provider.url">t3://localhost:7001</prop>
- <prop key="java.naming.factory.initial">weblogic.jndi.WLInitialContextFactory</prop>
- <!--weblogic username and password -->
- <prop key="java.naming.security.principal">weblogic</prop>
- <prop key="java.naming.security.credentials">weblogic123</prop>
- </props>
- </property>
- </bean>
- <!-- 连接工厂配置 -->
- <bean id="jmsConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
- <property name="jndiName">
- <value>ConnectionFactory</value>
- </property>
- <property name="jndiTemplate">
- <ref bean="jndiTemplate" />
- </property>
- </bean>
- <!-- 发送队列配置 -->
- <bean id="destinatinForSender" class="org.springframework.jndi.JndiObjectFactoryBean">
- <property name="jndiTemplate">
- <ref bean="jndiTemplate" />
- </property>
- <property name="jndiName">
- <value>Queue</value>
- </property>
- </bean>
- <!-- JMS 简单消息转换 -->
- <bean id="jmsConverter" class="org.springframework.jms.support.converter.SimpleMessageConverter" />
- <!-- JMS template配置 -->
- <bean id="jmsTemplateSender" class="org.springframework.jms.core.JmsTemplate">
- <property name="connectionFactory">
- <ref bean="jmsConnectionFactory" />
- </property>
- <property name="defaultDestination">
- <ref bean="destinatinForSender" />
- </property>
- <property name="messageConverter">
- <ref bean="jmsConverter" />
- </property>
- </bean>
- <!-- 接收队列配置 -->
- <bean id="destinatinForReceiver" class="org.springframework.jndi.JndiObjectFactoryBean">
- <property name="jndiTemplate">
- <ref bean="jndiTemplate" />
- </property>
- <property name="jndiName">
- <value>Queue</value>
- </property>
- </bean>
- <bean id="jmsTemplateReceiver" class="org.springframework.jms.core.JmsTemplate">
- <property name="connectionFactory">
- <ref bean="jmsConnectionFactory" />
- </property>
- <property name="defaultDestination">
- <ref bean="destinatinForReceiver" />
- </property>
- <property name="messageConverter">
- <ref bean="jmsConverter" />
- </property>
- </bean>
- </beans>
<?xml version="1.0" encoding="GBK"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.0.xsd"
default-autowire="byName" default-lazy-init="true">
<context:annotation-config/>
<context:component-scan base-package="com.jms"></context:component-scan>
<!-- 支持 @AspectJ 标记 -->
<aop:aspectj-autoproxy />
<!-- JNDI上下文模板 -->
<bean id="jndiTemplate" class="org.springframework.jndi.JndiTemplate">
<property name="environment">
<props>
<!-- your weblogic url -->
<prop key="java.naming.provider.url">t3://localhost:7001</prop>
<prop key="java.naming.factory.initial">weblogic.jndi.WLInitialContextFactory</prop>
<!--weblogic username and password -->
<prop key="java.naming.security.principal">weblogic</prop>
<prop key="java.naming.security.credentials">weblogic123</prop>
</props>
</property>
</bean>
<!-- 连接工厂配置 -->
<bean id="jmsConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
<property name="jndiName">
<value>ConnectionFactory</value>
</property>
<property name="jndiTemplate">
<ref bean="jndiTemplate" />
</property>
</bean>
<!-- 发送队列配置 -->
<bean id="destinatinForSender" class="org.springframework.jndi.JndiObjectFactoryBean">
<property name="jndiTemplate">
<ref bean="jndiTemplate" />
</property>
<property name="jndiName">
<value>Queue</value>
</property>
</bean>
<!-- JMS 简单消息转换 -->
<bean id="jmsConverter" class="org.springframework.jms.support.converter.SimpleMessageConverter" />
<!-- JMS template配置 -->
<bean id="jmsTemplateSender" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory">
<ref bean="jmsConnectionFactory" />
</property>
<property name="defaultDestination">
<ref bean="destinatinForSender" />
</property>
<property name="messageConverter">
<ref bean="jmsConverter" />
</property>
</bean>
<!-- 接收队列配置 -->
<bean id="destinatinForReceiver" class="org.springframework.jndi.JndiObjectFactoryBean">
<property name="jndiTemplate">
<ref bean="jndiTemplate" />
</property>
<property name="jndiName">
<value>Queue</value>
</property>
</bean>
<bean id="jmsTemplateReceiver" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory">
<ref bean="jmsConnectionFactory" />
</property>
<property name="defaultDestination">
<ref bean="destinatinForReceiver" />
</property>
<property name="messageConverter">
<ref bean="jmsConverter" />
</property>
</bean>
</beans>
;
二、发送队列
[java]view plaincopyprint?
- package com.jms.sender;
- import java.io.Serializable;
- import javax.jms.JMSException;
- import javax.jms.Message;
- import javax.jms.Session;
- import org.springframework.context.ApplicationContext;
- import org.springframework.context.support.FileSystemXmlApplicationContext;
- import org.springframework.jms.core.JmsTemplate;
- import org.springframework.jms.core.MessageCreator;
- import com.jms.dto.UserDto;
- public class JmsSender {
- private JmsTemplate jmsTemplate = null;
-
- public JmsSender() {
- ApplicationContext ctx = new FileSystemXmlApplicationContext("classpath:applicationContext.xml");
- JmsTemplate jmsTemplate = (JmsTemplate) ctx.getBean("jmsTemplateSender");
- this.setJmsTemplate(jmsTemplate);
- }
-
- public void sendTextMessage(final String str) {
- this.jmsTemplate.send(new MessageCreator() {
- public Message createMessage(Session session) throws JMSException {
- return session.createTextMessage(str);
- }
- });
- }
-
- public void sendObjectMessage(final Object obj) {
- this.jmsTemplate.send(new MessageCreator() {
- public Message createMessage(Session session) throws JMSException {
- return session.createObjectMessage((Serializable) obj);
- }
- });
- }
-
- public void setJmsTemplate(JmsTemplate jmsTemplate) {
- this.jmsTemplate = jmsTemplate;
- }
-
- public JmsTemplate getJmsTemplate() {
- return jmsTemplate;
- }
-
- public static void main(String[] args) {
- JmsSender jmsSender = new JmsSender();
- jmsSender.sendTextMessage("发送消息!");
- jmsSender.sendObjectMessage(new UserDto("u1","p1",10));
- }
- }
package com.jms.sender;
import java.io.Serializable;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import com.jms.dto.UserDto;
public class JmsSender {
private JmsTemplate jmsTemplate = null;
public JmsSender() {
ApplicationContext ctx = new FileSystemXmlApplicationContext("classpath:applicationContext.xml");
JmsTemplate jmsTemplate = (JmsTemplate) ctx.getBean("jmsTemplateSender");
this.setJmsTemplate(jmsTemplate);
}
public void sendTextMessage(final String str) {
this.jmsTemplate.send(new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(str);
}
});
}
public void sendObjectMessage(final Object obj) {
this.jmsTemplate.send(new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createObjectMessage((Serializable) obj);
}
});
}
public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
public JmsTemplate getJmsTemplate() {
return jmsTemplate;
}
public static void main(String[] args) {
JmsSender jmsSender = new JmsSender();
jmsSender.sendTextMessage("发送消息!");
jmsSender.sendObjectMessage(new UserDto("u1","p1",10));
}
}
;
三、接收队列
[java]view plaincopyprint?
- package com.jms.receiver;
- import org.springframework.context.ApplicationContext;
- import org.springframework.context.support.FileSystemXmlApplicationContext;
- import org.springframework.jms.core.JmsTemplate;
- import com.jms.dto.UserDto;
- public class JmsReceiver {
- private JmsTemplate jmsTemplate = null;
-
- public JmsReceiver() {
- ApplicationContext ctx = new FileSystemXmlApplicationContext("classpath:applicationContext.xml");
- JmsTemplate jmsTemplate = (JmsTemplate) ctx.getBean("jmsTemplateReceiver");
- this.setJmsTemplate(jmsTemplate);
- }
-
- public void receiveMessage() {
- System.out.println("开始接收消息!");
- Object msg = this.jmsTemplate.receiveAndConvert();
- if (msg instanceof String) {
- System.out.println(msg);
- } else if (msg instanceof Object) {
- UserDto userDto = (UserDto) msg;
- System.out.println(userDto);
- System.out.println(userDto.getName());
- System.out.println(userDto.getPass());
- System.out.println(userDto.getAge());
- }
- System.out.println("接收消息结束!");
- }
-
- public void setJmsTemplate(JmsTemplate jmsTemplate) {
- this.jmsTemplate = jmsTemplate;
- }
-
- public JmsTemplate getJmsTemplate() {
- return jmsTemplate;
- }
-
- public static void main(String[] args) {
- final JmsReceiver jmsReceiver = new JmsReceiver();
- try {
-
- Thread thread = new Thread(new Runnable() {
- public void run() {
- while(true){
- jmsReceiver.receiveMessage();
- }
- }
- });
- thread.start();
- Thread.sleep(5000);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
package com.jms.receiver;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import com.jms.dto.UserDto;
public class JmsReceiver {
private JmsTemplate jmsTemplate = null;
public JmsReceiver() {
ApplicationContext ctx = new FileSystemXmlApplicationContext("classpath:applicationContext.xml");
JmsTemplate jmsTemplate = (JmsTemplate) ctx.getBean("jmsTemplateReceiver");
this.setJmsTemplate(jmsTemplate);
}
public void receiveMessage() {
System.out.println("开始接收消息!");
Object msg = this.jmsTemplate.receiveAndConvert();
if (msg instanceof String) {
System.out.println(msg);
} else if (msg instanceof Object) {
UserDto userDto = (UserDto) msg;
System.out.println(userDto);
System.out.println(userDto.getName());
System.out.println(userDto.getPass());
System.out.println(userDto.getAge());
}
System.out.println("接收消息结束!");
}
public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
public JmsTemplate getJmsTemplate() {
return jmsTemplate;
}
public static void main(String[] args) {
final JmsReceiver jmsReceiver = new JmsReceiver();
try {
Thread thread = new Thread(new Runnable() {
public void run() {
while(true){
jmsReceiver.receiveMessage();
}
}
});
thread.start();
Thread.sleep(5000);
} catch (Exception e) {
e.printStackTrace();
}
}
}
;
四、普通java对象【UserDto.java,注意必须要实现Serializable接口进行序列化,不然发送对象数据的时候会报错】
[java]view plaincopyprint?
- package com.jms.dto;
- import java.io.Serializable;
- public class UserDto implements Serializable
- {
- private static final long serialVersionUID = 1L;
- private String name;
- private String pass;
- private int age;
-
- public UserDto(String name, String pass,int age) {
- super();
- this.age = age;
- this.name = name;
- this.pass = pass;
- }
- public String getName() {
- return name;
- }
- public void setName(String name) {
- this.name = name;
- }
- public String getPass() {
- return pass;
- }
- public void setPass(String pass) {
- this.pass = pass;
- }
- public int getAge() {
- return age;
- }
- public void setAge(int age) {
- this.age = age;
- }
-
- }
package com.jms.dto;
import java.io.Serializable;
public class UserDto implements Serializable
{
private static final long serialVersionUID = 1L;
private String name;
private String pass;
private int age;
public UserDto(String name, String pass,int age) {
super();
this.age = age;
this.name = name;
this.pass = pass;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getPass() {
return pass;
}
public void setPass(String pass) {
this.pass = pass;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
}
;
转自:http://blog.csdn.net/wyc_cs/article/details/8901125