Redis原理 — PubSub

Redis原理 — PubSub

从前面的知识点中我们学过:基于 Redis 的 list 结构可以实现异步队列,基于 zset 结构可以实现延时队列。但是这种方式都有一个不足点,就是他们都是单播机制(一对一),多个消费者订阅同一个生产者只有一个消费者能够消费到消息。在有些业务场景下,我们希望生产者发出的消息能够被所有订阅到的消费者消费到,Redis 单独提供了一个模块 PubSub 支持消息多播(一对多)。

基本使用

subscribe
publish

订阅消息,在下面的实例中,订阅了两个主题分别是 topic1、topic2,随后立即返回了两个订阅成功的响应:

1
2
3
4
5
6
7
8
9
10
11
127.0.0.1:6379> subscribe topic1 topic2
Reading messages... (press Ctrl-C to quit)
# topic1 订阅成功
1) "subscribe" # 消息类型
2) "topic1" # 订阅的 channel 名
3) (integer) 1 # 数据
# topic2 订阅成功
1) "subscribe"
2) "topic2"
3) (integer) 2
_ # 进入阻塞等待接收消息中

发布消息,在下面的实例中,分别在两个主题上发布消息,消费端接收到消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
127.0.0.1:6379> publish topic1 topic1-message
(integer) 1
127.0.0.1:6379> publish topic2 '{"message": "topic2 message"}'
(integer) 1

-----------------------------------------------------------------
127.0.0.1:6379> subscribe topic1 topic2
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "topic1"
3) (integer) 1
1) "subscribe"
2) "topic2"
3) (integer) 2
# topic1 接收到消息
1) "message" # 消息类型
2) "topic1" # 订阅的 channel 名
3) "topic1-message" # 数据
# topic2 接收到消息
1) "message"
2) "topic2"
3) "{\"message\": \"topic2 message\"}"

模式订阅

psubscribe

模式订阅,就是让消费者订阅一组名称符合指定模式的生产者,这样当新增一个生产者时客户端不需要新增一个订阅指令就能接收到新的生产者的消息了。

模式订阅的指令是 psubscribe 不是 subscribe,并且订阅成功之后返回的订阅指令类型是 psubscribe:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
127.0.0.1:6379> psubscribe topic*
Reading messages... (press Ctrl-C to quit)
# 模式订阅成功
1) "psubscribe"
2) "topic*"
3) (integer) 1
# 接收到 topic1 的消息
1) "pmessage" # 消息类型
2) "topic*" # 订阅模式名
3) "topic1" # 订阅的 channel 名
4) "hello" # 数据
# 接收到 topic2 的消息
1) "pmessage"
2) "topic*"
3) "topic2"
4) "world"
# 接收到 topic2 的消息
1) "pmessage"
2) "topic*"
3) "topic2"
4) "!"

代码实现

Publisher

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* 消息发布者
*
* @author Harry Zhang
* @since 2020/4/10 01:26
*/
public class Publisher {
public static final JedisPool POOL = new JedisPool(new JedisPoolConfig(), "localhost");

public static void main(String[] args) {
Jedis jedis = POOL.getResource();
jedis.publish("codehole", "python comes");
jedis.publish("codehole", "java comes");
jedis.publish("codehole", "golang comes");
}
}

Subscribe

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* 消息订阅者
*
* @author Harry Zhang
* @since 2020/4/10 01:26
*/
public class Subscribe {
public static final JedisPool POOL = new JedisPool(new JedisPoolConfig(), "localhost");

public static void main(String[] args) {
Jedis jedis = POOL.getResource();
// jedis.subscribe(new Subscriber(), "codehole");
jedis.psubscribe(new Subscriber(), "code*");
}

static class Subscriber extends JedisPubSub {
@Override
public void onMessage(String channel, String message) {
System.out.println(String.format("[接收到消息] channel: %s, message: %s", channel, message));
}

@Override
public void onPMessage(String pattern, String channel, String message) {
System.out.println(String.format("[接收到模式匹配消息] pattern: %s, channel: %s, message: %s", pattern, channel,
message));
}

@Override
public void onSubscribe(String channel, int subscribedChannels) {
System.out.println(String.format("[订阅成功] channel: %s, subscribedChannels: %s", channel,
subscribedChannels));
}

@Override
public void onPSubscribe(String pattern, int subscribedChannels) {
System.out.println(String.format("[模式订阅成功] pattern: %s, subscribedChannels: %s", pattern,
subscribedChannels));
}
}
}

消费者控制台输出

[模式订阅成功] pattern: code*, subscribedChannels: 1
[接收到模式匹配消息] pattern: code*, channel: codehole, message: python comes
[接收到模式匹配消息] pattern: code*, channel: codehole, message: java comes
[接收到模式匹配消息] pattern: code*, channel: codehole, message: golang comes

项目地址:/project/2-6

缺点不足

  1. 发布一个消息,如果没有一个消费者订阅,消息将被丢失;
  2. 多个消费者订阅同一个消息,其中一个消费者掉线重启之后,期间的消息不会接收到,其他两个消费者能收到消息;
  3. 消息不支持持久化,Redis 宕机之后所有的消息将被丢失。

为了弥补 PubSub 的不足,Redis5.0 新增了新的数据结构 Stream,支持持久化的消息队列。

Comments