[转帖]weblogic中使用jms发送和接受消息_Tomcat, WebLogic及J2EE讨论区_Weblogic技术|Tuxedo技术|中间件技术|Oracle论坛|JAVA论坛|Linux/Unix技术|hadoop论坛_联动北方技术论坛  
网站首页 | 关于我们 | 服务中心 | 经验交流 | 公司荣誉 | 成功案例 | 合作伙伴 | 联系我们 |
联动北方-国内领先的云技术服务提供商
»  游客             当前位置:  论坛首页 »  自由讨论区 »  Tomcat, WebLogic及J2EE讨论区 »
总帖数
3
每页帖数
101/1页1
返回列表
0
发起投票  发起投票 发新帖子
查看: 2104 | 回复: 2   主题: [转帖]weblogic中使用jms发送和接受消息        下一篇 
ke.li
注册用户
等级:上士
经验:283
发帖:12
精华:0
注册:1970-1-1
状态:离线
发送短消息息给ke.li 加好友    发送短消息息给ke.li 发消息
发表于: IP:您无权察看 2016-4-27 10:21:33 | [全部帖] [楼主帖] 楼主

一. 开篇语

今天很坑爹啊, 为了搞一下JMS这个玩意, 整了一天了, 光是weblogic的环境就换了两, 最后总算是搞定了, 写下这篇日志, 记录下学习的心得.

二. JMS

所谓的JMS其实就是异步通讯, 我们可以打个简单的比方: 

现在的手机基本上普及了, 手机它有两个基本功能, 一个是打电话, 一个是发短信. 打电话是同步的, 必须要保证双方都开机才能进行通话; 

而发短信则是异步的, 接收方不需要保持开机状态; 这就类似JMS了, 异步通讯实现了程序之间的松耦合的关系.

三. 几个概念

1. 为了实现JMS独立于不同供应商MS的专有技术, weblogic JMS采用了受管对象(administratored object)的机制

2. 受管对象就是由消息服务器通过管理界面创建, 程序通过JNDI接口取得这些对象

3. weblogic 中有两种受管对象:

①. connection factory: 用于创建消息服务器的connection对象 
②. distination: 用于创建消息目标和消息源 

四. 环境准备

1. myeclipse环境

① jre1.4

② j2ee 1.4 libraries

③ 导入weblogic.jar


2. weblogic环境

① 安装weblogic8.1并集成到myeclipse

      http://blog.csdn.net/zdp072/article/details/26831739

② 配置weblogic服务器 

1)、新建jms连接工厂,工厂名称为“ myJMSConnectionFactory ”, JNDI name为"myJMSConnectionFactoryJNDIName" .

2)、定义后备存储, 并填写存储目录.

3)、新建jms服务器,服务器名称为:“ myJMSServer ”.

4)、在“ myJMSServer ”服务下新建目标为“ myJMSQueue ”队列, JNDI name为"myJMSQueueJNDIName". 

5)、 在“ myJMSServer ”服务下新建目标为“ myJMSTopic ”主题, JNDI name为"myJMSTopicJNDIName". 

五. 代码测试

1. 测试P2P(point to point) - 点对点模式.

   生产者发送一条消息到消息服务器, 消息服务器发送给一个消费者, 这条消息不能再发送给其他消费者,  相当于队列, 先到先得.

①. 实体类User

/**
 * 实体类
 * @author zhangjim
 */public class User implements Serializable {  private static final long serialVersionUID = 1L;  private String name;  private int age;  public int getAge() {    return age;  }  public void setAge(int age) {    this.age = age;  }  public String getName() {    return name;  }  public void setName(String name) {    this.name = name;  }
}

②. 生产者QueueMsgSender

