资讯专栏INFORMATION COLUMN

慕课网_《RabbitMQ消息中间件极速入门与实战》学习总结

mykurisu / 1718人阅读

摘要:慕课网消息中间件极速入门与实战学习总结时间年月日星期三说明本文部分内容均来自慕课网。

慕课网《RabbitMQ消息中间件极速入门与实战》学习总结

时间:2018年09月05日星期三

说明:本文部分内容均来自慕课网。@慕课网:https://www.imooc.com

教学源码:无

学习源码:https://github.com/zccodere/s...

第一章:RabbitMQ起步 1-1 课程导航

课程导航

RabbitMQ简介及AMQP协议

RabbitMQ安装与使用

RabbitMQ核心概念

与SpringBoot整合

保障100%的消息可靠性投递方案落地实现

1-2 RabbitMQ简介

初识RabbitMQ

RabbitMQ是一个开源的消息代理和队列服务器

用来通过普通协议在完全不同的应用之间共享数据

RabbitMQ是使用Erlang语言来编写的

并且RabbitMQ是基于AMQP协议的

RabbitMQ简介

目前很多互联网大厂都在使用RabbitMQ

RabbitMQ底层采用Erlang语言进行编写

开源、性能优秀,稳定性保障

与SpringAMQP完美的整合、API丰富

集群模式丰富,表达式配置,HA模式,镜像队列模型

保证数据不丢失的前提做到高可靠性、可用性

AMQP全称:Advanced Message Queuing Protocol

AMQP翻译:高级消息队列协议

AMQP协议模型

1-3 RabbitMQ安装

学习笔记

0.安装准备
官网地址:http://www.rabbitmq.com/
安装Linux必要依赖包
下载RabbitMQ安装包
进行安装,修改相关配置文件
vim /etc/hostname
vim /etc/hosts

1.安装Erlang
wget https://packages.erlang-solutions.com/erlang-solutions_1.0_all.deb
sudo dpkg -i erlang-solutions_1.0_all.deb
sudo apt-get install erlang erlang-nox

2.安装RabbitMQ
echo "deb http://www.rabbitmq.com/debian/ testing main" | sudo tee /etc/apt/sources.list.d/rabbitmq.list
wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add -
sudo apt-get install rabbitmq-server

3.安装RabbitMQ web管理插件
sudo rabbitmq-plugins enable rabbitmq_management
sudo systemctl restart rabbitmq-server
访问:http://localhost:15672
默认用户名密码:guest/guest

4.修改RabbitMQ配置
vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.5.7/ebin/rabbit.app
比如修改密码、配置等等;例如:loopback_users中的<<"guest">>,只保留guest
服务启动:rabbitmq-server start &
服务停止:rabbitmqctl app_stop
1-4 RabbitMQ概念

RabbitMQ的整体架构

RabbitMQ核心概念

Server:又称Broker,接受客户端的连接,实现AMQP实体服务

Connection:连接,应用程序与Broker的网络连接

Channel:网络信道

几乎所有的操作都在Channel中进行
Channel是进行消息读写的通道
客户端可建立多个Channel
每个Channel代表一个会话任务

Message:消息

服务器和应用程序之间传送的数据,由Properties和Body组成
Properties可以对消息进行修饰,比如消息的优先级、延迟等高级特性
Body则就是消息体内容

Virtual host:虚拟机

用于进行逻辑隔离,最上层的消息路由
一个Virtual host里面可以有若干个Exchange和Queue
同一个Virtual host里面不能有相同名称的Exchange或Queue

Exchange:交换机,接收消息,根据路由键转发消息到绑定的队列

Binding:Exchange和Queue之间的虚拟连接,binding中可以包含routing key

Routing key:一个路由规则,虚拟机可用它来确定如何路由一个特定消息

Queue:也称为Message Queue,消息队列,保存消息并将它们转发给消费者

RabbitMQ消息的流转过程

第二章:RabbitMQ使用 2-1 发送消息

SpringBoot与RabbitMQ集成

引入相关依赖

对application.properties进行配置

