Redis可以很容的实现消息订阅/发布功能
一.JedisPubSub
需要实现一个JedisPubSub,相当于Redis消息的Listener
package com.gqshao.redis.channels; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import redis.clients.jedis.JedisPubSub; public class MyJedisPubSub extends JedisPubSub { protected static Logger logger = LoggerFactory.getLogger(MyJedisPubSub.class); // 取得订阅的消息后的处理 public void onMessage(String channel, String message) { logger.info("取得订阅的消息后的处理 : " + channel + "=" + message); } // 初始化订阅时候的处理 public void onSubscribe(String channel, int subscribedChannels) { logger.info("初始化订阅时候的处理 : " + channel + "=" + subscribedChannels); } // 取消订阅时候的处理 public void onUnsubscribe(String channel, int subscribedChannels) { logger.info("取消订阅时候的处理 : " + channel + "=" + subscribedChannels); } // 初始化按表达式的方式订阅时候的处理 public void onPSubscribe(String pattern, int subscribedChannels) { logger.info("初始化按表达式的方式订阅时候的处理 : " + pattern + "=" + subscribedChannels); } // 取消按表达式的方式订阅时候的处理 public void onPUnsubscribe(String pattern, int subscribedChannels) { logger.info(" 取消按表达式的方式订阅时候的处理 : " + pattern + "=" + subscribedChannels); } // 取得按表达式的方式订阅的消息后的处理 public void onPMessage(String pattern, String channel, String message) { logger.info("取得按表达式的方式订阅的消息后的处理 :" + pattern + "=" + channel + "=" + message); } }
二.消息订阅/发布
1.消息的订阅需要一个Redis连接始终保持连接,Jedis中停止订阅的unsubscribe是在JedisPubSub中
2.程序中因为需要Jedis始终保持连接,又有可能需要停止订阅,所以用到了ExecutorService
package com.gqshao.redis.channels; import com.gqshao.redis.JedisTest; import org.junit.Test; import redis.clients.jedis.Jedis; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * 发布/订阅 */ public class MessageTest extends JedisTest { /** * SUBSCRIBE [channel...] 订阅一个匹配的通道 * PSUBSCRIBE [pattern...] 订阅匹配的通道 * PUBLISH [channel] [message] 将value推送到channelone通道中 * UNSUBSCRIBE [channel...] 取消订阅消息 * PUNSUBSCRIBE [pattern ...] 取消匹配的消息订阅 * web环境中可以编写一个JedisPubSub 继承 @see redis.clients.jedis.JedisPubSub来实现监听 * Jedis中通过使用 JedisPubSub.UNSUBSCRIBE/PUNSUBSCRIBE 来取消订阅 */ @Test public void testSubscribe() { final MyJedisPubSub listener = new MyJedisPubSub(); Thread thread = new Thread(new Runnable() { @Override public void run() { logger.info("subscribe channelA.test channelB.send_message"); jedis.subscribe(listener, "channelA.test", "channelB.send_message"); } }); ExecutorService executor = Executors.newSingleThreadExecutor(); executor.execute(thread); // 测试发送 Jedis pubJedis = pool.getResource(); logger.info("publish channelA.test OK : " + pubJedis.publish("channelA.test", "OK")); logger.info("publish channelB.send_message \"Hello World!\" : " + pubJedis.publish("channelB.send_message", "Hello World!")); listener.unsubscribe("channelA.test", "channelB.send_message"); try { executor.shutdownNow(); logger.info("executor.shutdownNow"); if (!executor.awaitTermination(10, TimeUnit.SECONDS)) { logger.warn("Pool did not terminated"); } } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } logger.info("完成subscribe测试"); } /** * SUBSCRIBE channelone 订阅一个通道 * PSUBSCRIBE channel* 订阅一批通道 * PUBLISH channelone value 将value推送到channelone通道中 * web环境中可以编写一个Listener 继承 @see redis.clients.jedis.JedisPubSub来实现监听 */ @Test public void testPsubscribe() { final MyJedisPubSub listener = new MyJedisPubSub(); Thread thread = new Thread(new Runnable() { @Override public void run() { logger.info("psubscribe channel*"); jedis.psubscribe(listener, "channel*"); } }); ExecutorService executor = Executors.newSingleThreadExecutor(); executor.execute(thread); // 测试发送 Jedis pubJedis = pool.getResource(); logger.info("publish channelA.test OK: " + pubJedis.publish("channelA.test", "OK")); logger.info("publish channelB.send_message \"Hello World!\"" + pubJedis.publish("channelB.send_message", "Hello World!")); pool.returnResource(pubJedis); listener.punsubscribe(); try { executor.shutdownNow(); logger.info("executor.shutdownNow"); if (!executor.awaitTermination(10, TimeUnit.SECONDS)) { logger.warn("Pool did not terminated"); } } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } logger.info("完成psubscribe测试"); logger.info("publish channelA.test OK: " + pubJedis.publish("channelA.test", "OK")); } }
相关推荐
基于ssm实现websocket长连接+redis发布/订阅消息,服务端实时推送消息至前端页面,实时通信。内含前端代码,如需sql文件请下载https://download.csdn.net/download/gmetbtgbki/10824890
Java实现Redis的消息订阅和发布实例。
Redis支持跨进程发布订阅机制。代码实现了key过期的notification.
redis消息订阅发布
本案例包含redis的发布订阅功能,以及dotnet core+SignalR实现的简单即时通信,并提供文档笔记。本案例初衷是想结合redis的发布订阅功能+websocket实现消息客户端页面订阅指定的消息,并在客户端页面进行显示;
基于netcore 3.0的redis发布订阅示例代码,直接可以运行,学习netcore和redis的很好入门示例代码。
实现redis发布订阅的一个小Demo,一个发布消息,其他订阅了的都能接收消息
mq-redis|[redis之mq实现,发布订阅模式](https://github.com/smltq/spring-boot-demo/blob/master/mq-redis) email|[email实现邮件发送](https://github.com/smltq/spring-boot-demo/blob/master/email) jGit|...
Redis 发布订阅 Demo,SpringBoot 使用 Redis 发布订阅模式
Redis发布与订阅系统源码,统一配置更新数据库,亲测可用,简单易懂。切记运行程序前要开启Redis服务.
SpringBoot + Redis实现事件的发布订阅功能。详情可看博文https://blog.csdn.net/linhaiyun_ytdx/article/details/103569370
redis绑定webSocket发布订阅,进行长连接推送,用以暂时进度条,查看任务进行状态,失败数量与成功数量
redis订阅机制,一方面推送消息,另一方面同时接收消息。
redispubandsub订阅预发布,本包使用C#编写的代码,在使用之前需要有redis的服务,否则无法使用
<redis从入门到精通视频教程> ...发布与订阅模式命令介绍> ├<16.使用winHex对RDB和AOF原理进行剖析> ├<17.redis的主从复制模式> ├<18.showlog监控慢日志查询以及源码分析> ├<19.项目架构之一主两从三Sentinel>
可以工作在nginx tcp负载均衡环境中的集群聊天服务器和客户端源码 基于nuduo库实现 使用了redis发布订阅消息队列 数据库采用MySQL 可以工作在nginx tcp负载均衡环境中的集群聊天服务器和客户端源码 基于nuduo库实现 ...
Redis支持跨进程发布订阅机制。代码实现了key过期的notification.
Java实现Redis的消息订阅和发布源码
官方不面向 Mac OS X 和 Windows 平台的非订阅用户提供二进制安装包,因此普通用户需要自行使用源码进行编译。 本人自己编译发布,已测试正常可用。 对于大多数用户来说,这个版本最大改善是速度快了不少。 新功能...