public class QueueMsgSender {  // Defines the JNDI context factory.  public final static String JNDI_FACTORY = "weblogic.jndi.WLInitialContextFactory";  // Defines the JNDI provider url.  public final static String PROVIDER_URL = "t3://localhost:7001/";  // Defines the JMS connection factory for the queue.  public final static String CONNECTION_FACTORY_JNDI_NAME = "myJMSConnectionFactoryJNDIName";  // Defines the queue, use the queue JNDI name   public final static String QUEUE_JNDI_NAME = "myJMSQueueJNDIName";  private QueueConnectionFactory qconFactory;  private QueueConnection queueConnection;  private QueueSession queueSession;  private QueueSender queueSender;  private Queue queue;  private TextMessage textMessage;  private StreamMessage streamMessage;  private BytesMessage bytesMessage;  private MapMessage mapMessage;  private ObjectMessage objectMessage;    /**   * get the context object.   * 
   * @return context object   * @throws NamingException if operation cannot be performed   */  private static InitialContext getInitialContext() throws NamingException {    Hashtable table = new Hashtable();    table.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY); 
    table.put(Context.PROVIDER_URL, PROVIDER_URL);    InitialContext context = new InitialContext(table);    return context;  }  /**   * Creates all the necessary objects for sending messages to a JMS queue.   * 
   * @param ctx JNDI initial context   * @param queueName name of queue   * @exception NamingException if operation cannot be performed   * @exception JMSException if JMS fails to initialize due to internal error   */  public void init(Context ctx, String queueName) throws NamingException, JMSException {    qconFactory = (QueueConnectionFactory) ctx.lookup(CONNECTION_FACTORY_JNDI_NAME);    queueConnection = qconFactory.createQueueConnection();    queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);    queue = (Queue) ctx.lookup(queueName);    queueSender = queueSession.createSender(queue);    textMessage = queueSession.createTextMessage();    streamMessage = queueSession.createStreamMessage();    bytesMessage = queueSession.createBytesMessage();    mapMessage = queueSession.createMapMessage();    objectMessage = queueSession.createObjectMessage();    queueConnection.start();  }  /**   * Sends a message to a JMS queue.   * 
   * @param message message to be sent   * @exception JMSException if JMS fails to send message due to internal error   */  public void send(String message) throws JMSException {    // type1: set TextMessage    textMessage.setText(message);        // type2: set StreamMessage    streamMessage.writeString(message);    streamMessage.writeInt(20);    // type3: set BytesMessage    byte[] block = message.getBytes();    bytesMessage.writeBytes(block);    // type4: set MapMessage    mapMessage.setString("name", message);    // type5: set ObjectMessage    User user = new User();    user.setName(message);    user.setAge(30);    objectMessage.setObject(user);        queueSender.send(objectMessage);  }  /**   * read the msg from the console, then send it.   * 
   * @param msgSender   * @throws IOException if IO fails to send message due to internal error   * @throws JMSException if JMS fails to send message due to internal error   */  private static void readAndSend(QueueMsgSender msgSender) throws IOException, JMSException {    BufferedReader msgStream = new BufferedReader(new InputStreamReader(System.in));    System.out.println("Enter message(input quit to quit):");  
    String line = null;    boolean quit = false; 
    do {      line = msgStream.readLine();      if (line != null && line.trim().length() != 0) { 
        msgSender.send(line);        System.out.println("JMS Message Sent: " + line + "\n");        quit = line.equalsIgnoreCase("quit");      }    } while (!quit);  }    /**   * release resources.   * 
   * @exception JMSException if JMS fails to close objects due to internal error   */  public void close() throws JMSException {    queueSender.close();    queueSession.close();    queueConnection.close();  }    /**   * test client.   * 
   * @param args   * @throws Exception 
   */  public static void main(String[] args) throws Exception {    InitialContext ctx = getInitialContext(); 
    QueueMsgSender sender = new QueueMsgSender();  
    sender.init(ctx, QUEUE_JNDI_NAME);    readAndSend(sender);    sender.close();  }
}

③. 消费者QueueMsgReceiver

