JMS Topic和Queue的简单实现_Tomcat, WebLogic及J2EE讨论区_Weblogic技术|Tuxedo技术|中间件技术|Oracle论坛|JAVA论坛|Linux/Unix技术|hadoop论坛_联动北方技术论坛  
网站首页 | 关于我们 | 服务中心 | 经验交流 | 公司荣誉 | 成功案例 | 合作伙伴 | 联系我们 |
联动北方-国内领先的云技术服务提供商
»  游客             当前位置:  论坛首页 »  自由讨论区 »  Tomcat, WebLogic及J2EE讨论区 »
总帖数
1
每页帖数
101/1页1
返回列表
0
发起投票  发起投票 发新帖子
查看: 2981 | 回复: 0   主题: JMS Topic和Queue的简单实现        下一篇 
feng.gao
注册用户
等级:中士
经验:239
发帖:15
精华:0
注册:1970-1-1
状态:离线
发送短消息息给feng.gao 加好友    发送短消息息给feng.gao 发消息
发表于: IP:您无权察看 2017-5-12 17:48:47 | [全部帖] [楼主帖] 楼主

在weblogic 控制台创建两种模式的对应资源

image.png

Queue(点对点消息传送模型)

  在点对点消息传送模型中,应用程序由消息队列,发送者,接收者组成。每一个消息发送给一个特殊的消息队列,该队列保存了所有发送给它的消息(除了被接收者消费掉的和过期的消息)。点对点消息模型有一些特性,如下:


每个消息只有一个接收者;

消息发送者和接收者并没有时间依赖性;

当消息发送者发送消息的时候,无论接收者程序在不在运行,都能获取到消息;

当接收者收到消息的时候,会发送确认收到通知(acknowledgement)。


package com.landing.quene;


import javax.jms.JMSException;

import javax.jms.Queue;

import javax.jms.QueueConnection;

import javax.jms.QueueConnectionFactory;

import javax.jms.QueueSender;

import javax.jms.QueueSession;

import javax.jms.Session;

import javax.jms.TextMessage;

import javax.naming.Context;

import javax.naming.NamingException;


import org.junit.Test;


import com.landing.utils.ContextUtil;

//消息发送方

