搜索

SpringBoot+Redis自定义注解实现发布订阅

发表于 2025-11-05 15:29:52 来源:全栈开发

前言

最近开发了一个内部消息组件,自注解逻辑大体是定义订阅通过定义注解 @MessageHub,在启动时扫描全部bean中有使用了该注解的实现方法后台创建一个常驻线程代理消费数据,当线程消费到数据就回写到对应加了注解的发布方法里。复制@Slf4j @Service public class RedisConsumerDemo { @MessageHub(topic = "${uptown.topic}",自注解 type = "REDIS_PUBSUB") public void consumer(Object message) { log.info("pubsub info {} ", message);} }1.2.3.4.5.6.7.8.

实现redis的队列、stream方式实现都很简单,定义订阅唯独发布订阅方式,实现网上的发布demo全都是一个固定套路,通过redis容器注入监听器,自注解而且回写非常死板。定义订阅那么如何将这块的实现逻辑统一呢。

SpringBoot+Redis自定义注解实现发布订阅

常规写法

常规实现reids的发布发布订阅模式写法一共三步

1.创建消息监听器

复制@Bean public MessageListenerAdapter smsExpirationListener(TestSubscriber messageListener) { return new MessageListenerAdapter(messageListener, "onMessage"); }1.2.3.4.

2.创建订阅器

复制@Component public class TestSubscriber implements MessageListener { @Override public void onMessage(Message message, byte[] pattern) { log.info("get data :{}", msg);} }1.2.3.4.5.6.7.8.9.

3.向redis容器中添加消息监听器

复制@Configuration public class RedisConfig { @Bean public RedisMessageListenerContainer container( RedisConnectionFactory redisConnectionFactory, MessageListenerAdapter smsExpirationListener) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(redisConnectionFactory); container.addMessageListener(smsExpirationListener, new PatternTopic("test")); return container; } }1.2.3.4.5.6.7.8.9.10.11.12.13.14.

这样定义非常简单明了,但是自注解有个问题是太代码僵硬了,创建监听者很不灵活,定义订阅只能指定内部的实现onMessage方法,那么怎么才能融入到我们的内部消息流转中间件里呢。

自定义注解实现

我们内部组件抽象了两个方法,云服务器提供商生产和消费,但这两个方法逻辑截然不同,生产方法是暴露给serverice层接口调用,调用方在调用生产方法后能直接知道生产了几条数据和成功与否。而消费方法是配合Spring生命周期函数服务启动时建立常驻消费线程的。

复制/** * 生产消息 */ Integer producer(MessageForm messageForm); /** * 消费消息 */ void consumer(ConsumerAdapterForm adapterForm);1.2.3.4.5.6.7.8.9.

生产消息当然很容易实现,只需要调用已经封装好的convertAndSend方法。

复制stringRedisTemplate.convertAndSend(messageForm.getTopic(), messageForm.getMessage());1.

消费方法就有说法了,动态生成监听者的场景下使用redis容器用代码挨个注册已经满足不了了,但仔细过一遍源代码就会发现,监听类的构造方法的入参只有两个,第一个需要回调的代理类,第二个消费到数据后回调的方法。

复制/** * Create a new {@link MessageListenerAdapter} for the given delegate. * * @param delegate the delegate object * @param defaultListenerMethod method to call when a message comes * @see #getListenerMethodName */ public MessageListenerAdapter(Object delegate, String defaultListenerMethod) { this(delegate); setDefaultListenerMethod(defaultListenerMethod); }1.2.3.4.5.6.7.8.9.10.11.

方案有了,本质上就是把RedisMessageListenerContainer注入进来之后,扫描项目里所有加了 @MessageHub 的bean,包装成监听类加载到容器里就完事了。亿华云计算

怎么扫描的代码就不再赘述了,实现Spring的生命周期函数BeanPostProcessor#postProcessAfterInitialization,在这里用AnnotationUtils判断是否标注了注解。

复制MessageHub annotation = AnnotationUtils.findAnnotation(method, MessageHub.class); if (annotation == null) { continue; }1.2.3.4.

标注了后判断如果是发布订阅,进入发布订阅的实现类。

复制@Scope(proxyMode = ScopedProxyMode.TARGET_CLASS) @Service("redisPubSubProcessor") public class RedisPubSubProcessor extends MessageHubServiceImpl { @Resource RedisMessageListenerContainer redisPubSubContainer; @Override public void produce(ProducerAdapterForm producerAdapterForm) { stringRedisTemplate.convertAndSend(producerAdapterForm.getTopic(), producerAdapterForm.getMessage()); } @Override public void consume(ConsumerAdapterForm messageForm) { MessageListenerAdapter adapter = new MessageListenerAdapter(messageForm.getBean(), messageForm.getInvokeMethod().getName()); adapter.afterPropertiesSet(); redisPubSubContainer.addMessageListener(adapter, new PatternTopic(messageForm.getTopic())); } @Bean public RedisMessageListenerContainer redisPubSubContainer(RedisConnectionFactory connectionFactory) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); return container; } }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.

先将RedisMessageListenerContainer注入到Spring容器里,produce方法只需要调用下现成的api。consume方法由于上一步我们获取了bean和对应的method,直接用MessageListenerAdapter的构造器创建出监听器来,这里有坑,需要手动调用adapter.afterPropertiesSet()设置一些必要的属性,这个在常规写法里框架帮忙做了。如果不调用的话会出一些空指针之类的bug。

随后把监听器add到容器就实现了方法代理,背后的线程监听到数据会回调到标注了 @MessageHub 的方法里

源码库
随机为您推荐
版权声明:本站资源均来自互联网,如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

Copyright © 2016 Powered by SpringBoot+Redis自定义注解实现发布订阅,全栈开发  滇ICP备2023006006号-32sitemap

回顶部