Redis Stream 作为队列的定位
Redis 做「队列」常见几种方式:
- List(LPUSH/BRPOP):简单 FIFO,但一条消息只能被一个消费者取走,无法多实例负载均衡,也没有「未 ACK 可重新投递」的语义。
- Pub/Sub:广播,消息不落盘,订阅者不在就丢消息,不适合任务队列。
- Stream:消息持久化、支持消费者组、每条消息有唯一 ID、支持 XACK 与 PEL(Pending Entries List),可实现多消费者负载均衡与至少一次消费,适合作为「队列」使用。
下文先说明把 Redis Stream 当队列 的核心概念与 Spring 用法,再在延伸里讲 基于 ZSET + Stream 的延迟队列 实现要点。
Stream 核心概念与命令
基本结构
- Stream:一个 key 对应一条日志流,每条记录有一个 消息 ID(默认毫秒时间戳-序号,如
1739123456789-0)和多个 field-value 对(类似 hash)。 - 生产者:
XADD stream * field1 value1 field2 value2,* 表示自动生成 ID。 - 单消费者读:
XREAD BLOCK 5000 STREAMS stream 0,从 ID 0 起读,阻塞最多 5 秒。
消费者组(多消费者、负载均衡、至少一次)
- XGROUP CREATE stream group-name 0 MKSTREAM:在 stream 上创建消费者组,从
0 开始消费;MKSTREAM 表示 stream 不存在时创建。 - XREADGROUP GROUP group-name consumer-name STREAMS stream >:在组内用
consumer-name 领消息,> 表示「只领尚未分配给任何消费者的新消息」。 - 被领走但未 ACK 的消息会进入该消费者在组内的 PEL(Pending Entries List);其他消费者不会再次领到同一条,除非用 XAUTOCLAIM 接管超时未 ACK 的。
- XACK stream group-name message-id:确认某条消息已处理完成,从 PEL 移除。
- XDEL stream message-id:从 stream 中删除该条记录(可选,看是否需要保留历史)。
因此:把 Stream 当队列 = 用 XADD 投递 → 用 XREADGROUP 在组内领消息 → 业务处理 → XACK(可选再 XDEL),即可实现多实例负载均衡与至少一次语义。
Spring 集成:监听容器与建组时机
StreamMessageListenerContainer
Spring Data Redis 提供 StreamMessageListenerContainer,内部用 XREADGROUP 轮询,收到消息后回调你注册的 StreamListener。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options = StreamMessageListenerContainerOptions.<String, MapRecord<String, String, String>>builder() .batchSize(10) .pollTimeout(Duration.ofSeconds(2)) .build();
StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer.create(factory, options);
container.receive( Consumer.from(GROUP_NAME, CONSUMER_NAME), StreamOffset.create(STREAM_KEY, ReadOffset.lastConsumed()), streamListener ); container.start();
|
- ReadOffset.lastConsumed():对应 Redis 的
>,只在组内领「尚未被分配过的」新消息,与 XACK 配合使用。 - ReadOffset.latest():对应
$,只消费启动之后新到的消息,不涉及组内位点,一般不用于「队列」场景。
建组时机与 NOGROUP
使用 XREADGROUP 前,Stream 和消费者组必须已存在,否则会报 NOGROUP。因此要在 Stream 监听容器启动之前 先执行 XGROUP CREATE。
做法:单独一个 Bean 在 @PostConstruct 里建组,Stream 容器的 @Bean 上 @DependsOn 该 Bean,保证顺序。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| @Component("streamGroupInitializer") public class StreamGroupInitializer {
private final RedisConnectionFactory connectionFactory;
@PostConstruct public void createGroupIfNotExists() { try (RedisConnection connection = connectionFactory.getConnection()) { RedisStreamCommands streamCommands = connection.streamCommands(); byte[] streamKey = STREAM_KEY.getBytes(StandardCharsets.UTF_8); try { streamCommands.xGroupCreate(streamKey, GROUP_NAME, ReadOffset.from("0"), true); } catch (Exception e) { if (e.getMessage() != null && e.getMessage().contains("BUSYGROUP")) { } else { throw e; } } } } }
|
1 2 3 4 5 6 7
| @Bean @DependsOn("streamGroupInitializer") public StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer( RedisConnectionFactory factory) { return container; }
|
消费语义:ACK 与可选 XDEL
- 处理成功:应对该消息做 XACK,否则会一直留在 PEL,占位且可能被 XAUTOCLAIM 转给其他消费者。
- 若希望 stream 不堆积已消费消息,可在 ACK 后 XDEL。XACK 与 XDEL 建议用 Lua 脚本 原子执行,避免只 ACK 不删或只删不 ACK 导致状态不一致。
1 2 3
| redis.call('XACK', KEYS[1], ARGV[1], ARGV[2]) redis.call('XDEL', KEYS[1], ARGV[2]) return 1
|
1 2 3 4 5
| redisTemplate.execute(SCRIPT_ACK_AND_DELETE, List.of(STREAM_KEY), GROUP_NAME, messageId);
|
- 处理失败:不要 ACK,可抛异常或记日志;消息会留在 PEL,可由同一或其它消费者通过 XAUTOCLAIM 或再次读取 PEL 做重试。
Listener 中拿到消息与业务处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| @Override public void onMessage(MapRecord<String, String, String> message) { String messageId = message.getId().getValue(); Map<String, String> value = message.getValue(); String body = value.get("body");
try { processTask(body); redisTemplate.execute(SCRIPT_ACK_AND_DELETE, List.of(STREAM_KEY), GROUP_NAME, messageId); } catch (Exception e) { log.error("处理失败,消息留在 PEL", e); throw e; } }
|
生产端用 XADD 或 Spring 的 StringRedisTemplate 写 stream(可封装为 opsForStream().add(StreamRecords.newRecord().in(STREAM_KEY).ofMap(map))即可完成「Stream 作为队列」的闭环。
延伸:基于 ZSET + Stream 的延迟队列
在「Stream 作队列」的基础上,若需要 按时间触发(延迟 N 秒/分钟再消费),可加上 ZSET 做时间调度,把「到期」任务再搬进 Stream,形成延迟队列。
需求与架构
- 任务先按 执行时间 排队,到期后再被消费;消费失败要能 按重试次数 重新入队。
- ZSET:score = 执行时间戳,member 为
{retry}_{key}(key 为业务唯一标识,如订单号),用 ZRANGEBYSCORE ZSET_KEY 0 now 拉取到期任务。 - Dispatcher:单独线程循环拉取到期 member,从 ZSET 移除 并 写入 Stream;只有真正移除成功才写入,避免多实例重复入队,因此 ZREM + XADD 用 Lua 原子 执行。
- Consumer:与普通 Stream 队列一致,业务成功则 XACK + XDEL(Lua),失败则不 ACK;若需延迟重试,可解析
body 拆出 retry 与 key,未超最大重试则以 (retry+1)_key 再次 ZADD,超过最大重试则只打日志。
Member 格式与 retry 传递
- 例如
0_order-1001:前半为 retry,第一个 _ 之后为业务 key。Dispatcher 把整段作为 Stream 的 body,把前半作为 retry;失败回写 1_order-1001 等,并配合最大重试次数(如 3)。
常量与配置
1 2 3 4 5 6 7 8
| public final class DelayQueueConfig { public static final String ZSET_KEY = "delay-queue:zset"; public static final String STREAM_KEY = "delay-queue"; public static final String GROUP_NAME = "delay-queue:consumer-group"; public static final String CONSUMER_NAME = "consumer_1"; public static final String BODY_FIELD = "body"; public static final String RETRY_FIELD = "retry"; }
|
生产者:ZADD 延迟任务
约定 member 为 0_{key},便于解析 retry;若直接以业务 key 入 ZSET(无下划线),Dispatcher 将 retry 视为 0。
1 2 3 4 5
| public void submit(String key, long delayMs) { String member = "0_" + key; long executeAt = System.currentTimeMillis() + delayMs; redisTemplate.opsForZSet().add(DelayQueueConfig.ZSET_KEY, member, executeAt); }
|
Dispatcher Lua:ZREM 成功才 XADD
1 2 3 4 5
| local removed = redis.call('ZREM', KEYS[1], ARGV[1]) if removed == 1 then return redis.call('XADD', KEYS[2], '*', ARGV[2], ARGV[3], ARGV[4], ARGV[5]) end return nil
|
KEYS[1]=ZSET,KEYS[2]=STREAM;ARGV[1]=member,ARGV[2..5] 为 body 字段名、member 值、retry 字段名、retry 值。
Dispatcher:线程循环与 Java 调用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
| private static final RedisScript<String> SCRIPT_ZREM_THEN_XADD = RedisScript.of( "local removed = redis.call('ZREM', KEYS[1], ARGV[1])\n" + "if removed == 1 then\n" + " return redis.call('XADD', KEYS[2], '*', ARGV[2], ARGV[3], ARGV[4], ARGV[5])\n" + "end\n" + "return nil", String.class);
@PostConstruct public void startDispatcher() { dispatcherThread = new Thread(this::runLoop, "delay-queue-dispatcher"); dispatcherThread.start(); }
@PreDestroy public void stopDispatcher() { running.set(false); if (dispatcherThread != null) { dispatcherThread.interrupt(); } }
private void runLoop() { while (running.get()) { try { if (dispatchBatch() == 0) { Thread.sleep(50); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); break; } catch (Exception e) { log.warn("Dispatcher 批次异常", e); try { Thread.sleep(50); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); break; } } } }
private int dispatchBatch() { long now = System.currentTimeMillis(); Set<String> expired = redisTemplate.opsForZSet().rangeByScore( DelayQueueConfig.ZSET_KEY, 0, now, 0, 100); if (expired == null || expired.isEmpty()) { return 0; } int count = 0; List<String> redisKeys = List.of(DelayQueueConfig.ZSET_KEY, DelayQueueConfig.STREAM_KEY); for (String member : expired) { String retryPart = member.indexOf('_') >= 0 ? member.substring(0, member.indexOf('_')) : "0"; redisTemplate.execute( SCRIPT_ZREM_THEN_XADD, redisKeys, member, DelayQueueConfig.BODY_FIELD, member, DelayQueueConfig.RETRY_FIELD, retryPart); count++; } return count; }
|
有到期任务时连续 dispatchBatch 直至本批为空再进入短休眠,避免固定周期拉长延迟;@PreDestroy 中断线程并优雅退出。
Consumer:成功 ACK+DEL,失败回写 ZSET(示例)
成功处理仍用前文 SCRIPT_ACK_AND_DELETE。失败时可选将任务重新写入 ZSET(注意与 PEL 策略二选一,避免重复消费):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| private void rescheduleToZset(String body, long delayMs, int maxRetry) { int sep = body.indexOf('_'); if (sep < 0) { return; } int retry = Integer.parseInt(body.substring(0, sep)); String key = body.substring(sep + 1); if (retry >= maxRetry) { log.warn("超过最大重试,丢弃: {}", body); return; } String member = (retry + 1) + "_" + key; long executeAt = System.currentTimeMillis() + delayMs; redisTemplate.opsForZSet().add(DelayQueueConfig.ZSET_KEY, member, executeAt); }
|
在 Listener 的 catch 中调用 rescheduleToZset(body, backoffMs, 3) 即可;若采用「失败不 ACK、仅靠 PEL/XAUTOCLAIM 重试」,则不必写 ZSET。
Listener:读取 body / retry 并处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| @Override public void onMessage(MapRecord<String, String, String> message) { String messageId = message.getId().getValue(); String body = message.getValue().get(DelayQueueConfig.BODY_FIELD); String retried = message.getValue().get(DelayQueueConfig.RETRY_FIELD); if (!StringUtils.hasLength(body) || !StringUtils.hasLength(retried)) { log.warn("消息缺 body/retry, messageId: {}", messageId); return; } log.info("收到延迟任务 body={}, retry={}, id={}", body, retried, messageId); try { processTask(body); redisTemplate.execute( SCRIPT_ACK_AND_DELETE, List.of(DelayQueueConfig.STREAM_KEY), DelayQueueConfig.GROUP_NAME, messageId); } catch (Exception e) { log.error("处理失败,消息留在 PEL 或按需回写 ZSET, body={}", body, e); throw e; } }
|
小结
- Redis Stream 作为队列:XADD 投递 → 消费者组 XREADGROUP → 业务处理 → XACK(+ 可选 XDEL),建组在监听容器前完成(@DependsOn),ACK 与 XDEL 可用 Lua 原子化。
- 延迟队列:ZSET member 为
{retry}_{key},Dispatcher 用 Lua 原子 ZREM+XADD 搬入 Stream;Consumer 成功则 XACK+XDEL,失败可按 (retry+1)_key 回写 ZSET 或依赖 PEL 重试,并限制最大重试。