public class QueueMsgReceiver implements MessageListener {  // Defines the JNDI context factory.  public final static String JNDI_FACTORY = "weblogic.jndi.WLInitialContextFactory";  // Defines the JNDI provider url.  public final static String PROVIDER_URL = "t3://localhost:7001";  // Defines the JMS connection factory for the queue.  public final static String CONNECTION_FACTORY_JNDI_NAME = "myJMSConnectionFactoryJNDIName";  // Defines the queue, use the queue JNDI name   public final static String QUEUE_JNDI_NAME = "myJMSQueueJNDIName";  private QueueConnectionFactory qconFactory;  private QueueConnection queueConnection;  private QueueSession queueSession;  private QueueReceiver queueReceiver;  private Queue queue;  private boolean quit = false;    /**   * get the context object.   * 
   * @return context object   * @throws NamingException if operation cannot be performed   */  private static InitialContext getInitialContext() throws NamingException {    Hashtable table = new Hashtable();    table.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);    table.put(Context.PROVIDER_URL, PROVIDER_URL);    InitialContext context = new InitialContext(table);    return context;  }    /**   * Creates all the necessary objects for receiving messages from a JMS queue.   * 
   * @param ctx JNDI initial context   * @param queueName name of queue   * @exception NamingException if operation cannot be performed   * @exception JMSException if JMS fails to initialize due to internal error   */  public void init(Context ctx, String queueName) throws NamingException, JMSException {    qconFactory = (QueueConnectionFactory) ctx.lookup(CONNECTION_FACTORY_JNDI_NAME);    queueConnection = qconFactory.createQueueConnection(); 
    queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);    queue = (Queue) ctx.lookup(queueName);    queueReceiver = queueSession.createReceiver(queue); 
    queueReceiver.setMessageListener(this);        // second thread: message reveive thread.    queueConnection.start();  
  }  /**   * implement from MessageListener.   * when a message arrived, it will be invoked.   * 
   * @param message message   */  public void onMessage(Message message) {    try {      String msgStr = "";  
      int age = 0; 

      if (message instanceof TextMessage) {        msgStr = ((TextMessage) message).getText();      } else if (message instanceof StreamMessage) {        msgStr = ((StreamMessage) message).readString();        age = ((StreamMessage) message).readInt();      } else if (message instanceof BytesMessage) {        byte[] block = new byte[1024];        ((BytesMessage) message).readBytes(block);        msgStr = String.valueOf(block);      } else if (message instanceof MapMessage) {        msgStr = ((MapMessage) message).getString("name");      } else if (message instanceof ObjectMessage) {        User user = (User) ((ObjectMessage) message).getObject();        msgStr = user.getName(); 
        age = user.getAge();      }      System.out.println("Message Received: " + msgStr + ", " + age);      if (msgStr.equalsIgnoreCase("quit")) {        synchronized (this) {          quit = true;          this.notifyAll(); // Notify main thread to quit        }      }    } catch (JMSException e) {      throw new RuntimeException("error happens", e);    }  }  /**   * release resources.   * 
   * @exception JMSException if JMS fails to close objects due to internal error   */  public void close() throws JMSException {    queueReceiver.close();    queueSession.close();    queueConnection.close();  }  /**   * test client.   * first thread(main thread)   * 
   * @param args   * @throws Exception 
   */  public static void main(String[] args) throws Exception {    InitialContext ctx = getInitialContext();    QueueMsgReceiver receiver = new QueueMsgReceiver(); 
    receiver.init(ctx, QUEUE_JNDI_NAME);    // Wait until a "quit" message has been received.    synchronized (receiver) {      while (!receiver.quit) {        try {          receiver.wait();        } catch (InterruptedException e) { 
          throw new RuntimeException("error happens", e);        }      }    }    receiver.close();  }
}

2. 测试publisher and subscriber - 发布订阅模式.

    生产者发送一条消息到消息服务器,  消息服务器发送给正在监听的所有消费者,  类似广播.

①. 生产者TopicMsgPublisher

public class TopicMsgPublisher {  // Defines the JNDI context factory.  public final static String JNDI_FACTORY = "weblogic.jndi.WLInitialContextFactory";  // Defines the JNDI provider url.  public final static String PROVIDER_URL = "t3://localhost:7001/";  // Defines the JMS connection factory for the topic.  public final static String CONNECTION_FACTORY_JNDI_NAME = "myJMSConnectionFactoryJNDIName";  // Defines the topic, use the topic JNDI name   public final static String TOPIC_JNDI_NAME = "myJMSTopicJNDIName"; 