创建名为rabbitmq-producer的maven工程pom如下



    
        47-rabbitmq
        com.myimooc
        1.0-SNAPSHOT
    
    4.0.0

    rabbitmq-producer

    
        2.0.4.RELEASE
    

    
        
            
                org.springframework.boot
                spring-boot-parent
                ${spring.boot.version}
                pom
                import
            
        
    

    
        
            org.springframework.boot
            spring-boot-starter
        

        
        
            org.springframework.boot
            spring-boot-starter-amqp
        

        
        
            org.apache.commons
            commons-lang3
        
        
            commons-io
            commons-io
            2.5
        
        
            com.alibaba
            fastjson
            1.2.36
        
        
            javax.servlet
            javax.servlet-api
            provided
        
        
            org.slf4j
            slf4j-api
        
        
            log4j
            log4j
            1.2.17
        

        
            org.springframework.boot
            spring-boot-starter-test
            test
        
    

    
        
            
                org.springframework.boot
                spring-boot-maven-plugin
            
        
    

1.编写Order类

package com.myimooc.rabbitmq.entity;

import java.io.Serializable;

/**
 * 
* 标题: 订单实体
* 描述: 订单实体
* 时间: 2018/09/06
* * @author zc */ public class Order implements Serializable{ private static final long serialVersionUID = 6771608755338249746L; private String id; private String name; /** * 存储消息发送的唯一标识 */ private String messageId; public Order() { } public Order(String id, String name, String messageId) { this.id = id; this.name = name; this.messageId = messageId; } @Override public String toString() { return "Order{" + "id="" + id + """ + ", name="" + name + """ + ", messageId="" + messageId + """ + "}"; } public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getMessageId() { return messageId; } public void setMessageId(String messageId) { this.messageId = messageId; } }

2.编写OrderSender类

package com.myimooc.rabbitmq.producer.producer;

import com.myimooc.rabbitmq.entity.Order;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 
* 标题: 订单消息发送者
* 描述: 订单消息发送者
* 时间: 2018/09/06
* * @author zc */ @Component public class OrderSender { private RabbitTemplate rabbitTemplate; @Autowired public OrderSender( RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } /** * 发送订单 * * @param order 订单 * @throws Exception 异常 */ public void send(Order order) throws Exception { CorrelationData correlationData = new CorrelationData(); correlationData.setId(order.getMessageId()); // exchange:交换机 // routingKey:路由键 // message:消息体内容 // correlationData:消息唯一ID this.rabbitTemplate.convertAndSend("order-exchange", "order.a", order, correlationData); } }

3.编写application.properties类

# RabbitMQ配置
spring.rabbitmq.addresses=192.168.0.105:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000

# Server配置
server.servlet.context-path=/
server.port=8080

spring.http.encoding.charset=UTF-8
spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
spring.jackson.time-zone=GMT+8
spring.jackson.default-property-inclusion=NON_NULL

4.编写Application类

package com.myimooc.rabbitmq.producer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * 
* 标题: 启动类
* 描述: 启动类
* 时间: 2018/09/06
* * @author zc */ @SpringBootApplication public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } }

5.编写OrderSenderTest类

package com.myimooc.rabbitmq.producer.producer;

import com.myimooc.rabbitmq.entity.Order;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.UUID;

/**
 * 
* 标题: 订单消息发送者测试
* 描述: 订单消息发送者测试
* 时间: 2018/09/06
* * @author zc */ @RunWith(SpringRunner.class) @SpringBootTest public class OrderSenderTest { @Autowired private OrderSender orderSender; @Test public void testSend1() throws Exception { Order order = new Order(); order.setId("201809062009010001"); order.setName("测试订单1"); order.setMessageId(System.currentTimeMillis() + "$" + UUID.randomUUID().toString().replaceAll("-","")); this.orderSender.send(order); } }
2-2 处理消息

