[原创]jms的topic和queue的简单实现_Tomcat, WebLogic及J2EE讨论区_Weblogic技术|Tuxedo技术|中间件技术|Oracle论坛|JAVA论坛|Linux/Unix技术|hadoop论坛_联动北方技术论坛  
网站首页 | 关于我们 | 服务中心 | 经验交流 | 公司荣誉 | 成功案例 | 合作伙伴 | 联系我们 |
联动北方-国内领先的云技术服务提供商
»  游客             当前位置:  论坛首页 »  自由讨论区 »  Tomcat, WebLogic及J2EE讨论区 »
总帖数
1
每页帖数
101/1页1
返回列表
0
发起投票  发起投票 发新帖子
查看: 2320 | 回复: 0   主题: [原创]jms的topic和queue的简单实现        下一篇 
sanfeng.pan
注册用户
等级:下士
经验:177
发帖:4
精华:0
注册:1970-1-1
状态:离线
发送短消息息给sanfeng.pan 加好友    发送短消息息给sanfeng.pan 发消息
发表于: IP:您无权察看 2016-12-23 13:21:25 | [全部帖] [楼主帖] 楼主

    第一次接触使用jms,参考了论坛大神的代码,现在自己做了一个简单实现案列,首先需要在weblogic建立jms服务,jms模块等等,可以参照论坛大神发的相关帖子。建立好服务后新建一个工程,目录结构如下:

图片.png

    当然需要导入weblogic的jar包,由于经常要用到context上下文对象,建立了一个工具类:

package com.landingbj.util;

import java.util.Properties;

import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
/**
 * TODO 获取jndi上下文对象工具类
 * @author psf
 *
 */
public class ContextManager {
    private static Context context=null;
    public static Context getContext(){
        if (context==null) {
            Properties p=new Properties();
            p.put(Context.INITIAL_CONTEXT_FACTORY,"weblogic.jndi.WLInitialContextFactory");
            p.put(Context.PROVIDER_URL, "t3://localhost:7001"); 
            try {
                context=new InitialContext(p);
            } catch (NamingException e) {
                e.printStackTrace();
            }
        }
        return context;
    }
}

    其中topic消息生产者代码:

package com.landingbj.topic;

import java.util.Date;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.NamingException;

import com.landingbj.util.ContextManager;
/**
 * TODO jms topic发送信息
 * @author psf
 *
 */
public class SendMessage {
    //文本信息
    private TextMessage message;
    //信息制造者
    private MessageProducer sender;
    