  private TopicConnectionFactory tconFactory;  private TopicConnection topicConnection;  private TopicSession topicSession;  private TopicPublisher topicPublisher; 
  private Topic topic; 
  private TextMessage textMessage;  private StreamMessage streamMessage;  private BytesMessage bytesMessage;  private MapMessage mapMessage;  private ObjectMessage objectMessage;    /**   * get the context object.   * 
   * @return context object   * @throws NamingException if operation cannot be performed   */  private static InitialContext getInitialContext() throws NamingException {    Hashtable table = new Hashtable();    table.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY); 
    table.put(Context.PROVIDER_URL, PROVIDER_URL);    InitialContext context = new InitialContext(table);    return context;  }  /**   * Creates all the necessary objects for sending messages to a JMS topic.   * 
   * @param ctx JNDI initial context   * @param topicName name of topic   * @exception NamingException if operation cannot be performed   * @exception JMSException if JMS fails to initialize due to internal error   */  public void init(Context ctx, String topicName) throws NamingException, JMSException {    tconFactory = (TopicConnectionFactory) ctx.lookup(CONNECTION_FACTORY_JNDI_NAME);    topicConnection = tconFactory.createTopicConnection();    topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);    topic = (Topic) ctx.lookup(topicName);    topicPublisher  = topicSession.createPublisher(topic);    textMessage = topicSession.createTextMessage();    streamMessage = topicSession.createStreamMessage();    bytesMessage = topicSession.createBytesMessage();    mapMessage = topicSession.createMapMessage();    objectMessage = topicSession.createObjectMessage();    topicConnection.start();  }  /**   * Sends a message to a JMS topic.   * 
   * @param message message to be sent   * @exception JMSException if JMS fails to send message due to internal error   */  public void send(String message) throws JMSException {    // type1: set TextMessage    textMessage.setText(message);        // type2: set StreamMessage    streamMessage.writeString(message);    streamMessage.writeInt(20);    // type3: set BytesMessage    byte[] block = message.getBytes();    bytesMessage.writeBytes(block);    // type4: set MapMessage    mapMessage.setString("name", message);    // type5: set ObjectMessage    User user = new User();    user.setName(message);    user.setAge(30);    objectMessage.setObject(user);        topicPublisher.publish(objectMessage);  }  /**   * read the msg from the console, then send it.   * 
   * @param msgPublisher   * @throws IOException if IO fails to send message due to internal error   * @throws JMSException if JMS fails to send message due to internal error   */  private static void readAndSend(TopicMsgPublisher msgPublisher) throws IOException, JMSException { 
    BufferedReader msgStream = new BufferedReader(new InputStreamReader(System.in));    System.out.println("Enter message(input quit to quit):");  
    String line = null;    boolean quit = false; 
    do {      line = msgStream.readLine();      if (line != null && line.trim().length() != 0) { 
        msgPublisher.send(line);        System.out.println("JMS Message Sent: " + line + "\n");        quit = line.equalsIgnoreCase("quit");      }    } while (!quit);  }    /**   * release resources.   * 
   * @exception JMSException if JMS fails to close objects due to internal error   */  public void close() throws JMSException {    topicPublisher.close();    topicSession.close();    topicConnection.close();  }    /**   * test client.   * 
   * @param args   * @throws Exception 
   */  public static void main(String[] args) throws Exception {    InitialContext ctx = getInitialContext(); 
    TopicMsgPublisher publisher = new TopicMsgPublisher();  
    publisher.init(ctx, TOPIC_JNDI_NAME); 
    readAndSend(publisher);    publisher.close();  }
}

②. 消费者TopicMsgSubscriber

