资讯专栏INFORMATION COLUMN

RocketMQ知识

shaonbean / 3164人阅读

摘要:基础概念消息总线,是一种跨进程的通信机制,用于上下游传递消息。每一个都会接收到消息。概念标签,用于对消息分类,在的基础上进行更细的划分。概念中负责接收生产者消息给消费者发送消息的组件。

MQ基础概念:

MQ:
消息总线(Message Queue),是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用MQ之后,消息发送上游只需要依赖MQ,逻辑上和物理上都不用依赖其他服务。
MQ的不足
(1)系统更加复杂,多了一个MQ组件
(2)消息传递路径更长,延时会增加
(3)消息可能会被重复消费
(4)上游无法知道下游的执行结果(因此,调用方实时依赖执行结果的业务场景,请使用调用,而不是MQ)

使用场景
(1)上游不关注执行结果
(2)上游关注结果,但执行时间比较长。举个例子,微信支付,跨公网调用微信的接口,执行时间会比较长,但调用方又非常关注执行结果,此时一般怎么玩呢?

一般采用“回调网关+MQ”方案来解耦:

a、调用方直接跨公网调用微信接口
b、微信返回调用成功,此时并不代表返回成功
c、微信执行完成后,回调统一网关
d、网关将返回结果通知MQ
e、请求方收到结果通知

rocketMQ:
RocketMQ 是什么?
Github 上关于 RocketMQ 的介绍:
RcoketMQ 是一款低延迟、高可靠、可伸缩、易于使用的消息中间件。具有以下特性:

支持发布/订阅(Pub/Sub)和点对点(P2P)消息模型
在一个队列中可靠的先进先出(FIFO)和严格的顺序传递
支持拉(pull)和推(push)两种消息模式
单一队列百万消息的堆积能力
支持多种消息协议,如 JMS、MQTT 等
分布式高可用的部署架构,满足至少一次消息传递语义
提供 docker 镜像用于隔离测试和云集群部署
提供配置、指标和监控等功能丰富的 Dashboard

consumer group:
1、概念:消费者分组,多个消费者在一个消费者分组中。
2、注意点:一个consumer group中的机器相当于一个集群,consumer group中只有一台机器会接收到消息,并进行消费。每一个consumer group都会接收到消息。这样子的设计要求消费端需要保证幂等性。

topic:
1、概念:Topic 是一种消息的逻辑分类,比如说你有订单类的消息,也有库存类的消息,那么就需要进行分类,一个是订单 Topic 存放订单相关的消息,一个是库存 Topic 存储库存相关的消息。
2、生产方发出的消息绑定某个topic,然后消费方监听某个topic,消费方(各个group)接收到消息,进行消费
3、topic应用级别:整个应用最好都使用一个topic,而更加细的区分,使用tags来区分。

tag:
1、概念:标签,用于对消息分类,在topic的基础上进行更细的划分。

nameServer:
1、概念:Name Server 为 producer 和 consumer 提供路由信息。类似rpc中的注册中心。当producer需要发送消息首先去询问nameServer需要请求哪个broker。而当consumer需要拉取消息,也会先询问nameServer需要请求哪个broker。

broker:
1、概念:rocketMQ中负责接收生产者消息、给消费者发送消息的组件。

Message:
1、概念:Message 是消息的载体。一个 Message 必须指定 topic。Message 还有一个可选的 tag 设置,以便消费端可以基于 tag 进行过滤消息。也可以添加额外的键值对,例如你需要一个业务 key 来查找 broker 上的消息,方便在开发过程中诊断问题。

MQ生产者者实例:
public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {

        //声明并初始化一个producer
        //需要一个producer group名字作为构造方法的参数,这里为producer1
        DefaultMQProducer producer = new DefaultMQProducer("producer1");
        
        //设置NameServer地址,此处应改为实际NameServer地址,多个地址之间用;分隔
        //NameServer的地址必须有,但是也可以通过环境变量的方式设置,不一定非得写死在代码里
        producer.setNamesrvAddr("10.1.54.121:9876;10.1.54.122:9876");
        
        //调用start()方法启动一个producer实例
        producer.start();

        //发送10条消息到Topic为TopicTest,tag为TagA,消息内容为“Hello RocketMQ”拼接上i的值
        for (int i = 0; i < 10; i++) {
            try {
                Message msg = new Message("TopicTest",// topic
                        "TagA",// tag
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)// body
                );
                
                //调用producer的send()方法发送消息
                //这里调用的是同步的方式,所以会有返回结果
                SendResult sendResult = producer.send(msg);
                
                //打印返回结果,可以看到消息发送的状态以及一些相关信息
                System.out.println(sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }

        //发送完消息之后,调用shutdown()方法关闭producer
        producer.shutdown();
    }
}
MQ消费者实例:

