Redis 除了做数据缓存,做 NoSQL 数据库,也可以当做轻量级消息队列使用,并且提供了基于 List 实现的、基于 Pub/Sub 机制的订阅/发布模式、基于 sorted set 的实现和基于 Stream 类型的实现几种实现方式。其中 List 实现的分非阻塞和阻塞方式,Stream 则是 Redis 5 加入的消息队列。
之前代码已经写过了,只是工程整合搞得比较复杂,所以这里算是写份注释文档。
关联代码地址lin/lin-redis at master · zgshen/lin。
使用 List 类型实现
List 就是列表数据结构,用来做消息队列这是最简单直观的了,也是典型的点对点消息模型,先看下 Redis 列表提供的操作命令。
push 压入:
- LPUSH key value1 [value2 ...] 将一个或多个值插入到列表头部
- RPUSH key value1 [value2 ...] 将一个或多个值插入到列表尾部
pop 弹出:
- LPOP key 移除并获取列表的第一个元素
- RPOP key 移除并获取列表的最后一个元素
阻塞弹出;:
- BLPOP key1 [key2 ...] timeout 移除并获取列表的第一个元素,若列表为空则阻塞等待
- BRPOP key1 [key2 ...] timeout 移除并获取列表的最后一个元素,若列表为空则阻塞等待
压入和弹出前面的 L 和 R 表示从队列左端和右端压入和弹出,阻塞弹出的 B 代表就是 blocking 的意思。
使用队列一般遵循先进先出,所以要么左近右出,要么右近左出,框架提供的 RedisTemplate 封装了 Redis 的操作命令,push 和 pop 直接调用就行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| @Autowired private RedisTemplate redisTemplate;
public Long push(String... params) { Long aLong = redisTemplate.opsForList().leftPushAll(LIST_PUSH_POP_MSG, params); return aLong; }
public String pop() { String str = redisTemplate.opsForList().rightPop(LIST_PUSH_POP_MSG).toString(); return str; }
|
再看下堵塞弹出的异步操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public void blockingConsume() { List<Object> obj = redisTemplate.executePipelined(new RedisCallback<Object>() { @Override public Object doInRedis(RedisConnection connection) throws DataAccessException { return connection.bLPop(PUB_SUB_TIME_OUT, LIST_PUSH_POP_MSG.getBytes()); } }, new StringRedisSerializer()); for (Object str : obj) { log.info("blockingConsume : {}", str); } }
|
此外 Redis 还有两个命令 RPOPLPUSH、BRPOPLPUSH(阻塞)可以从一队列获取队列并且写入另一个队列,可以用于简单保证消息可靠性,业务成功处理后再移除另一队列的消息,如果业务处理失败又可以从另一队列恢复。
1 2 3 4 5 6 7 8 9 10 11 12 13
| public String rightPopLeftPush() { String str; try { str = redisTemplate.opsForList().rightPopAndLeftPush(LIST_PUSH_POP_MSG, LIST_PUSH_POP_BACKUP_MSG).toString(); } catch (Exception e) { log.error("业务异常:{}", e.getMessage()); throw new RuntimeException(e); } redisTemplate.opsForList().leftPop(LIST_PUSH_POP_BACKUP_MSG); return str; }
|
使用 Sorted Set 实现
Sorted Set 是有序集合,元素唯一不可重复,元素按照 score 值升序排列,支持范围操作,所以适合做简单的延迟消息队列。
添加元素:
- ZADD key score member [score member ...] 向有序集合中加入一个或多个成员,或更新已存在成员的分数
获取元素:
- ZRANGE key start stop [WITHSCORES] 按位置范围遍历集合,可附加分数
- ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT...] 按分数范围遍历集合
以下是简单的生产和消费程序:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
|
public void produce(String businessId, long expiredTime) { redisTemplate.opsForZSet().add(MsgConstant.SORTED_SET_MSG, businessId, System.currentTimeMillis() + expiredTime * 1000); }
public void consume() { while (true) { Set<String> set = redisTemplate.opsForZSet().rangeByScore(MsgConstant.SORTED_SET_MSG, 0, System.currentTimeMillis(), 0, 1); if (set == null || set.isEmpty()) continue; log.info(set.toString()); String next = set.iterator().next(); Long remove = redisTemplate.opsForZSet().remove(MsgConstant.SORTED_SET_MSG, next); if (remove > 0) log.info("{} remove success.", next); } }
|
使用 Pub/Sub 订阅发布模式
发布者把消息发到某个频道,订阅改频道的所有消费者都会收到消息,即消息多播,并且订阅支持模糊匹配频道。这种方式就是常规的消费者-消费者模型,不过与典型的 MQ 还是有区别,Pub/Sub 订阅发布更像是个广播,不能并发消费,不支持持久化,也没有 ACK 确认。
发布命令:
- PUBLISH channel message : 将消息 message 发布到指定的频道 channel
订阅命令:
- SUBSCRIBE channel [channel ...] : 订阅一个或多个频道
- PSUBSCRIBE pattern [pattern ...] : 订阅一个或多个模式,用于模糊匹配频道
Spring 工程的配置类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| @Bean public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter adapter, MessageListenerAdapter adapter1) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.addMessageListener(adapter, new PatternTopic(PUB_SUB_MSG)); container.addMessageListener(adapter1, new PatternTopic(PUB_SUB_MSG_FUZZY)); return container; }
@Bean public MessageListenerAdapter adapter(MessageSubscribe message){ return new MessageListenerAdapter(message, "onMessage"); }
@Bean public MessageListenerAdapter adapter1(MessageSubscribe1 message){ return new MessageListenerAdapter(message, "onMessage"); }
|
订阅者类:
1 2 3 4 5 6 7 8 9 10
| @Slf4j @Component public class MessageSubscribe implements MessageListener {
@Override public void onMessage(Message message, byte[] bytes) { log.info("sub, topic name: {}, message: {}", new String(bytes), new String(message.getBody())); }
}
|
发布者类:
1 2 3 4 5 6 7 8 9 10 11
| @Service public class MessagePublish {
@Autowired StringRedisTemplate redisTemplate;
public void publish(String channel, String msg) { redisTemplate.convertAndSend(channel, msg); }
}
|
使用 Stream
Redis 5.0 新增了 Stream 的数据结构,与 Pub/Sub 订阅发布模式相比,Redis Stream 提供了消息的持久化和主备复制功能。
添加消息:
1
| XADD key ID field value [field value ...]
|
其中ID,消息id,可使用 * 表示由 redis 生成,可以自定义,但是要自己保证递增性
读取消息:
1
| XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
|
milliseconds 设置堵塞秒数,没设置就是非阻塞模式。
创建消费者组:
1
| XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
|
key 队列名,不存在就创建;groupname 组名;$ 表示从尾部开始消费,只接受新消息,当前 Stream 消息会全部忽略。
1 2 3 4 5
| # 从头开始消费 XGROUP CREATE mystream consumer-group-name 0-0
# 从尾部开始消费 XGROUP CREATE mystream consumer-group-name $
|
读取消费者组中的消息:
1
| XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
|
group 消费组名;consumer 消费者名;count 读取数量;milliseconds 阻塞毫秒数;key 队列名;ID 消息 ID。
例子:
1
| XREADGROUP GROUP consumer-group-name consumer-name COUNT 1 STREAMS mystream >
|
看下在 Spring Boot 中的使用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
|
@Slf4j @Component public class RedisStreamRunner implements ApplicationRunner, DisposableBean {
private StreamMessageListenerContainer<String, MapRecord<String, String, String>> container; private final ThreadPoolTaskExecutor executor; private final RedisConnectionFactory redisConnectionFactory; private final StringRedisTemplate stringRedisTemplate;
public RedisStreamRunner(ThreadPoolTaskExecutor executor, RedisConnectionFactory redisConnectionFactory, StringRedisTemplate stringRedisTemplate) { this.executor = executor; this.redisConnectionFactory = redisConnectionFactory; this.stringRedisTemplate = stringRedisTemplate; }
@Override public void run(ApplicationArguments args) throws Exception { StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder() .batchSize(10) .executor(executor) .pollTimeout(Duration.ZERO) .build();
StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer.create(redisConnectionFactory, options);
initStreamAndGroup(stringRedisTemplate.opsForStream(), STREAM_KEY, STREAM_GROUP); container.receive(Consumer.from(STREAM_GROUP, STREAM_CONSUMER), StreamOffset.create(STREAM_KEY, ReadOffset.lastConsumed()), new TestStreamListener(stringRedisTemplate));
this.container = container; this.container.start(); }
private void initStreamAndGroup(StreamOperations<String, ?, ?> ops, String streamKey, String group) { String status = "OK"; try { StreamInfo.XInfoGroups groups = ops.groups(streamKey); if (groups.stream().noneMatch(xInfoGroup -> group.equals(xInfoGroup.groupName()))) { status = ops.createGroup(streamKey, group); } } catch (Exception exception) { RecordId initialRecord = ops.add(ObjectRecord.create(streamKey, "Initial Record")); Assert.notNull(initialRecord, "Cannot initialize stream with key '" + streamKey + "'"); status = ops.createGroup(streamKey, ReadOffset.from(initialRecord), group); } finally { Assert.isTrue("OK".equals(status), "Cannot create group with name '" + group + "'"); } }
@Override public void destroy() { this.container.stop(); }
}
|
TestStreamListener 处理消息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| @Slf4j public class TestStreamListener implements StreamListener<String, MapRecord<String, String, String>> {
StringRedisTemplate redisTemplate;
public TestStreamListener(StringRedisTemplate redisTemplate) { this.redisTemplate = redisTemplate; }
@Override public void onMessage(MapRecord<String, String, String> message) {
log.info("MessageId: " + message.getId()); log.info("Stream: " + message.getStream()); log.info("Body: " + message.getValue()); redisTemplate.opsForStream().acknowledge(STREAM_GROUP, message); } }
|
生产者:
1 2 3 4 5 6 7 8 9 10 11 12
| @Service public class TestStreamProducer {
@Autowired StringRedisTemplate redisTemplate;
public void add(String streamKey, String msg) { redisTemplate.opsForStream().add(Record.of(msg).withStreamKey(streamKey)); }
}
|
参考