摘要:前提通过前面两篇文章可以简单的了解和安装,今天就将和整合起来使用。然后我运行之前的整合项目,查看监控信息如下总结整篇文章讲述了与整合和监控平台的搭建。
前提
通过前面两篇文章可以简单的了解 RocketMQ 和 安装 RocketMQ ,今天就将 SpringBoot 和 RocketMQ 整合起来使用。
1、SpringBoot Kafka 整合使用
2、SpringBoot RabbitMQ 整合使用
3、SpringBoot ActiveMQ 整合使用
4、Kafka 安装及快速入门
5、SpringBoot RabbitMQ 整合进阶版
6、RocketMQ 初探
7、RocketMQ 安装及快速入门
关注我转载请务必注明原创地址为:http://www.54tianzhisheng.cn/2018/02/07/SpringBoot-RocketMQ/
创建项目在 IDEA 创建一个 SpringBoot 项目,项目结构如下:
pom 文件引入 RocketMQ 的一些相关依赖,最后的 pom 文件如下:
配置文件4.0.0 com.zhisheng rocketmq 0.0.1-SNAPSHOT jar rocketmq Demo project for Spring Boot RocketMQ org.springframework.boot spring-boot-starter-parent 1.5.9.RELEASE UTF-8 UTF-8 1.8 org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-test test org.apache.rocketmq rocketmq-common 4.2.0 org.apache.rocketmq rocketmq-client 4.2.0 org.springframework.boot spring-boot-maven-plugin
application.properties 中如下:
# 消费者的组名 apache.rocketmq.consumer.PushConsumer=PushConsumer # 生产者的组名 apache.rocketmq.producer.producerGroup=Producer # NameServer地址 apache.rocketmq.namesrvAddr=localhost:9876生产者
package com.zhisheng.rocketmq.client; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import org.springframework.util.StopWatch; import javax.annotation.PostConstruct; /** * Created by zhisheng_tian on 2018/2/6 */ @Component public class RocketMQClient { /** * 生产者的组名 */ @Value("${apache.rocketmq.producer.producerGroup}") private String producerGroup; /** * NameServer 地址 */ @Value("${apache.rocketmq.namesrvAddr}") private String namesrvAddr; @PostConstruct public void defaultMQProducer() { //生产者的组名 DefaultMQProducer producer = new DefaultMQProducer(producerGroup); //指定NameServer地址,多个地址以 ; 隔开 producer.setNamesrvAddr(namesrvAddr); try { /** * Producer对象在使用之前必须要调用start初始化,初始化一次即可 * 注意:切记不可以在每次发送消息时,都调用start方法 */ producer.start(); //创建一个消息实例,包含 topic、tag 和 消息体 //如下:topic 为 "TopicTest",tag 为 "push" Message message = new Message("TopicTest", "push", "发送消息----zhisheng-----".getBytes(RemotingHelper.DEFAULT_CHARSET)); StopWatch stop = new StopWatch(); stop.start(); for (int i = 0; i < 10000; i++) { SendResult result = producer.send(message); System.out.println("发送响应:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus()); } stop.stop(); System.out.println("----------------发送一万条消息耗时:" + stop.getTotalTimeMillis()); } catch (Exception e) { e.printStackTrace(); } finally { producer.shutdown(); } } }消费者
package com.zhisheng.rocketmq.server; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; /** * Created by zhisheng_tian on 2018/2/6 */ @Component public class RocketMQServer { /** * 消费者的组名 */ @Value("${apache.rocketmq.consumer.PushConsumer}") private String consumerGroup; /** * NameServer 地址 */ @Value("${apache.rocketmq.namesrvAddr}") private String namesrvAddr; @PostConstruct public void defaultMQPushConsumer() { //消费者的组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup); //指定NameServer地址,多个地址以 ; 隔开 consumer.setNamesrvAddr(namesrvAddr); try { //订阅PushTopic下Tag为push的消息 consumer.subscribe("TopicTest", "push"); //设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费 //如果非第一次启动,那么按照上次消费的位置继续消费 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> { try { for (MessageExt messageExt : list) { System.out.println("messageExt: " + messageExt);//输出消息内容 String messageBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET); System.out.println("消费响应:msgId : " + messageExt.getMsgId() + ", msgBody : " + messageBody);//输出消息内容 } } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试 } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功 }); consumer.start(); } catch (Exception e) { e.printStackTrace(); } } }启动类
package com.zhisheng.rocketmq; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class RocketmqApplication { public static void main(String[] args) { SpringApplication.run(RocketmqApplication.class, args); } }RocketMQ
代码已经都写好了,接下来我们需要将与 RocketMQ 有关的启动起来。
启动 Name Server在前面文章中已经写过怎么启动,http://www.54tianzhisheng.cn/2018/02/06/RocketMQ-install/#%E5%90%AF%E5%8A%A8-NameServer
进入到目录 :
cd distribution/target/apache-rocketmq
启动:
nohup sh bin/mqnamesrv & tail -f ~/logs/rocketmqlogs/namesrv.log //通过日志查看是否启动成功启动 Broker
nohup sh bin/mqbroker -n localhost:9876 & tail -f ~/logs/rocketmqlogs/broker.log //通过日志查看是否启动成功
然后运行启动类,运行效果如下:
监控RocketMQ有一个对其扩展的开源项目 ocketmq-console ,如今也提交给了 Apache ,地址在:[https://github.com/apache/roc...]() ,官方也给出了其支持的功能的中文文档:[https://github.com/apache/roc...]() , 那么该如何安装?
Docker 安装1、获取 Docker 镜像
docker pull styletang/rocketmq-console-ng
2、运行,注意将你自己的 NameServer 地址替换下面的 127.0.0.1
docker run -e "JAVA_OPTS=-Drocketmq.namesrv.addr=127.0.0.1:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8080:8080 -t styletang/rocketmq-console-ng非 Docker 安装
我们 git clone 一份代码到本地:
git clone https://github.com/apache/rocketmq-externals.git cd rocketmq-externals/rocketmq-console/
需要 jdk 1.7 以上。 执行以下命令:
mvn spring-boot:run
或者
mvn clean package -Dmaven.test.skip=true java -jar target/rocketmq-console-ng-1.0.0.jar
注意:
1、如果你下载依赖缓慢,你可以重新设置 maven 的 mirror 为阿里云的镜像
alimaven aliyun maven http://maven.aliyun.com/nexus/content/groups/public/ central
2、如果你使用的 RocketMQ 版本小于 3.5.8,如果您使用 rocketmq < 3.5.8,请在启动 rocketmq-console-ng 时添加 -Dcom.rocketmq.sendMessageWithVIPChannel = false(或者您可以在 ops 页面中更改它)
3、更改 resource / application.properties 中的 rocketmq.config.namesrvAddr(或者可以在ops页面中更改它)
错误解决方法1、Docker 启动项目报错
org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to
将 Docker 启动命令改成如下以后:
docker run -e "JAVA_OPTS=-Drocketmq.config.namesrvAddr=127.0.0.1:9876 -Drocketmq.config.isVIPChannel=false" -p 8080:8080 -t styletang/rocketmq-console-ng
报错信息改变了,新的报错信息如下:
ERROR op=global_exception_handler_print_error org.apache.rocketmq.console.exception.ServiceException: This date have"t data!
看到网上有人也遇到这个问题,他们都通过自己的方式解决了,但是方法我都试了,不适合我。不得不说,阿里,你能再用心点吗?既然把 RocketMQ 捐给 Apache 了,这些文档啥的都必须更新啊,不要还滞后着呢,不然少不了被吐槽!
搞了很久这种方法没成功,暂时放弃!mmp
2、非 Docker 安装,只好把源码编译打包了。
1) 注意需要修改如下图中的配置:
rocketmq.config.namesrvAddr=localhost:9876 //注意替换你自己的ip #如果你 rocketmq 版本小于 3.5.8 才需设置 `rocketmq.config.isVIPChannel` 为 false,默认是 true, 这个可以在源码中可以看到的 rocketmq.config.isVIPChannel=
2) 执行以下命令:
mvn clean package -Dmaven.test.skip=true
编译成功:
可以看到已经打好了 jar 包:
运行:
java -jar rocketmq-console-ng-1.0.0.jar
成功,不报错了,开心?,访问 http://localhost:8080/
整个监控大概就是这些了。
然后我运行之前的 SpringBoot 整合项目,查看监控信息如下:
总结整篇文章讲述了 SpringBoot 与 RocketMQ 整合和 RocketMQ 监控平台的搭建。
参考文章1、[http://www.ymq.io/2018/02/02/...]()
2、GitHub 官方 README
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/70963.html
摘要:前提好几周没更新博客了,对不断支持我博客的童鞋们说声抱歉了。熟悉我的人都知道我写博客的时间比较早,而且坚持的时间也比较久,一直到现在也是一直保持着更新状态。 showImg(https://segmentfault.com/img/remote/1460000014076586?w=1920&h=1080); 前提 好几周没更新博客了,对不断支持我博客的童鞋们说声:抱歉了!。自己这段时...
摘要:什么是是一个管理和监控你的应用程序的应用程序。这些应用程序通过通过注册或者使用例如发现。刚才首页的应用列表后面有个红色的,我们可以将注册上去的应用移除,但是只要你不把程序停掉,它立马又会注册上去。 showImg(http://ww3.sinaimg.cn/large/006tNc79ly1g5h6jqpgs9j30u00gwdhe.jpg); 什么是 SpringBoot Admin...
摘要:背景最近来了个实习僧小弟,安排他实现对目标网站连通性检测的小功能简单讲就是将下边的脚本换成代码来实现百度平台状态不正常,请注意功能实现使用开始执行定时任务,检测百度网站连通性请求百度成功,返回报文请求异常百度执行检测百度网站连通 背景 最近来了个实习僧小弟,安排他实现对目标网站 连通性检测的小功能,简单讲就是将下边的shell 脚本换成Java 代码来实现 1#!/bin/bash ...
摘要:注意一定要亲自自己安装实践,接下来我们将这两个进行整合。创建项目项目整体架构使用创建项目,这个很简单了,这里不做过多的讲解。 showImg(http://ww4.sinaimg.cn/large/006tNc79gy1g5iatph25rj30u00gw0yj.jpg); 前提 假设你了解过 SpringBoot 和 Kafka。 1、SpringBoot 如果对 SpringBoo...
摘要:可以在地址看到如何使用讲解下上面命令行表示控制台端口号,可以在浏览器中通过控制台来执行的相关操作。同时从控制台可以看到发送的速率多线程测试性能开了个线程,每个线程发送条消息。 showImg(http://ww2.sinaimg.cn/large/006tNc79ly1g5jjb62t88j30u00gwdi2.jpg); 前提 上次写了篇文章,《SpringBoot Kafka 整合...
阅读 1572·2021-09-22 15:21
阅读 2819·2021-09-09 09:32
阅读 2617·2021-09-02 09:52
阅读 3210·2019-08-30 14:02
阅读 2178·2019-08-26 13:25
阅读 1411·2019-08-26 13:24
阅读 1553·2019-08-26 10:31
阅读 1530·2019-08-26 10:16