public class TopicMsgSubscriber implements MessageListener {  // Defines the JNDI context factory.  public final static String JNDI_FACTORY = "weblogic.jndi.WLInitialContextFactory";  // Defines the JNDI provider url.  public final static String PROVIDER_URL = "t3://localhost:7001";  // Defines the JMS connection factory for the topic.  public final static String CONNECTION_FACTORY_JNDI_NAME = "myJMSConnectionFactoryJNDIName";  // Defines the topic, use the topic JNDI name   public final static String TOPIC_JNDI_NAME = "myJMSTopicJNDIName";  private TopicConnectionFactory tconFactory;  private TopicConnection topicConnection;  private TopicSession topicSession;  private TopicSubscriber topicSubscriber;  private Topic topic; 
  private boolean quit = false;    /**   * get the context object.   * 
   * @return context object   * @throws NamingException if operation cannot be performed   */  private static InitialContext getInitialContext() throws NamingException {    Hashtable table = new Hashtable();    table.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);    table.put(Context.PROVIDER_URL, PROVIDER_URL);    InitialContext context = new InitialContext(table);    return context;  }    /**   * Creates all the necessary objects for receiving messages from a JMS topic.   * 
   * @param ctx JNDI initial context   * @param topicName name of topic   * @exception NamingException if operation cannot be performed   * @exception JMSException if JMS fails to initialize due to internal error   */  public void init(Context ctx, String topicName) throws NamingException, JMSException {    tconFactory = (TopicConnectionFactory) ctx.lookup(CONNECTION_FACTORY_JNDI_NAME);    topicConnection = tconFactory.createTopicConnection(); 
    topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);    topic = (Topic) ctx.lookup(topicName);    topicSubscriber = topicSession.createSubscriber(topic); 
    topicSubscriber.setMessageListener(this);        // second thread: message reveive thread.    topicConnection.start();  
  }  /**   * implement from MessageListener.   * when a message arrived, it will be invoked.   * 
   * @param message message   */  public void onMessage(Message message) {    try {      String msgStr = "";  
      int age = 0; 

      if (message instanceof TextMessage) {        msgStr = ((TextMessage) message).getText();      } else if (message instanceof StreamMessage) {        msgStr = ((StreamMessage) message).readString();        age = ((StreamMessage) message).readInt();      } else if (message instanceof BytesMessage) {        byte[] block = new byte[1024];        ((BytesMessage) message).readBytes(block);        msgStr = String.valueOf(block);      } else if (message instanceof MapMessage) {        msgStr = ((MapMessage) message).getString("name");      } else if (message instanceof ObjectMessage) {        User user = (User) ((ObjectMessage) message).getObject();        msgStr = user.getName(); 
        age = user.getAge();      }      System.out.println("Message subscribed: " + msgStr + ", " + age);      if (msgStr.equalsIgnoreCase("quit")) {        synchronized (this) {          quit = true;          this.notifyAll(); // Notify main thread to quit        }      }    } catch (JMSException e) {      throw new RuntimeException("error happens", e);    }  }  /**   * release resources.   * 
   * @exception JMSException if JMS fails to close objects due to internal error   */  public void close() throws JMSException {    topicSubscriber.close();    topicSession.close();    topicConnection.close();  }  /**   * test client.   * first thread(main thread)   * 
   * @param args   * @throws Exception 
   */  public static void main(String[] args) throws Exception {    InitialContext ctx = getInitialContext();    TopicMsgSubscriber subscriber = new TopicMsgSubscriber(); 
    subscriber.init(ctx, TOPIC_JNDI_NAME);    // Wait until a "quit" message has been subscribed.    synchronized (subscriber) {      while (!subscriber.quit) {        try {          subscriber.wait();        } catch (InterruptedException e) { 
          throw new RuntimeException("error happens", e);        }      }    }    subscriber.close();  }
}





赞(0)    操作        顶端 
filogra
注册用户
等级:少校
经验:1408
发帖:13
精华:0
注册:2015-6-2
状态:离线
发送短消息息给filogra 加好友    发送短消息息给filogra 发消息
发表于: IP:您无权察看 2016-5-17 7:32:09 | [全部帖] [楼主帖] 2  楼

很强大,感谢分享~



赞(0)    操作        顶端 
山友木樨
注册用户
等级:少校
经验:1040
发帖:9
精华:0
注册:2015-6-1
状态:离线
发送短消息息给山友木樨 加好友    发送短消息息给山友木樨 发消息
发表于: IP:您无权察看 2016-5-17 8:51:09 | [全部帖] [楼主帖] 3  楼



赞(0)    操作        顶端 
总帖数
3
每页帖数
101/1页1
返回列表
发新帖子
请输入验证码: 点击刷新验证码
您需要登录后才可以回帖 登录 | 注册
技术讨论