摘要:需要对外提供配置接口,通过配置接口,动态配置,可接收的消息。遇到问题正式这种注解的方式,使得只能在代码中写死,没法动态修改。而我想实现的效果是能够动态的修改,动态的创建。因此能不能动态修改配置文件对应的变量,然后消费者动态注入。
背景
最近在写一个Mqtt消息转发的中间件,可通过ActvieMq接收消息。需要对外提供配置接口,通过配置接口,动态配置Queue,可接收Queue的消息。
整个项目依赖于SpringBoot ,通过SpringBoot实现队列消费,只需要通过@JmsListener(destination = "queueName") 注解,就可以实现对特定队列的消费。
遇到问题正式这种注解的方式,使得destination只能在代码中写死,没法动态修改。 而我想实现的效果是能够动态的修改destination,动态的创建Consumer。
解决方案继续使用@JmsListener,找到某种方式,能够动态修改destination
不使用@JmsListener,通过ActiveMq提供的Jar,手动实现连接、消费等。之前通过这种方式实现过RabbitMq的处理,因此该方案不会有难度,只是工作量多一些。
由于SpringBoot的过分简单,因此开始尝试通过第一种方式。
解决过程一开始想着通过反射修改注解destination但是尝试失败。后来想到@JmsListener(destination = "${xxx}")这种方式,根据配置文件,可以修改消费的队列,但是需要从新启动。 因此能不能动态修改配置文件对应的变量,然后消费者动态注入Spring。
自定义实现一个MapPropertySource,然后加入到Environment中
@Configuration
public class DynamicTestSetting {
public static final String DYNAMIC_CONFIG = "dynamic_settting"; @Autowired AbstractEnvironment environment; @PostConstruct public void init() { environment.getPropertySources().addFirst(new DynamicLoadPropertySource(DYNAMIC_CONFIG, null)); }
}
@Slf4j
public class DynamicLoadPropertySource extends MapPropertySource {
private static Mapmap = new ConcurrentHashMap (64); private static ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1); static { scheduled.scheduleAtFixedRate(new Runnable() { @Override public void run() { //测试:定时修改value //真正使用可能需要传递一个参数或者定时读取配置文件 map.put("test", String.valueOf(System.currentTimeMillis())); } }, 1, 1, TimeUnit.SECONDS); } public DynamicLoadPropertySource(String name, Map source) { super(name, map); } @Override public Object getProperty(String name) { return map.get(name); }
}
2.消费者需要动态加入
@Slf4j
public class TestConsumer {
@JmsListener(destination = "${test}") public void receiveQueue(BytesMessage msg) { //接收消息后的处理逻辑 }
}
/*动态注入bean到spring,需要用到ApplicationContext/
DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) context.getAutowireCapableBeanFactory();
BeanDefinitionBuilder definitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(TestConsumer.class); defaultListableBeanFactory.registerBeanDefinition("testConsumer",definitionBuilder.getBeanDefinition());
//调用getBeaan时Spring会创建一个TestConsumer实例,这个时候ActiveMq中会创建一个destination = "${test}"队列,TestConsummer和其绑定消费
TestConsumer consumer = context.getBean("testConsumer", TestConsumer.class); System.out.println(consumer);动态关闭Queue的消费者
参见另一篇:https://segmentfault.com/n/13...
参考文章:https://blog.csdn.net/qq_3491...
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/71482.html
摘要:本文主要讲述消息服务在中的使用。所以需要一个监听容器工厂的概念,即接口,它会引用上面创建好的与的连接工厂,由它来负责接收消息以及将消息分发给指定的监听器。为了消费消息,订阅者必须保持运行的状态。 JMS 在 SpringBoot 中的使用 摘要:本文属于原创,欢迎转载,转载请保留出处:https://github.com/jasonGeng88/blog> 本文所有服务均采用doc...
摘要:介绍它是出品,最流行的,能力强劲的开源消息总线。是一个完全支持和规范的实现,尽管规范出台已经是很久的事情了,但是在当今的应用中间仍然扮演着特殊的地位。相关文章整合使用整合使用关注我转载请务必注明原创地址为安装同之前一样,直接在里面玩吧。 showImg(https://segmentfault.com/img/remote/1460000012996066?w=1920&h=1281)...
摘要:安装到官方网站下载最新的的安装包,并解压到本地目录下,下载链接如下。修改消费者使用配置消费者监听的队列,其中是接收到的消息收到的报文为接收到的消息重新执行 安装ActiveMQ 到Apache官方网站下载最新的ActiveMQ的安装包,并解压到本地目录下,下载链接如下:http://activemq.apache.org/do...。showImg(https://segmentfau...
摘要:本文旨在指出中集成的一些性能陷阱,在另一篇文章各组件详解里有组件介绍及如何正确使用的内容。因此的做法会大大降低性能,并且将大部分的时间都花在反复重建这些对象上。提供的可以让使用避免频繁创建的问题。至于使用的性能测试则留给同学自己做了。 Github 本文旨在指出Spring/Spring Boot中集成JMS的一些性能陷阱,在另一篇文章Spring JMS各组件详解里有Spring J...
摘要:还自动配置发送和接收消息所需的基础设施。支持是一个轻量级的可靠的可伸缩的可移植的消息代理,基于协议,使用通过协议进行通信。 32. 消息传递 Spring框架为与消息传递系统集成提供了广泛的支持,从使用JmsTemplate简化的JMS API到使用完整的基础设施异步接收消息,Spring AMQP为高级消息队列协议提供了类似的特性集。Spring Boot还为RabbitTempla...
阅读 1336·2021-11-15 18:11
阅读 2485·2021-08-19 10:56
阅读 656·2021-08-09 13:42
阅读 756·2019-08-30 15:53
阅读 2051·2019-08-30 10:55
阅读 3104·2019-08-29 17:18
阅读 1365·2019-08-29 13:45
阅读 523·2019-08-29 13:15