博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
SpringBoot @JmsListener(destination = ) 运行时动态修改
阅读量:6689 次
发布时间:2019-06-25

本文共 2394 字,大约阅读时间需要 7 分钟。

背景

最近在写一个Mqtt消息转发的中间件,可通过ActvieMq接收消息。需要对外提供配置接口,通过配置接口,动态配置Queue,可接收Queue的消息。

整个项目依赖于SpringBoot ,通过SpringBoot实现队列消费,只需要通过@JmsListener(destination = "queueName") 注解,就可以实现对特定队列的消费。

遇到问题

正式这种注解的方式,使得destination只能在代码中写死,没法动态修改。 而我想实现的效果是能够动态的修改destination,动态的创建Consumer。

解决方案

  1. 继续使用@JmsListener,找到某种方式,能够动态修改destination
  2. 不使用@JmsListener,通过ActiveMq提供的Jar,手动实现连接、消费等。之前通过这种方式实现过RabbitMq的处理,因此该方案不会有难度,只是工作量多一些。

由于SpringBoot的过分简单,因此开始尝试通过第一种方式。

解决过程

一开始想着通过反射修改注解destination但是尝试失败。后来想到@JmsListener(destination = "${xxx}")这种方式,根据配置文件,可以修改消费的队列,但是需要从新启动。 因此能不能动态修改配置文件对应的变量,然后消费者动态注入Spring。

  1. 自定义实现一个MapPropertySource,然后加入到Environment中

@Configuration

public class DynamicTestSetting {

public static final String DYNAMIC_CONFIG = "dynamic_settting";@AutowiredAbstractEnvironment environment;@PostConstructpublic void init() {    environment.getPropertySources().addFirst(new DynamicLoadPropertySource(DYNAMIC_CONFIG, null));}

}

@Slf4j

public class DynamicLoadPropertySource extends MapPropertySource {

private static Map
map = new ConcurrentHashMap
(64);private static ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1);static { scheduled.scheduleAtFixedRate(new Runnable() { @Override public void run() { //测试:定时修改value //真正使用可能需要传递一个参数或者定时读取配置文件 map.put("test", String.valueOf(System.currentTimeMillis())); } }, 1, 1, TimeUnit.SECONDS);}public DynamicLoadPropertySource(String name, Map
source) { super(name, map);}@Overridepublic Object getProperty(String name) { return map.get(name);}

}

2.消费者需要动态加入

@Slf4j
public class TestConsumer {

@JmsListener(destination = "${test}")public void receiveQueue(BytesMessage msg) {    //接收消息后的处理逻辑}

}

/*动态注入bean到spring,需要用到ApplicationContext/

DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) context.getAutowireCapableBeanFactory();

BeanDefinitionBuilder definitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(TestConsumer.class);        defaultListableBeanFactory.registerBeanDefinition("testConsumer",definitionBuilder.getBeanDefinition());

//调用getBeaan时Spring会创建一个TestConsumer实例,这个时候ActiveMq中会创建一个destination = "${test}"队列,TestConsummer和其绑定消费

TestConsumer consumer = context.getBean("testConsumer", TestConsumer.class);        System.out.println(consumer);

动态关闭Queue的消费者

参见另一篇:

参考文章:

转载地址:http://sbeao.baihongyu.com/

你可能感兴趣的文章
深入理解javascript原型和闭包(16)——完结
查看>>
近日记事2-PG库挂掉了,还是恢复吧~
查看>>
数据源ObjectDataSource的数据访问类的编写
查看>>
如何点击每一列的时候alert其index
查看>>
【原创翻译】类型
查看>>
深入解读Windows Azure VM 实例级 IP
查看>>
python常用函数
查看>>
Eclipse记录
查看>>
C++ 一个自己实现的字符串类
查看>>
KVM
查看>>
我的友情链接
查看>>
字节流
查看>>
大型网站架构演变和知识体系
查看>>
抛砖引玉:Session和Cookie在WEB开发中的最佳实践
查看>>
一次小***处理
查看>>
Nginx配置文件nginx.conf中文详解
查看>>
linux anaconda kickstart基础
查看>>
DITA vs DocBook
查看>>
调整Outlook 2010的pst文件大小
查看>>
python笔记二 基础
查看>>