摘要:它通过使用来连接消息代理中间件以实现消息事件驱动的微服务应用。该示例主要目标是构建一个基于的微服务应用,这个微服务应用将通过使用消息中间件来接收消息并将消息打印到日志中。下面我们通过编写生产消息的单元测试用例来完善我们的入门内容。
之前在写Spring Boot基础教程的时候写过一篇《Spring Boot中使用RabbitMQ》。在该文中,我们通过简单的配置和注解就能实现向RabbitMQ中生产和消费消息。实际上我们使用的对RabbitMQ的starter就是通过Spring Cloud Stream中对RabbitMQ的支持来实现的。下面我们就通过本文来了解一下Spring Cloud Stream。
Spring Cloud Stream是一个用来为微服务应用构建消息驱动能力的框架。它可以基于Spring Boot来创建独立的、可用于生产的Spring应用程序。它通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动的微服务应用。Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,并且引入了发布-订阅、消费组以及消息分区这三个核心概念。简单的说,Spring Cloud Stream本质上就是整合了Spring Boot和Spring Integration,实现了一套轻量级的消息驱动的微服务框架。通过使用Spring Cloud Stream,可以有效地简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。由于Spring Cloud Stream基于Spring Boot实现,所以它秉承了Spring Boot的优点,实现了自动化配置的功能帮忙我们可以快速的上手使用,但是目前为止Spring Cloud Stream只支持下面两个著名的消息中间件的自动化配置:
RabbitMQ
Kafka
快速入门下面我们通过构建一个简单的示例来对Spring Cloud Stream有一个初步认识。该示例主要目标是构建一个基于Spring Boot的微服务应用,这个微服务应用将通过使用消息中间件RabbitMQ来接收消息并将消息打印到日志中。所以,在进行下面步骤之前请先确认已经在本地安装了RabbitMQ,具体安装步骤请参考此文。
构建一个Spring Cloud Stream消费者创建一个基础的Spring Boot工程,命名为:stream-hello
编辑pom.xml中的依赖关系,引入Spring Cloud Stream对RabbitMQ的支持,具体如下:
org.springframework.boot spring-boot-starter-parent 1.5.9.RELEASE org.springframework.boot spring-boot-starter-test test org.springframework.cloud spring-cloud-starter-stream-rabbit org.springframework.cloud spring-cloud-dependencies Dalston.SR4 pom import
创建用于接收来自RabbitMQ消息的消费者SinkReceiver,具体如下:
@EnableBinding(Sink.class) public class SinkReceiver { private static Logger logger = LoggerFactory.getLogger(SinkReceiver.class); @StreamListener(Sink.INPUT) public void receive(Object payload) { logger.info("Received: " + payload); } }
创建应用主类,这里同其他Spring Boot一样,没有什么特别之处,具体如下:
@SpringBootApplication public class SinkApplication { public static void main(String[] args) { SpringApplication.run(SinkApplication.class, args); } }
到这里,我们快速入门示例的编码任务就已经完成了。下面我们分别启动RabbitMQ以及该Spring Boot应用,然后做下面的试验,看看它们是如何运作的。
手工测试验证我们先来看一下Spring Boot应用的启动日志。
... INFO 16272 --- [main] o.s.c.s.b.r.RabbitMessageChannelBinder : declaring queue for inbound: input.anonymous.Y8VsFILmSC27eS5StsXp6A, bound to: input INFO 16272 --- [main] o.s.a.r.c.CachingConnectionFactory : Created new connection: SimpleConnection@3c78e551 [delegate=amqp://guest@127.0.0.1:5672/] INFO 16272 --- [main] o.s.integration.channel.DirectChannel : Channel "input.anonymous.Y8VsFILmSC27eS5StsXp6A.bridge" has 1 subscriber(s). INFO 16272 --- [main] o.s.i.a.i.AmqpInboundChannelAdapter : started inbound.input.anonymous.Y8VsFILmSC27eS5StsXp6A ...
从上面的日志内容中,我们可以获得以下信息:
使用guest用户创建了一个指向127.0.0.1:5672位置的RabbitMQ连接,在RabbitMQ的控制台中我们也可以发现它。
声明了一个名为input.anonymous.Y8VsFILmSC27eS5StsXp6A的队列,并通过RabbitMessageChannelBinder将自己绑定为它的消费者。这些信息我们也能在RabbitMQ的控制台中发现它们。
下面我们可以在RabbitMQ的控制台中进入input.anonymous.Y8VsFILmSC27eS5StsXp6A队列的管理页面,通过Publish Message功能来发送一条消息到该队列中。
此时,我们可以在当前启动的Spring Boot应用程序的控制台中看到下面的内容:
INFO 16272 --- [C27eS5StsXp6A-1] com.didispace.HelloApplication : Received: [B@7cba610e
我们可以发现在应用控制台中输出的内容就是SinkReceiver中receive方法定义的,而输出的具体内容则是来自消息队列中获取的对象。这里由于我们没有对消息进行序列化,所以输出的只是该对象的引用,在后面的小节中我们会详细介绍接收消息后的处理。
在顺利完成上面快速入门的示例后,我们简单解释一下上面的步骤是如何将我们的Spring Boot应用连接上RabbitMQ来消费消息以实现消息驱动业务逻辑的。
首先,我们对Spring Boot应用做的就是引入spring-cloud-starter-stream-rabbit依赖,该依赖包是Spring Cloud Stream对RabbitMQ支持的封装,其中包含了对RabbitMQ的自动化配置等内容。从下面它定义的依赖关系中,我们还可以知道它等价于spring-cloud-stream-binder-rabbit依赖。
org.springframework.cloud spring-cloud-stream-binder-rabbit
接着,我们再来看看这里用到的几个Spring Cloud Stream的核心注解,它们都被定义在SinkReceiver中:
@EnableBinding,该注解用来指定一个或多个定义了@Input或@Output注解的接口,以此实现对消息通道(Channel)的绑定。在上面的例子中,我们通过@EnableBinding(Sink.class)绑定了Sink接口,该接口是Spring Cloud Stream中默认实现的对输入消息通道绑定的定义,它的源码如下:
public interface Sink { String INPUT = "input"; @Input(Sink.INPUT) SubscribableChannel input(); }
它通过@Input注解绑定了一个名为input的通道。除了Sink之外,Spring Cloud Stream还默认实现了绑定output通道的Source接口,还有结合了Sink和Source的Processor接口,实际使用时我们也可以自己通过@Input和@Output注解来定义绑定消息通道的接口。当我们需要为@EnableBinding指定多个接口来绑定消息通道的时候,可以这样定义:@EnableBinding(value = {Sink.class, Source.class})。
@StreamListener:该注解主要定义在方法上,作用是将被修饰的方法注册为消息中间件上数据流的事件监听器,注解中的属性值对应了监听的消息通道名。在上面的例子中,我们通过@StreamListener(Sink.INPUT)注解将receive方法注册为对input消息通道的监听处理器,所以当我们在RabbitMQ的控制页面中发布消息的时候,receive方法会做出对应的响应动作。
编写消费消息的单元测试用例上面我们通过RabbitMQ的控制台完成了发送消息来验证了消息消费程序的功能,虽然这种方法比较low,但是通过上面的步骤,相信大家对RabbitMQ和Spring Cloud Stream的消息消费已经有了一些基础的认识。下面我们通过编写生产消息的单元测试用例来完善我们的入门内容。
在上面创建的工程中创建单元测试类:
@RunWith(SpringRunner.class) @EnableBinding(value = {SinkApplicationTests.SinkSender.class}) public class SinkApplicationTests { @Autowired private SinkSender sinkSender; @Test public void sinkSenderTester() { sinkSender.output().send(MessageBuilder.withPayload("produce a message :http://blog.didispace.com").build()); } public interface SinkSender { String OUTPUT = "input"; @Output(SinkSender.OUTPUT) MessageChannel output(); } }
在应用了上面的消息消费者程序之后,运行这里定义的单元测试程序,我们马上就能在消息消费者的控制台中收到下面的内容:
INFO 50947 --- [L2W-c2AcChb2Q-1] com.didispace.stream.SinkReceiver : Received: produce a message :http://blog.didispace.com
在上面的单元测试中,我们通过@Output(SinkSender.OUTPUT)定义了一个输出通过,而该输出通道的名称为input,与前文中的Sink中定义的消费通道同名,所以这里的单元测试与前文的消费者程序组成了一对生产者与消费者。到这里,本文的内容就次结束,如果您能够独立的完成上面的例子,那么对于Spring Cloud Stream的基础使用算是入门了。但是,Spring Cloud Stream的使用远不止于此,在近期的博文中,我讲继续更新这部分内容,帮助他们来理解和用好Spring Cloud Stream来构建消息驱动的微服务!
本文完整实例:
Github
Gitee
如果您对这些感兴趣,欢迎star、follow、收藏、转发给予支持!
本文内容部分节选自我的《Spring Cloud微服务实战》,但对依赖的Spring Boot和Spring Cloud版本做了升级。
本文首发于我的博客:http://blog.didispace.com系列教程推荐
Spring Boot基础教程
Spring Cloud基础教程
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/70870.html
摘要:微服务架构概述应用架构的发展应用是可独立运行的程序代码,提供相对完善的业务功能。阿里开源的是的典型实现。它目前由官方开发维护,基于开发,提供一套完整的微服务解决方案。 微服务与Spring Cloud 随着互联网的快速发展, 云计算近十年也得到蓬勃发展, 企业的IT环境和IT架构也逐渐在发生变革,从过去的单体应用架构发展为至今广泛流行的微服务架构。 微服务是一种架构风格, 能给软件应用...
摘要:属性对应服务注册中心的配置内容,指定服务注册中心的位置。项目是针对的服务治理实现。下面可以尝试让的服务提供者运行起来。我们可以用下面的命令启动的开发模式服务端启动完成之后,我们再将之前改造后的服务提供者启动起来。 已经有非常长的时间没有更新《Spring Cloud构建微服务架构》系列文章了,自从开始写Spring Cloud的专题内容开始就获得了不少的阅读量和认可,当然也有一些批评...
摘要:下面的例子,我们将利用上一篇中构建的作为服务注册中心作为服务提供者作为基础。我们先来创建一个服务消费者工程,命名为。初始化,用来真正发起请求。注解用来将当前应用加入到服务治理体系中。 通过上一篇《Spring Cloud构建微服务架构:服务注册与发现》,我们已经成功地将服务提供者:eureka-client或consul-client注册到了Eureka服务注册中心或Consul服务端...
摘要:目前最新版本官网特性专注于提供良好的开箱即用经验的典型用例和可扩展性机制覆盖。分布式消息队列,是对的封装。是对的封装,能实现服务之间的认证调用和安全保护等,并能配合使用。不过随着目前官方的重新申明维护并得到重视,生态圈也会逐渐强大。 简介 Spring Cloud是一系列框架的有序集合。它利用Spring Boot的开发便利性巧妙地简化了分布式系统基础设施的开发,如服务发现注册、配置中...
摘要:而微服务架构能否成功实践,利用各种工具解决潜在问题是关键。因此,微服务本身可以通过库和运行时代理解决客户端服务发现负载均衡配置更新统计跟踪等。与相比,解决了更广的微服务架构问题。和处理了不同范围的微服务架构技术点,而且是用了不同的方法。 Spring Cloud vs. Kubernetes,谁才是部署微服务的最佳拍档? Spring Cloud和Kubernetes都声称自己是开发和...
阅读 2461·2023-04-26 02:18
阅读 1261·2021-10-14 09:43
阅读 3822·2021-09-26 10:00
阅读 6944·2021-09-22 15:28
阅读 2535·2019-08-30 15:54
阅读 2599·2019-08-30 15:52
阅读 473·2019-08-29 11:30
阅读 3464·2019-08-29 11:05