资讯专栏INFORMATION COLUMN

基于JMS(Java Message Service)的消息中间件----ActiveMQ

JessYanCoding / 2086人阅读

摘要:而中的消息中间件则是在常见的消息中间件类型无疑是不错的选择。是在之间传递的消息的对象。基本功能是用于和面向消息的中间件相互通信的应用程序接口。支持两种消息发送和接收模型。一种称为模型,即采用点对点的方式发送消息。

消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。而Java中的消息中间件则是JMS---Java Message Service.在常见的消息中间件类型ActiveMQ无疑是不错的选择。接下来我们先简单介绍一下什么是JMS及ActiveMQ而后我们介绍一下ActiveMQ中的两种模式 -- 队列模型以及主题模式。

JMS简介

JMS基本概念
JMS(Java Message Service)是访问企业消息系统的标准API,它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发。

JMS应用由以下几部分组成:
JMS provider :是一个消息系统,它实现了JMS 接口并提供管理和控制的功能。
JMS clients :是用Java语言写的一些程序和组件,它们产生和使用消息。
Messages :是在JMS clients之间传递的消息的对象。
Administered objects :是由使用JMS clients 的人生成的预选设置好的JMS 对象。有两种这样的对象:
destinations和connection factories。

JMS基本功能
JMS是用于和面向消息的中间件相互通信的应用程序接口。它既支持点对点(point-to-point)的域,又支持发布/订阅 (publish/subscribe)类型的域,并且提供对下列类型的支持:经认可的消息传递,事务型消息的传递,一致性消息和具有持久性的订阅者支 持。JMS还提供了另一种方式来对您的应用与旧的后台系统相集成。

消息服务类型
1) point-to-point (PTP)方式:点到点的模型。消息由一个JMS客户机(发布者)发送到服务器上的一个目的地,即一个队列(queue)。而另一个JMS客户机(订阅者)则可以访问这个队列,并从该服务器获取这条消息。
2) publish/subscribe (pub/sub)方式:发布-订阅模型。这里仍然是由一个JMS客户机将一条消息发布到服务器上的一个目的地上,但是这次这个目的地叫做一个主题 (topic),可有多个订阅者去访问该消息。消息将一直维持在主题中,直到这个主题的所有订阅者都取走了该消息的一个副本。消息也包括了一个参数,用于 定义了该消息的耐久性(它能够在服务器上等待订阅者多长时间)。

ActiceMQ简介:
    ActiveMQ是Apache所提供的一个开源的消息系统,完全采用Java来实现,因此,它能很好地支持J2EE提出的JMS(Java Message Service,即Java消息服务)规范。JMS是一组Java应用程序接口,它提供消息的创建、发送、读取等一系列服务。JMS提供了一组公共应用程序接口和响应的语法,类似于Java数据库的统一访问接口JDBC,它是一种与厂商无关的API,使得Java程序能够与不同厂商的消息组件很好地进行通信。

JMS支持两种消息发送和接收模型。一种称为P2P(Ponit to Point)模型,即采用点对点的方式发送消息。P2P模型是基于队列的,消息生产者发送消息到队列,消息消费者从队列中接收消息,队列的存在使得消息的异步传输称为可能,P2P模型在点对点的情况下进行消息传递时采用。下面两张图来描述一下这两种模型:

Java实现ActiveMQ两种模型

【1】队列模型

队列模型之生产者端

package com.imooc.jms.queue;
import java.awt.font.TextMeasurer;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

public class AppProducer {

   private static final String url = "tcp://192.168.133.1:61616";
   private static final String queueName = "queue-test";
   
   public static void main(String[] args) throws JMSException {
       
       //1.创建connectionFactory
       ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
       
       //2.创建connection
       Connection connection = connectionFactory.createConnection();
       
       //3.  启动链接
       connection.start();
       
       //4. 创建会话
       Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
       
       //5.  创建目标
       Destination destination =  session.createQueue(queueName);
       
       //6.  创建生产者向目标发送消息
       MessageProducer  producer=  session.createProducer(destination);
       
       for(int i = 0 ;i<100;i++){
           //7.  创建消息
           TextMessage textMessage = session.createTextMessage("test"+i);
           //8.  发布消息
           producer.send(textMessage);
           System.out.println("发送消息:"+textMessage.getText());
       }    
       //9. 关闭连接
       connection.close();
   }

}

说明:此为队列模型中的生产者端,下面就该端代码进行说明:
new 一个ConnectionFactory --》 new 一个connection ---》启动这个连接----》创建一个会话----》创建消息发送目的地---》创建一个消息生产者向目的地发送消息--》发送消息----》关闭连接

队列模型之消费者端

package com.imooc.jms.queue;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

public class AppConsumer {

   private static final String url = "tcp://192.168.133.1:61616";
   private static final String queueName = "queue-test";
   