在开发过程中,如果想测试生产者是否发出了mq,可以编写一个消费者进行测试

@Test
public void testMqConsumer() throws Exception {
    String rocketmqAddress="10.113.41.2:9876;10.113.41.4:9876";

    int threadNum = 5;
    String topics = "WechatUnionCoreTemplateNotifyTopic";
    String instanceName = "TemplateComsumer";
    String groupName = "wechatUnionTemplateNotifyConsumer";
    DefaultMQPushConsumer consumer = null;

    consumer = new DefaultMQPushConsumer(groupName);
    consumer.setNamesrvAddr(rocketmqAddress);//MQ地址
    consumer.setClientCallbackExecutorThreads(threadNum);//消费现场数量
    consumer.setInstanceName(instanceName);//实例名称
    consumer.subscribe(topics, "*");


    //注册监听
    consumer.registerMessageListener(new MessageListenerConcurrently() {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(
                List msgs,
                ConsumeConcurrentlyContext context) {
            for (int i = 0; i < msgs.size(); i++) {
                MessageExt msgExt =  msgs.get(i);
                String msgId = msgExt.getMsgId();
                Integer flag = msgExt.getFlag();
                TemplateNotifyItem templateNotifyItem = ProtoBufSerialize.fromProto(msgExt.getBody(), TemplateNotifyItem.class);
                logger.info("receive new Msg:    " + "  msgId=" + msgId + "   flag=" + flag + "  templateNotifyItem=" + templateNotifyItem);
            }

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    logger.info("监听执行中");



    Thread.sleep(1000000);
}

参考:
http://blog.csdn.net/manzhizh...
https://www.jianshu.com/p/824...
架构师之路

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/68163.html

相关文章

  • 写这么多系列博客,怪不得找不到女朋友

    摘要:前提好几周没更新博客了,对不断支持我博客的童鞋们说声抱歉了。熟悉我的人都知道我写博客的时间比较早,而且坚持的时间也比较久,一直到现在也是一直保持着更新状态。 showImg(https://segmentfault.com/img/remote/1460000014076586?w=1920&h=1080); 前提 好几周没更新博客了,对不断支持我博客的童鞋们说声:抱歉了!。自己这段时...

    JerryWangSAP 评论0 收藏0
  • 后端必备——数据通信知识(RPC、消息队列)一站式总结

    摘要:具体可以参考消息队列之具体可以参考实战之快速入门十分钟入门阿里中间件团队博客是一个分布式的可分区的可复制的基于发布订阅的消息系统主要用于大数据领域当然在分布式系统中也有应用。目前市面上流行的消息队列就是阿里借鉴的原理用开发而得。 我自己总结的Java学习的系统知识点以及面试问题,目前已经开源,会一直完善下去,欢迎建议和指导欢迎Star: https://github.com/Snail...

    Kahn 评论0 收藏0
  • RocketMQ源码学习(一)-概述

    摘要:每个与集群中的所有节点建立长连接,定时注册信息到所有。完全无状态,可集群部署。本系列源码解析主要参照原理简介来追寻其代码实现虽然版本不太一致但这也是能找到的最详细的资料了接下来根据其模块来源码阅读目录如下 为什么选择读RocketMQ? 对MQ的理解一直不深,上周看了,还是觉得不够深入,找个成熟的产品来学习吧,RabbitMQ是erLang写的,Kafka是Scala写的,非Java写...

    godlong_X 评论0 收藏0
  • 新手也能看懂,消息队列其实很简单

    摘要:通过以上分析我们可以得出消息队列具有很好的削峰作用的功能即通过异步处理,将短时间高并发产生的事务消息存储在消息队列中,从而削平高峰期的并发事务。 该文已加入开源项目:JavaGuide(一份涵盖大部分Java程序员所需要掌握的核心知识的文档类项目,Star 数接近 16k)。地址:https://github.com/Snailclimb... 本文内容思维导图:showImg(ht...

    Clect 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<