在分布式企业级应用程序中,异步消息机制用于有效地协调各个部分的工作。
J2EE为我们提供了JMS和消息驱动豆(Message-Driven Bean),用来实现应用程序各个部件之间的异步消息传递。
一.什么是消息系统?
通常一个消息系统允许分开的未耦合的应用程序之间可靠地异步通信。在企业应用时,需要一种异步的,非阻塞的消息传递。比如,一个客户端可能希望给一个服务器发送一个请求后,不在乎是否马上能得到回应。这样,客户端没有理由必须等待服务器处理请求。客户端应用程序在递交一个请求之后,只需确保请求到达服务器端后,就可以处理其他任务。通常,这是很高效的。消息系统提供了许多其他分布式对象计算模型没有的优点。它鼓励在消息产生者和使用者之间的"松耦合",在它们之间有很高程度的事务处理。对于使用者,它不在乎谁产生了消息,产生者是否仍在网络上以及消息是什么时候产生的。这就允许建立动态的,可靠的和灵活的系统。整个的子系统能被修改而不会影响系统的其他部分。
另外的优点包括:系统的高度可扩展性,容易与其他系统进行集成,以及高度的可靠性。由于可靠性和可扩展性,使得它们用于解决许多商业和科学计算问题。比如,消息系统是许多应用程序的基础,这些应用程序可以是工作流,网络管理,通信服务或供应链管理程序。在JAVA技术中,处理异步消息的能力是通过JMS来实现的。JMS最初设计是为了给传统的消息对象中间件提供一个标准的JAVA接口。而这些产品是在一个企业级应用程序中必须的。现在出现了许多支持JMS的纯JAVA的产品。
消息系统类型
通常有两种消息类型。
1.发布/订阅(publish/subscribe)
发布/订阅消息系统支持一个事件驱动模型,消息产生者和使用者都参与消息的传递。产生者发布事件,而使用者订阅感兴趣的事件,并使用事件。产生者将消息和一个特定的主题(Topic)连在一起,消息系统根据使用者注册的兴趣,将消息传给使用者。
2.点对点(Peer to peer)
在点对点的消息系统中,消息分发给一个单独的使用者。它维持一个"进入"消息队列。消息应用程序发送消息到一个特定的队列,而客户端从一个队列中得到消息。
二.JMS简介
JMS的目的是提供给消息系统客户一个固定的接口,而且与底层的消息提供者无关。这样,客户端的应用程序可以在不同的机器和操作系统中移植,而且能在不同的消息系统产品之间转移。JMS客户端都是建立在JAVA技术上的,从而也能使用其他JAVA API,如JDBC数据库连接,使用JAVA BEAN组件模型,JDNI名字服务,JTA客户端事务处理控制以及J2SE和J2EE API来实现企业级应用服务程序。
1.JMS对象模型
ConnectionFactory是一个客户端用来创建一个Connection的管理对象。由于在Connection创建时有授权和通信建立过程,因此这个对象是比较大的。
Destination对象将一个消息的目的和服务提供者有关的地址及配置信息包装起来。
Session是JMS实体,用来支持事务处理和异步消息消费。JMS并不需要客户端的代码用于异步消息消费或能处理多个并发消息。通常,事务的复杂性都由一个Session来封装。
一个Session是一个原子单位的工作,与数据库的事务一样,要实现多线程事务比较困难。Session提供了在一个线程编程模式下的并发的优点。
MessageProducer和MessageConsumer对象由Session对象创建。用于发送和接受消息。为了确保消息的传递,JMS服务提供者处理的消息都要处于PERSISTENT模式。PERSISTENT模式使得JMS提供者出问题后,也能让消息保存下来。
Session,MessageProducer和MessageConsumer都不支持并发,而ConnectionFactory,Destination和Connection都支持并发。
2.JMS应用程序开发
JMS中的消息
在消息系统中,应用程序之间通信的关键是消息。因此使用JMS必须要先理解消息。
在JMS中,消息由三部分组成:
MESSAGE HEADER用于识别消息,比如用于判断一个给定的消息是否是一个"订阅者"
PROPERITIES用于与应用程序相关的,提供者相关的和可选项的信息
BODY是消息的内容,支持几种格式,包括TextMessage(对String一个简单的封装)和ObjectMessage(对任意对象的封装,但必须支持序列化),也支持其他格式。
TextMessage
一个TextMessage是一个String对象的封装。在只有文本对象传递时,是很有用的。它假设许多消息系统是建立在XML上的。从而TextMessage就可以成为包装它们的容器。
创建一个TextMessage对象很简单,如下面的代码:
TextMessage message=session.createMessage();
message.setText("Hello, world!");
ObjectMessage
如名字所示,它是对一个JAVA对象的封装的消息。任何可序列化的JAVA对象都能用于ObjectMessage,如果必须将多个对象封装在一个消息里传递,可以使用Collection对象,来包括多个序列化对象。
下面是创建一个ObjectMessage
ObjectMessage message=session.createObjectMessage();
message.setObject(myObject);
创建一个JMS客户端程序
一个典型的JMS客户端由下面的几个基本步骤来创建:
创建一个到消息系统提供者的连接(Connection)
创建一个Session,用于接收和发送消息
创建MessageProducer和MessageConsumer来创建和接收消息
当完成了上述步骤后,一个消息产生者客户端将创建并发布消息到一个主题,而消息使用者客户端会接收与一个主题相关的消息。
1.创建一个Connection
一个Connection提供客户端对底层的消息系统的访问。并实现资源的分配和管理。通过使用一个ConnectionFactory来创建一个Connection,通常用JDNI来指定:
Connection message=new initialContext();
TopicConnectionFactory topicConnectionFactory=(TopicConnectionFactory);
topic = (Topic) jndiContext.lookup(topicName);
topicConnection =topicConnectionFactory.createTopicConnection();
2.创建一个Session
Session是一个比较大的JMS对象,他提供了生产和消费消息的手段。用于创建消息使用者和消息产生者。
topicSession = topicConnection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
两个参数用于控制事务和消息确认。
3.定位一个Topic
用JDNI来定位一个Topic,Topic用于识别发送或接收的消息,在发布/订阅系统中。订阅者订阅一个给定的Topic,而发布者将它发布的消息与一个Topic相连。
下面是创建一个Topic "WeatherReport"
Topic weatherTopic=messaging.lookup("WeatherReport");
4.启动Connection
在上面的初始化步骤之后,消息流是禁止的,用于防止在初始化时发生不可预料的行为。一旦初始化结束,必须让Connection启动消息系统。
topicConnection.start();
5.创建一个消息产生者
在发布/订阅里,一个产生者发布消息到一个指定的Topic。下面的代码显示创建一个产生者,以及后续的建立和发布一个简单文本消息。
TopicPublisher publisher=session.createPublisher(weatherTopic);
TexeMessage message=session.createMessage();
message.setText("ssss");
publisher.publish(message);
下面是一个消息使用者的代码
topicConnection =topicConnectionFactory.createTopicConnection();
topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
topicSubscriber = topicSession.createSubscriber(topic);
topicListener = new MsgListener();
topicSubscriber.setMessageListener(topicListener);
topicConnection.start();
三.消息驱动豆简介
异步消息也可以由消息驱动豆来实现。在EJB 1.1规范中,定义了两种类型的EJB。分别是实体豆(Entity Bean)和会话豆(Session Bean)。客户端通常是以同步的,阻塞方式来调用豆的方法。消息驱动豆将EJB和JMS的功能结合在一起。
正如前述,会话豆通常实现商务逻辑,客户端不能共享一个会话豆。实体豆通常和一些在永久存储中的一些实体条目相对应的。这两种豆通常都有REMOTE和HOME接口,用来与客户端交互。并且,这些交互都是同步的,阻塞方式进行的。比如,一个请求发送给一个豆,通过阻塞式方法调用,服务器返回一个相应。调用者在收到返回后,才能进行下一步处理。消息驱动豆通常配置成是一个特别的主题(topic)或队列的客户端,作为消息的使用者。但消息驱动豆没有HOME和REMOTE接口。一个消息产生者将消息写入TOPIC或队列时,并不知道使用者是一个消息驱动豆。这就允许集成一个分布式的计算系统时,有很大的灵活性。消息驱动豆没有会话性质的状态,所有的实例在不处理请求时是相同的,这与无状态会话豆是类似的。将豆的实例放在缓冲池里,也是高效处理消息驱动豆的一种方法。一个消息驱动豆必须间接或直接地从javax.ejb.MessageDrivenBean接口继承而来。这个接口是由javax.jms.MessageListener继承而来。这个方法的一个参数是javax.jms.Message。可以是任何有效的JMS消息类型。方法的申明中并不包含一个thrown语句。因此在消息处理中,不会仍出应用程序异常。当容器接收到消息时,它首先是从一个缓冲池里得到现成的一个消息驱动豆,然后,如果配置文件需要的,容器还要设置一个和事务处理上下文的一个联系。当这些管理任务完成时,接收到的消息传递给onMessage()方法。一旦方法完成,事务确认或返回,豆又被重新放回到缓冲池。
ejbRemove()在把消息驱动豆从任何存储上删除时调用。并进行清楚操作和垃圾收集。必须在ejbRemove()方法中释放所有豆的实例用到的资源。
setMessageDrivenConnection()方法只有一个参数-javax.ejb.MessageDrivenContext的实例。MessageDrivenContext类与在实体和会话豆中的上下文类似。当一个豆的实例创建时,容器传入豆用的上下文。上下文中得到环境信息的方法,以及JTA UserTranscation类,用于豆管理事务处理的场合。
另外,豆提供者必须提供一个ejbCreate()方法(无参数),用于在EJB服务器对豆进行设置。豆实例可以在ejbCreate()方法中取得任何需要的资源。
消息驱动豆大大地简化了创建一个JMS使用者,创建和配置一个JMS消息使用者这些功能都交由EJB容器来做了。开发人员只需简单地实现消息驱动豆的接口,配置给EJB服务器,用来创建一个接收消息的商业逻辑部件。
四.一个实例
本文为了说明上面的概念,编写了一个消息驱动豆,一个Publisher和一个Subscriber的代码。请参见附录的源代码。
下面讲一下怎样运行实例。这里假设读者已经从SUN主页上下载了J2EE SDK 1.3 Bate,并已经安装好了。
1.创建一个WeatherReport
主题(Topic):
j2eeadmin -addJmsDestination WeatherReport topic
可以用下面命令看一下是否正确创建:
D:\j2sdkee1.3\bin>j2eeadmin -listJmsDestination JmsDestination
一般显示结果如下:
< JMS Destination : jms/Topic , javax.jms.Topic >
< JMS Destination : jms/Queue , javax.jms.Queue >
< JMS Destination : WeatherReport , javax.jms.Topic >
2.运行Subscriber:
D:\mysourcecode\testjms>java -Djms.properties=%J2EE_HOME%\config\jms_client.properties jmssub
一般显示结果如下:
Topic name is WeatherReport
Java(TM) Message Service 1.0.2 Reference Implementation (build b10)
To end program, enter Q or q, then <return>
Reading message: A sunny day.
3.运行Publisher:
打开另一个命令行窗口,输入下面命令:
set classpath=%J2EE_HOME%\lib\j2ee.jar;.
java -Djms.properties=%J2EE_HOME%\config\jms_client.properties jmspub
一般显示结果如下:
Topic name is WeatherReport
Java(TM) Message Service 1.0.2 Reference Implementation (build b10)
Publishing message: A sunny day.
4.删除一个话题
D:\j2sdkee1.3\bin>j2eeadmin -removeJmsDestination MyTopic
5.使用消息驱动豆
下面是用消息驱动豆实现接受消息。使用deploytool将附录中的Bean部署好,可以看到下面的信息:
Deploying message driven bean MsgBean, consuming from WeatherReport
Application testjms deployed.
然后再次运行Publisher。可以看到消息驱动豆输出下面的结果:
MESSAGE BEAN: Message received: A sunny day.