创建名为rabbitmq-consumer的maven工程pom如下



    
        47-rabbitmq
        com.myimooc
        1.0-SNAPSHOT
    
    4.0.0

    rabbitmq-consumer

    
        2.0.4.RELEASE
    

    
        
            
                org.springframework.boot
                spring-boot-parent
                ${spring.boot.version}
                pom
                import
            
        
    

    
        
            org.springframework.boot
            spring-boot-starter
        

        
        
            org.springframework.boot
            spring-boot-starter-amqp
        

        
        
            org.apache.commons
            commons-lang3
        
        
            commons-io
            commons-io
            2.5
        
        
            com.alibaba
            fastjson
            1.2.36
        
        
            javax.servlet
            javax.servlet-api
            provided
        
        
            org.slf4j
            slf4j-api
        
        
            log4j
            log4j
            1.2.17
        

        
            org.springframework.boot
            spring-boot-starter-test
            test
        
    

    
        
            
                org.springframework.boot
                spring-boot-maven-plugin
            
        
    

1.编写Order类

package com.myimooc.rabbitmq.entity;

import java.io.Serializable;

/**
 * 
* 标题: 订单实体
* 描述: 订单实体
* 时间: 2018/09/06
* * @author zc */ public class Order implements Serializable{ private static final long serialVersionUID = 6771608755338249746L; private String id; private String name; /** * 存储消息发送的唯一标识 */ private String messageId; public Order() { } public Order(String id, String name, String messageId) { this.id = id; this.name = name; this.messageId = messageId; } @Override public String toString() { return "Order{" + "id="" + id + """ + ", name="" + name + """ + ", messageId="" + messageId + """ + "}"; } public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getMessageId() { return messageId; } public void setMessageId(String messageId) { this.messageId = messageId; } }

2.编写OrderReceiver类

package com.myimooc.rabbitmq.consumer.consumer;

import com.rabbitmq.client.Channel;
import com.myimooc.rabbitmq.entity.Order;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * 
* 标题: 订单接收者
* 描述: 订单接收者
* 时间: 2018/09/06
* * @author zc */ @Component public class OrderReceiver { /** * 接收消息 * * @param order 消息体内容 * @param headers 消息头内容 * @param channel 网络信道 * @throws Exception 异常 */ @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "order-queue",durable = "true"), exchange = @Exchange(name = "order-exchange",type = "topic"), key = "order.*" )) @RabbitHandler public void onOrderMessage(@Payload Order order, @Headers Map headers, Channel channel) throws Exception { // 消费者操作 System.out.println("收到消息:"); System.out.println("订单信息:" + order.toString()); // 手动签收消息 Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); channel.basicAck(deliveryTag, false); } }

3.编写application.properties类

# RabbitMQ连接配置
spring.rabbitmq.addresses=192.168.0.105:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000
# RabbitMQ消费配置
# 基本并发:5
spring.rabbitmq.listener.simple.concurrency=5
# 最大并发:10
spring.rabbitmq.listener.simple.max-concurrency=10
# 签收模式:手动签收
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 限流策略:同一时间只有1条消息发送过来消费
spring.rabbitmq.listener.simple.prefetch=1

# Server配置
server.servlet.context-path=/
server.port=8082

spring.http.encoding.charset=UTF-8
spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
spring.jackson.time-zone=GMT+8
spring.jackson.default-property-inclusion=NON_NULL

4.编写Application类

package com.myimooc.rabbitmq.consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * 
* 标题: 启动类
* 描述: 启动类
* 时间: 2018/09/06
* * @author zc */ @SpringBootApplication public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } }
第三章:可靠性投递 3-1 设计方案

保障100%消息投递成功设计方案(一)

3-2 代码详解

因篇幅限制,源码请到github地址查看,这里仅展示核心关键类

1.编写OrderSender类

package com.myimooc.rabbitmq.ha.producer;