   public static void main(String[] args) throws JMSException {
       
       //1.创建connectionFactory
       ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
       
       //2.创建connection
       Connection connection = connectionFactory.createConnection();
       
       //3.  启动链接
       connection.start();
       
       //4. 创建会话
       Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
       
       //5.  创建目标
       Destination destination =  session.createQueue(queueName);
       
      //6.  创建一个消费者
       MessageConsumer consumer =  session.createConsumer(destination);
       
       //7.  创建一个监听器
       consumer.setMessageListener(new MessageListener() {
           
           public void onMessage(Message message) {
               
               TextMessage textMessage = (TextMessage)message;
               try {
                   System.out.println("接受消息:"+textMessage.getText());
               } catch (JMSException e) {
                   // TODO Auto-generated catch block
                   e.printStackTrace();
               }
               
           }
       });
       
       //9. 关闭连接
       //connection.close();
   }

}

说明:此为队列模型中的消费者端,下面就该端代码进行说明:
new 一个ConnectionFactory --》 new 一个connection ---》启动这个连接----》创建一个会话----》创建消息发送目的地---》创建一个消息消费者--》创建一个消息监听器监听消息----》关闭连接

【2】主题模型

主题模型之生产者端

package com.imooc.jms.topic;
import java.awt.font.TextMeasurer;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

public class AppProducer {

   private static final String url = "tcp://192.168.133.1:61616";
   private static final String topicName = "topic-test";
   
   public static void main(String[] args) throws JMSException {
       
       //1.创建connectionFactory
       ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
       
       //2.创建connection
       Connection connection = connectionFactory.createConnection();
       
       //3.  启动链接
       connection.start();
       
       //4. 创建会话
       Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
       
       //5.  创建目标
       Destination destination =  session.createTopic(topicName);
       
       //6.  创建生产者向目标发送消息
       MessageProducer  producer=  session.createProducer(destination);
       
       for(int i = 0 ;i<100;i++){
           //7.  创建消息
           TextMessage textMessage = session.createTextMessage("test"+i);
           //8.  发布消息
           producer.send(textMessage);
           System.out.println("发送消息:"+textMessage.getText());
       }
       
       //9. 关闭连接
       connection.close();
   }

}

主题模型之消费者端

package com.imooc.jms.topic;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

public class AppConsumer {

   private static final String url = "tcp://192.168.133.1:61616";
   private static final String topicName = "topic-test";
   
   public static void main(String[] args) throws JMSException {
       
       //1.创建connectionFactory
       ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
       
       //2.创建connection
       Connection connection = connectionFactory.createConnection();
       
       //3.  启动链接
       connection.start();
       
       //4. 创建会话
       Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
       
       //5.  创建目标
       Destination destination =  session.createTopic(topicName);
       
      //6.  创建一个消费者
       MessageConsumer consumer =  session.createConsumer(destination);
       
       //7.  创建一个监听器
       consumer.setMessageListener(new MessageListener() {
           
           public void onMessage(Message message) {
               
               TextMessage textMessage = (TextMessage)message;
               try {
                   System.out.println("接受消息:"+textMessage.getText());
               } catch (JMSException e) {
                   // TODO Auto-generated catch block
                   e.printStackTrace();
               }
               
           }
       });
       
       //9. 关闭连接
       //connection.close();
   }

}

最后创建项目源码地址:
链接:http://pan.baidu.com/s/1bo1PSib 密码:khyc

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/35878.html

相关文章

  • 慕课网_《Java消息间件》学习总结

    摘要:时间年月日星期六说明本文部分内容均来自慕课网。这个时候,可以启动多台积分系统,来同时消费这个消息中间件里面的登录消息,达到横向扩展的作用。 时间:2017年07月22日星期六说明:本文部分内容均来自慕课网。@慕课网:http://www.imooc.com教学源码:无学习源码:https://github.com/zccodere/s... 第一章:课程介绍 1-1 课程安排 Java...

    twohappy 评论0 收藏0
  • 消息间件ActiveMQ介绍

    摘要:中间件的分类基于远程过程调用的中间件。基于对象请求代理的中间件。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。 一.中间件 1.1 什么是中间件? 由于业务、机构和技术是不断变化的,因此为其服务的软件系统必须适应这样的变化。在合并、添加服务或扩展可用服务之后,公司可能无力负担重新创建信息系统所需的成本。正是在...

    jaysun 评论0 收藏0
  • 新手也能看懂,消息队列其实很简单

    摘要:通过以上分析我们可以得出消息队列具有很好的削峰作用的功能即通过异步处理,将短时间高并发产生的事务消息存储在消息队列中,从而削平高峰期的并发事务。 该文已加入开源项目:JavaGuide(一份涵盖大部分Java程序员所需要掌握的核心知识的文档类项目,Star 数接近 16k)。地址:https://github.com/Snailclimb... 本文内容思维导图:showImg(ht...

    Clect 评论0 收藏0
  • 消息队列ActiveMQ使用详解

    摘要:学习消息队列的使用之前,我们先来搞清。是操作消息的接口。消息生产者由创建,并用于将消息发送到。接收消息打印结果这是接收到的消息消费者启动。。。。 通过上一篇文章 《消息队列深入解析》,我们已经消息队列是什么、使用消息队列的好处以及常见消息队列的简单介绍。 这一篇文章,主要带大家详细了解一下消息队列ActiveMQ的使用。 学习消息队列ActiveMQ的使用之前,我们先来搞清JMS。 J...

    niceforbear 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<