[原创]JMS发布/订阅模型(topic)_Tomcat, WebLogic及J2EE讨论区_Weblogic技术|Tuxedo技术|中间件技术|Oracle论坛|JAVA论坛|Linux/Unix技术|hadoop论坛_联动北方技术论坛  
网站首页 | 关于我们 | 服务中心 | 经验交流 | 公司荣誉 | 成功案例 | 合作伙伴 | 联系我们 |
联动北方-国内领先的云技术服务提供商
»  游客             当前位置:  论坛首页 »  自由讨论区 »  Tomcat, WebLogic及J2EE讨论区 »
总帖数
1
每页帖数
101/1页1
返回列表
0
发起投票  发起投票 发新帖子
查看: 3496 | 回复: 0   主题: [原创]JMS发布/订阅模型(topic)        下一篇 
xiuwen.zhao
高级会员
等级:上尉
经验:668
发帖:38
精华:0
注册:1970-1-1
状态:离线
发送短消息息给xiuwen.zhao 加好友    发送短消息息给xiuwen.zhao 发消息
发表于: IP:您无权察看 2014-9-8 22:21:21 | [全部帖] [楼主帖] 楼主

1.发布/订阅模型

在发布/订阅模型(Pub-Sub)中,每个消息被发送到一个消息主题,该主题可以拥有多个订阅者。JMS系统负责将消息的副本传给该主题的每个订阅者。

Pub-Sub消息模型与PTP消息模型的不同点:

对于Pub-Sub消息模型而言,当多个消息消费者同时订阅某个主题时,只要有一个消息生产者向该主题发布一条消息,每个消费者都可以收到一个消息的副本。而对于PTP消息模型来说,当消息生产者向消息队列发送一条消息之后,只有一个消息消费者可以接收到该消息。

对于Pub-Sub消息模型,当消息生产者将某个消息发布到指定主题时,即使某个消息消费者订阅了该主题,但如果该消息抵达消息主题时该消息消费者处于离线状态,它将无法收到该消息。而对于PTP消息模型来说,当消息生产者向消息队列发送一条消息时,即使消息消费者处于离线状态,只要该消息还处于有效期之内,该消息消费者总可以从消息队列中提取到该消息。

2.编程实例

我们在WebLogic服务器中配置了一个JNDI名为“topic”的主题,它可以作为Pub-Sub消息模型的消息目的地来使用。WebLogic服务器中提供了默认的连接工厂,其JNDI名为“weblogic.jms.ConnectionFactory”。

发送消息到主题的代码如下:

public class TopicMessageSender{
      public void sendMessage() throws Exception{
            final String CONNECTION_FACTORY_JNDI = "weblogic.jms.ConnectionFactory";
            Context ctx = getInitialContext();
            ConnectionFactory connFactory = (ConnectionFactory)ctx.lookup(CONNECTION_FACTORY_JNDI);
            Destination dest = (Destination)ctx.lookup("topic");
            Connection conn = connFactory.createConnection();
            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer sender = session.createProducer(dest);
            sender.setDeliveryMode(DeliveryMode.PERSISTENT);
            sender.setTimeToLive(20000);
            TextMessage msg = session.createTextMessage();
            msg.setText("Hello JMS");
            sender.send(msg);
            msg.setText("The time is "+new Date().toString());
            sender.send(msg);
            session.close();
            conn.close();
      }
      private Context getInitialContext(){
            final String INIT_FACTORY = "weblogic.jndi.WLInitialContextFactory";
            final String SERVER_URL = "t3://localhost:7001";
            Context ctx = null;
            try{
                  Properties props = new Properties();
                  props.put(Context.INITIAL_CONTEXT_FACTORY, INIT_FACTORY);
                  props.put(Context.PROVIDER_URL, SERVER_URL);
                  ctx = new InitialContext(props);
            }catch(NamingException ne){
            System.out.println("can't connect the WebLogic " + SERVER_URL);
            ne.printStackTrace();
      }
      return ctx;
}
public static void main(String[] args) throws Exception{
      TopicMessageSender mp = new TopicMessageSender();
      mp.sendMessage();
}
}


从消息主题中读取消息的代码如下:

public class Consumer {
      public void receiveMessage() throws Exception {
            final String CONNECTION_FACTORY_JNDI = "weblogic.jms.ConnectionFactory";
            Context ctx = getInitialContext();
            ConnectionFactory connFactory = (ConnectionFactory) ctx
            .lookup(CONNECTION_FACTORY_JNDI);
            Topic dest = (Topic) ctx.lookup("topic");
            Connection conn = connFactory.createConnection();
            conn.setClientID("landingbj");
            conn.start();
            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer receiver = session.createDurableSubscriber(dest,
            "landingbj");
            TextMessage msg = (TextMessage) receiver.receive();
            if (msg != null) {
                  System.out.println(msg);
                  System.out.println("The message: " + msg.getText());
            }else{
            System.out.println("No message can be shown!");
      }
      session.close();
      conn.close();
}
private Context getInitialContext() {
      final String INIT_FACTORY = "weblogic.jndi.WLInitialContextFactory";
      final String SERVER_URL = "t3://localhost:7001";
      Context ctx = null;
      try {
            Properties props = new Properties();
            props.put(Context.INITIAL_CONTEXT_FACTORY, INIT_FACTORY);
            props.put(Context.PROVIDER_URL, SERVER_URL);
            ctx = new InitialContext(props);
      } catch (NamingException ne) {
            System.out.println("can't connect the WebLogic " + SERVER_URL);
            ne.printStackTrace();
      }
      return ctx;
}
public static void main(String[] args) throws Exception {
      Consumer c = new Consumer();
      c.receiveMessage();
}
}




赞(0)    操作        顶端 
总帖数
1
每页帖数
101/1页1
返回列表
发新帖子
请输入验证码: 点击刷新验证码
您需要登录后才可以回帖 登录 | 注册
技术讨论