public class Sender {

@Test

public void SendMessage() {

Context ctx = ContextUtil.getContext();

QueueConnection conn = null;

QueueSession session = null;

try {

//根据JNDI获取QueueConnectionFactory对象

QueueConnectionFactory qConnectionFactory = (QueueConnectionFactory) ctx.lookup("javax.jms.QueueConnectionFactory");

//根据QueueConnectionFactory对象获取QueueConnection对象

conn = qConnectionFactory.createQueueConnection();

//根据QueueConnection对象获取QueueSession对象那个

session = conn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

//根据JNDI回去queue对象

Queue queue = (Queue) ctx.lookup("queue_test");

//创建消息发送者

QueueSender sender = (QueueSender) session.createSender(queue);

//创建消息

TextMessage tMessage = session.createTextMessage("hello world");

//发送消息

sender.send(tMessage);

} catch (NamingException e) {

// TODO Auto-generated catch block

e.printStackTrace();

} catch (JMSException e) {

// TODO Auto-generated catch block

e.printStackTrace();

} finally{

if(session != null){

try {

session.close();

} catch (JMSException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

if(conn != null) {

try {

conn.close();

} catch (JMSException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

}

}

}


package com.landing.quene;


import javax.jms.JMSException;

import javax.jms.Queue;

import javax.jms.QueueConnection;

import javax.jms.QueueConnectionFactory;

import javax.jms.QueueReceiver;

import javax.jms.QueueSession;

import javax.jms.Session;

import javax.jms.TextMessage;

import javax.naming.Context;

import javax.naming.NamingException;


import com.landing.utils.ContextUtil;

//消息接收方

public class Receiver {

public static TextMessage receiveMessage() {

Context ctx = ContextUtil.getContext();

QueueConnection conn = null;

QueueSession session = null;

TextMessage tMessage = null;

try {

//根据JNDI获取QueueConnectionFactory对象

QueueConnectionFactory qConnectionFactory = (QueueConnectionFactory) ctx.lookup("javax.jms.QueueConnectionFactory");

//根据QueueConnectionFactory对象获取QueueConnection对象

conn = qConnectionFactory.createQueueConnection();

//根据QueueConnection对象获取QueueSession对象那个

session = conn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

//根据JNDI回去queue对象

Queue queue = (Queue) ctx.lookup("queue_test");

//创建消息接收方

QueueReceiver receive = (QueueReceiver) session.createReceiver(queue);

//开始接收消息

conn.start();

tMessage = (TextMessage) receive.receive();

} catch (NamingException e) {

// TODO Auto-generated catch block

e.printStackTrace();

} catch (JMSException e) {

// TODO Auto-generated catch block

e.printStackTrace();

} finally{

if(session != null){

try {

session.close();

} catch (JMSException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

if(conn != null) {

try {

conn.close();

} catch (JMSException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

}

return tMessage;

}

public static void main(String[] args) {

System.out.println(receiveMessage());

}

}


Topic(发布/订阅消息传递模型)

  在发布/订阅消息模型中,发布者发布一个消息,该消息通过topic传递给所有的客户端。在这种模型中,发布者和订阅者彼此不知道对方,是匿名的且可以动态发布和订阅topic。topic主要用于保存和传递消息,且会一直保存消息直到消息被传递给客户端。


发布/订阅消息模型特性如下:


一个消息可以传递给多个订阅者

发布者和订阅者有时间依赖性,只有当客户端创建订阅后才能接受消息,且订阅者需一直保持活动状态以接收消息。

为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。

package com.landing.topic;


import javax.jms.JMSException;

import javax.jms.Session;

import javax.jms.TextMessage;

import javax.jms.Topic;

import javax.jms.TopicConnection;

import javax.jms.TopicConnectionFactory;

import javax.jms.TopicPublisher;

import javax.jms.TopicSession;

import javax.naming.Context;

import javax.naming.NamingException;


import org.junit.Test;


import com.landing.utils.ContextUtil;

//消息发布方

public class Publisher {

@Test

public void sendMessage() {

Context ctx = ContextUtil.getContext();

TopicConnection conn = null;

TopicSession session = null;

try {

//获取连接工厂

TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx.lookup("javax.jms.TopicConnectionFactory");

//根据连接工厂获取对应的Connection对象

conn = connFactory.createTopicConnection();

//根据Connection对象获取Session对象

session = conn.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);

//根据JNDI获取topic对象

Topic topic = (Topic) ctx.lookup("topic_test");

//创建消息发布者

TopicPublisher topicPublisher = session.createPublisher(topic);

TextMessage message = session.createTextMessage();

message.setText("Hello World");

topicPublisher.publish(message);

} catch (NamingException e) {

// TODO Auto-generated catch block

e.printStackTrace();

} catch (JMSException e) {

// TODO Auto-generated catch block

e.printStackTrace();

} finally{

if(session != null){

try {

session.close();

} catch (JMSException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

if(conn != null) {

try {

conn.close();

} catch (JMSException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

}

}

}


package com.landing.topic;


import javax.jms.JMSException;

import javax.jms.Session;

import javax.jms.TextMessage;

import javax.jms.Topic;

import javax.jms.TopicConnection;

import javax.jms.TopicConnectionFactory;

import javax.jms.TopicSession;

import javax.jms.TopicSubscriber;

import javax.naming.Context;

import javax.naming.NamingException;


import org.junit.Test;


import com.landing.utils.ContextUtil;

//消息接收方

public class Subcriber {

@Test

public TextMessage receiveMessage() {

Context ctx = ContextUtil.getContext();

TopicConnection conn = null;

TopicSession session = null;

TextMessage message = null;

try {

//获取连接工厂

TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx.lookup("javax.jms.TopicConnectionFactory");

//根据连接工厂获取对应的Connection对象

conn = connFactory.createTopicConnection();

//根据Connection对象获取Session对象

session = conn.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);

//根据JNDI获取topic对象

Topic topic = (Topic) ctx.lookup("topic_test");

//创建消息订阅者

TopicSubscriber topicSubscriber = session.createSubscriber(topic);

//开启消息接收

conn.start();

message = (TextMessage) topicSubscriber.receive();

} catch (NamingException e) {

// TODO Auto-generated catch block

e.printStackTrace();

} catch (JMSException e) {

// TODO Auto-generated catch block

e.printStackTrace();

} finally{

if(session != null){

try {

session.close();

} catch (JMSException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

if(conn != null) {

try {

conn.close();

} catch (JMSException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

}

return message;

}

}


//通过ContextUtil来获取Context对象

package com.landing.utils;


import java.util.Hashtable;


import javax.naming.Context;

import javax.naming.InitialContext;

import javax.naming.NamingException;


public class ContextUtil {

private static Context context;


public static Context getContext() {


Hashtable env = new Hashtable();

env.put(Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory");

env.put(Context.PROVIDER_URL, "t3://localhost:7001");

try {

if(context == null) {

context = new InitialContext(env);

}

} catch (NamingException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

return context;

}

}





赞(0)    操作        顶端 
总帖数
1
每页帖数
101/1页1
返回列表
发新帖子
请输入验证码: 点击刷新验证码
您需要登录后才可以回帖 登录 | 注册
技术讨论