    public static void main(String[] args) {
        SendMessage sendMessage=new SendMessage();
        for (int i = 0; i <10; i++) {
            sendMessage.send("topic mess"+i);
        }
    }
    public SendMessage() {
        Context context=ContextManager.getContext();
        try {
            //从weblogic中获取连接工厂
            ConnectionFactory factory=(ConnectionFactory) context.lookup("myconn");
            //获取连接
            Connection connection=factory.createConnection();
            //创建会话
            Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //从weblogic中的主题获取管理对象Destination
            Destination destination=(Destination) context.lookup("Topic-0");
            // 创建一个消息生产者
            sender=session.createProducer(destination);
            message=session.createTextMessage();
        } catch (NamingException e) {
            e.printStackTrace();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
    
    public void send(String mess){
            try {
                message.setText(mess);
                sender.send(message);
                message.setText("当前时间:"+new Date().toString());
                sender.send(message);
            } catch (JMSException e) {
                e.printStackTrace();
            };
    }
}

    消息消费者:

package com.landingbj.topic;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.naming.Context;
import javax.naming.NamingException;

import com.landingbj.util.ContextManager;
/**
 * TODO 接收topic信息
 * @author psf
 *
 */
public class ReceiveMessage {
    private TextMessage message;
    //消息消费者顶级接口,其子接口有QueueReceiver和TopicSubscriber
    private MessageConsumer receiver;
    public static void main(String[] args) {
        ReceiveMessage receiveMessage=new ReceiveMessage();
        for (int i = 0; i <20; i++) {
            receiveMessage.receive();
        }
        
    }
    public ReceiveMessage() {
        Context context=ContextManager.getContext();
        try {
            ConnectionFactory factory=(ConnectionFactory) context.lookup("myconn");
            Connection connection=factory.createConnection();
            //clientID是JMS server用来唯一标记链接的,因此在全局中不能重复
            connection.setClientID("landingbj");
            //"开启"消息接收,此后即可接收消息;不过对于Producer而言,无论链接处于何种状态,均可以发送消息;此方法主要对消费者有效
            connection.start();
            //创建会话,并指定此Session的事务性和消息确认模式
            //AUTO_ACKNOWLEDGE 消息自动确认,CLIENT_ACKNOWLEDGE:客户端确认,DUPS_OK_ACKNOWLEDGE:"可重复消息确认"
            Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic topic=(Topic) context.lookup("Topic-0");
            //创建一个“耐久订阅者”,并指定订阅者的名称(name,需要全局唯一)
            receiver=session.createDurableSubscriber(topic, "landingbj");
            message=session.createTextMessage();
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (NamingException e) {
            e.printStackTrace();
        }
    }
    
    public void receive(){
        try {
            if(message!=null){
                message=(TextMessage) receiver.receive();
                System.out.println(message.getText());
            }else {
                System.out.println("没有信息可读!");
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

    jms队列消息发送

package com.landingbj.queue;

import java.util.Date;

import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.NamingException;


import com.landingbj.util.ContextManager;
/**
 * TODO jms队列发送信息
 * @author psf
 *
 */
public class SendMessage {
    private QueueSender sender;
    private TextMessage message;
    
    public static void main(String[] args) {
        SendMessage sendMessage=new SendMessage();
        for (int i = 0; i <5; i++) {
            sendMessage.send("queue mess"+i);
        }
    }
    public SendMessage() {
        Context context=ContextManager.getContext();
        try {
            QueueConnectionFactory queueConnectionFactory=(QueueConnectionFactory) context.lookup("myconn");
            QueueConnection queueConnection=queueConnectionFactory.createQueueConnection();
            QueueSession session=queueConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
            Queue queue=(Queue) context.lookup("Queue-0");
            sender=session.createSender(queue);
            message=session.createTextMessage();
        } catch (NamingException e) {
            e.printStackTrace();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
    
    public void send(String mess){
        try {
            message.setText(mess);
            sender.send(message);
            message.setText("当前时间"+new Date().toString());
            sender.send(message);
        } catch (JMSException e) {
            e.printStackTrace();
        };
    }
}

    消息接收

package com.landingbj.queue;

import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.NamingException;

import com.landingbj.util.ContextManager;

public class ReceiveMessage {
    private QueueReceiver receiver;
    private TextMessage message;
    public static void main(String[] args) {
        ReceiveMessage receiveMessage=new ReceiveMessage();
        for (int i = 0; i <10; i++) {
            receiveMessage.receive();
        }
        
    }
    public ReceiveMessage() {
        Context context=ContextManager.getContext();
        try {
            QueueConnectionFactory queueConnectionFactory=(QueueConnectionFactory) context.lookup("myconn");
            QueueConnection queueConnection = queueConnectionFactory.createQueueConnection();
            queueConnection.start();
            QueueSession session=queueConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
            Queue queue=(Queue) context.lookup("Queue-0");
            receiver=session.createReceiver(queue);
            message=session.createTextMessage();
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (NamingException e) {
            e.printStackTrace();
        }
    }
    
    public void receive(){
        try {
            if(message!=null){
                message=(TextMessage) receiver.receive();
                System.out.println(message.getText());
            }else {
                System.out.println("没有信息可读!");
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

    weblogic控制台信息如下:

图片.png

    目前,就写到这里,相关概念可以参考其它相关帖子。




赞(0)    操作        顶端 
总帖数
1
每页帖数
101/1页1
返回列表
发新帖子
请输入验证码: 点击刷新验证码
您需要登录后才可以回帖 登录 | 注册
技术讨论