import com.myimooc.rabbitmq.entity.Order;
import com.myimooc.rabbitmq.ha.constant.Constants;
import com.myimooc.rabbitmq.ha.dao.mapper.BrokerMessageLogMapper;
import com.myimooc.rabbitmq.ha.dao.po.BrokerMessageLogPO;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 
* 标题: 订单消息发送者
* 描述: 订单消息发送者
* 时间: 2018/09/06
* * @author zc */ @Component public class OrderSender { @Autowired private RabbitTemplate rabbitTemplate; @Autowired private BrokerMessageLogMapper brokerMessageLogMapper; /** * 回调方法:confirm确认 */ private final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("correlationData:" + correlationData); String messageId = correlationData.getId(); if (ack) { // 如果confirm返回成功,则进行更新 BrokerMessageLogPO messageLogPO = new BrokerMessageLogPO(); messageLogPO.setMessageId(messageId); messageLogPO.setStatus(Constants.OrderSendStatus.SEND_SUCCESS); brokerMessageLogMapper.changeBrokerMessageLogStatus(messageLogPO); } else { // 失败则进行具体的后续操作:重试或者补偿等 System.out.println("异常处理..."); } } }; /** * 发送订单 * * @param order 订单 */ public void send(Order order) { // 设置回调方法 this.rabbitTemplate.setConfirmCallback(confirmCallback); // 消息ID CorrelationData correlationData = new CorrelationData(order.getMessageId()); // 发送消息 this.rabbitTemplate.convertAndSend("order-exchange", "order.a", order, correlationData); } }

2.编写OrderService类

package com.myimooc.rabbitmq.ha.service;

import com.myimooc.rabbitmq.entity.Order;
import com.myimooc.rabbitmq.ha.constant.Constants;
import com.myimooc.rabbitmq.ha.dao.mapper.BrokerMessageLogMapper;
import com.myimooc.rabbitmq.ha.dao.mapper.OrderMapper;
import com.myimooc.rabbitmq.ha.dao.po.BrokerMessageLogPO;
import com.myimooc.rabbitmq.ha.producer.OrderSender;
import com.myimooc.rabbitmq.ha.util.FastJsonConvertUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Date;

/**
 * 
* 标题: 订单服务
* 描述: 订单服务
* 时间: 2018/09/07
* * @author zc */ @Service public class OrderService { @Autowired private OrderMapper orderMapper; @Autowired private BrokerMessageLogMapper brokerMessageLogMapper; @Autowired private OrderSender orderSender; /** * 创建订单 * * @param order 订单 */ public void create(Order order) { // 当前时间 Date orderTime = new Date(); // 业务数据入库 this.orderMapper.insert(order); // 消息日志入库 BrokerMessageLogPO messageLogPO = new BrokerMessageLogPO(); messageLogPO.setMessageId(order.getMessageId()); messageLogPO.setMessage(FastJsonConvertUtils.convertObjectToJson(order)); messageLogPO.setTryCount(0); messageLogPO.setStatus(Constants.OrderSendStatus.SENDING); messageLogPO.setNextRetry(DateUtils.addMinutes(orderTime, Constants.ORDER_TIMEOUT)); this.brokerMessageLogMapper.insert(messageLogPO); // 发送消息 this.orderSender.send(order); } }

3.编写RetryMessageTask类

package com.myimooc.rabbitmq.ha.task;

import com.myimooc.rabbitmq.entity.Order;
import com.myimooc.rabbitmq.ha.constant.Constants;
import com.myimooc.rabbitmq.ha.dao.mapper.BrokerMessageLogMapper;
import com.myimooc.rabbitmq.ha.dao.po.BrokerMessageLogPO;
import com.myimooc.rabbitmq.ha.producer.OrderSender;
import com.myimooc.rabbitmq.ha.util.FastJsonConvertUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * 
* 标题: 重发消息定时任务
* 描述: 重发消息定时任务
* 时间: 2018/09/07
* * @author zc */ @Component public class RetryMessageTask { private Logger logger = LoggerFactory.getLogger(getClass()); @Autowired private OrderSender orderSender; @Autowired private BrokerMessageLogMapper brokerMessageLogMapper; /** * 启动完成3秒后开始执行,每隔10秒执行一次 */ @Scheduled(initialDelay = 3000, fixedDelay = 10000) public void retrySend() { logger.debug("重发消息定时任务开始"); // 查询 status = 0 和 timeout 的消息日志 List pos = this.brokerMessageLogMapper.listSendFailureAndTimeoutMessage(); for (BrokerMessageLogPO po : pos) { logger.debug("处理消息日志:{}",po); if (po.getTryCount() >= Constants.MAX_RETRY_COUNT) { // 更新状态为失败 BrokerMessageLogPO messageLogPO = new BrokerMessageLogPO(); messageLogPO.setMessageId(po.getMessageId()); messageLogPO.setStatus(Constants.OrderSendStatus.SEND_FAILURE); this.brokerMessageLogMapper.changeBrokerMessageLogStatus(messageLogPO); } else { // 进行重试,重试次数+1 this.brokerMessageLogMapper.updateRetryCount(po); Order reSendOrder = FastJsonConvertUtils.convertJsonToObject(po.getMessage(), Order.class); try { this.orderSender.send(reSendOrder); } catch (Exception ex) { // 异常处理 logger.error("消息发送异常:{}", ex); } } } logger.debug("重发消息定时任务结束"); } }

4.编写ApplicationTest类

package com.myimooc.rabbitmq.ha;

import com.myimooc.rabbitmq.entity.Order;
import com.myimooc.rabbitmq.ha.service.OrderService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.UUID;

/**
 * 
* 标题: 订单创建测试
* 描述: 订单创建测试
* 时间: 2018/09/07
* * @author zc */ @RunWith(SpringRunner.class) @SpringBootTest public class ApplicationTest { @Autowired private OrderService orderService; @Test public void testCreateOrder(){ Order order = new Order(); order.setId(String.valueOf(System.currentTimeMillis())); order.setName("测试创建订单"); order.setMessageId(System.currentTimeMillis() + "$" + UUID.randomUUID().toString().replaceAll("-","")); this.orderService.create(order); } }

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/77035.html

相关文章

  • 课网_《Java消息间件学习总结

    摘要:时间年月日星期六说明本文部分内容均来自慕课网。这个时候,可以启动多台积分系统,来同时消费这个消息中间件里面的登录消息,达到横向扩展的作用。 时间:2017年07月22日星期六说明:本文部分内容均来自慕课网。@慕课网:http://www.imooc.com教学源码:无学习源码:https://github.com/zccodere/s... 第一章:课程介绍 1-1 课程安排 Java...

    twohappy 评论0 收藏0
  • 课网_《Netty入门之WebSocket初体验》学习总结

    时间:2018年04月11日星期三 说明:本文部分内容均来自慕课网。@慕课网:https://www.imooc.com 教学源码:https://github.com/zccodere/s... 学习源码:https://github.com/zccodere/s... 第一章:课程介绍 1-1 课程介绍 什么是Netty 高性能、事件驱动、异步非阻塞的IO Java开源框架 基于NIO的客户...

    Noodles 评论0 收藏0
  • 课网_《Docker入门学习总结

    摘要:时间年月日星期六说明本文部分内容均来自慕课网。必填用于执行命令,当执行完毕后,将产生一个新的文件层。可选指定此镜像启动时默认执行命令。可选用于指定需要暴露的网络端口号。可选向镜像中挂载一个卷组。 时间:2017年09月16日星期六说明:本文部分内容均来自慕课网。@慕课网:http://www.imooc.com 教学源码:无 学习源码:无 第一章:课程简介 1-1 课程介绍 Docke...

    CoorChice 评论0 收藏0
  • 课网_《Spring入门篇》学习总结

    摘要:入门篇学习总结时间年月日星期三说明本文部分内容均来自慕课网。主要的功能是日志记录,性能统计,安全控制,事务处理,异常处理等等。 《Spring入门篇》学习总结 时间:2017年1月18日星期三说明:本文部分内容均来自慕课网。@慕课网:http://www.imooc.com教学示例源码:https://github.com/zccodere/s...个人学习源码:https://git...

    Ververica 评论0 收藏0

发表评论

0条评论

mykurisu

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<