XREADGROUP

用法
XREADGROUP GROUP group consumer [ COUNT count ] [ BLOCK milliseconds ] [ NOACK ] STREAMS key [ key ... ] ID [ ID ... ]
复杂度
对于提及的每个流:O(M),其中 M 是返回元素的数量。如果 M 是常数(例如,总是使用 COUNT 请求前10个元素),则可以将其视为 O(1)。另一方面,当 XREADGROUP 阻塞时,为了服务N个因流获取新数据而阻塞的客户端,XADD 将需要 O(N) 的时间。
始于
5.0.0
ACL 类别
@stream, @write, @blocking, @slow

XREADGROUP 命令是 XREAD 命令的一个特殊版本,支持消费者组。在阅读本页之前,您可能需要了解 XREAD 命令,这样才能更好地理解。

此外,如果您刚接触流,我们建议阅读我们的流简介。请务必理解简介中的消费者组概念,这样有助于您更简单地理解此命令的工作原理。

30秒理解消费者组

此命令与普通 XREAD 的区别在于,它支持消费者组。

在没有消费者组的情况下,仅使用 XREAD,所有客户端都会收到流中到达的所有条目。相反,使用 XREADGROUP 和消费者组,可以创建客户端组,这些客户端组消费给定流中到达消息的不同部分。例如,如果流收到新条目 A、B 和 C,并且有两个消费者通过消费者组读取,那么一个客户端将收到消息 A 和 C,另一个客户端将收到消息 B,依此类推。

在消费者组中,给定的消费者(即,仅从流中消费消息的客户端)必须通过唯一的*消费者名称*来识别自己。这只是一个字符串。

消费者组的保证之一是,给定的消费者只能看到已传递给它的消息历史记录,因此一条消息只有一个所有者。然而,有一个名为*消息认领(message claiming)*的特殊功能,允许其他消费者在某个消费者发生不可恢复故障时认领消息。为了实现这种语义,消费者组要求消费者通过 XACK 命令明确确认已成功处理的消息。这是必要的,因为流将为每个消费者组跟踪谁正在处理哪条消息。

以下是如何判断您是否需要使用消费者组

  1. 如果您有一个流和多个客户端,并且希望所有客户端都能收到所有消息,则不需要消费者组。
  2. 如果您有一个流和多个客户端,并且希望流在您的客户端之间进行*分区*或*分片*,以便每个客户端将获得流中到达消息的一个子集,则需要消费者组。

XREAD 和 XREADGROUP 之间的区别

从语法角度来看,这两个命令几乎相同,然而 XREADGROUP 需要一个特殊且强制的选项

GROUP <group-name> <consumer-name>

组名只是与流关联的消费者组的名称。该组使用 XGROUP 命令创建。消费者名称是客户端用于在组内标识自身的字符串。消费者在首次被看到时会在消费者组内自动创建。不同的客户端应选择不同的消费者名称。

当您使用 XREADGROUP 读取时,服务器会记住已将给定消息传递给您:该消息将存储在消费者组中,称为待处理条目列表(PEL),这是一个已传递但尚未确认的消息ID列表。

客户端必须使用 XACK 命令确认消息处理,以便将待处理条目从 PEL 中移除。PEL 可以使用 XPENDING 命令进行检查。

在可靠性不是必需的且偶尔的消息丢失是可接受的情况下,可以使用 NOACK 子命令来避免将消息添加到 PEL。这等同于在读取消息时即对其进行确认。

当使用 XREADGROUP 时,在 **STREAMS** 选项中指定的 ID 可以是以下两种之一

  • 特殊的 > ID,表示消费者只想接收*从未传递给任何其他消费者*的消息。它仅仅意味着,给我新的消息。
  • 任何其他 ID,即 0 或任何其他有效的 ID 或不完整的 ID(仅毫秒时间部分),将返回发送命令的消费者待处理的、ID 大于所提供 ID 的条目。因此,基本上如果 ID 不是 >,则命令将仅允许客户端访问其待处理条目:即已传递给它但尚未确认的消息。请注意,在这种情况下,BLOCKNOACK 都会被忽略。

XREAD 类似,XREADGROUP 命令可以以阻塞方式使用。在这方面没有区别。

当消息传递给消费者时会发生什么?

两件事

  1. 如果消息从未传递给任何人,也就是说,如果我们在谈论一条新消息,则会创建一个 PEL(待处理条目列表)。
  2. 如果消息已经传递给该消费者,并且只是再次重新获取同一条消息,那么*上次传递计数器*将更新为当前时间,并且*传递次数*将增加一。您可以使用 XPENDING 命令访问这些消息属性。

使用示例

通常,您会像这样使用该命令来获取新消息并处理它们。在伪代码中

WHILE true
    entries = XREADGROUP GROUP $GroupName $ConsumerName BLOCK 2000 COUNT 10 STREAMS mystream >
    if entries == nil
        puts "Timeout... try again"
        CONTINUE
    end

    FOREACH entries AS stream_entries
        FOREACH stream_entries as message
            process_message(message.id,message.fields)

            # ACK the message as processed
            XACK mystream $GroupName message.id
        END
    END
END

通过这种方式,示例消费者代码将仅获取新消息,处理它们,并通过 XACK 进行确认。然而,上述示例代码不完整,因为它没有处理崩溃后的恢复。如果在处理消息过程中发生崩溃,我们的消息将保留在待处理条目列表中,因此我们可以通过最初向 XREADGROUP 提供 ID 0 并执行相同的循环来访问我们的历史记录。一旦提供 ID 0 且回复是空消息集,我们就知道已经处理并确认了所有待处理消息:我们可以开始使用 > 作为 ID,以获取新消息并重新加入正在处理新事物的消费者。

要查看该命令的实际回复方式,请查阅 XREAD 命令页面。

当待处理消息被删除时会发生什么?

条目可能随时因修剪或显式调用 XDEL 而从流中删除。根据设计,Valkey 不会阻止删除流 PEL 中存在的条目。发生这种情况时,PEL 会保留已删除条目的 ID,但实际的条目负载不再可用。因此,当读取此类 PEL 条目时,Valkey 将返回一个空值来代替其各自的数据。

示例

> XADD mystream 1 myfield mydata
"1-0"
> XGROUP CREATE mystream mygroup 0
OK
> XREADGROUP GROUP mygroup myconsumer STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) "1-0"
         2) 1) "myfield"
            2) "mydata"
> XDEL mystream 1-0
(integer) 1
> XREADGROUP GROUP mygroup myconsumer STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) "1-0"
         2) (nil)

强烈建议阅读流简介,以便更深入地了解流的整体行为和语义。

RESP2 回复

以下之一

  • 数组回复:一个数组,其中每个元素都是一个由两个元素组成的数组,包含键名和该键报告的条目。报告的条目是完整的流条目,包含 ID 以及所有字段和值的列表。字段和值保证以与 XADD 添加时相同的顺序报告。

  • 空回复:如果给定了 *BLOCK* 选项且发生超时,或者没有可服务的流。

RESP3 回复

以下之一

  • 映射回复:一个键值元素映射,其中每个元素由键名和该键报告的条目组成。报告的条目是完整的流条目,包含 ID 以及所有字段和值的列表。字段和值保证以与 XADD 添加时相同的顺序报告。

  • 空回复:如果给定了 *BLOCK* 选项且发生超时,或者没有可服务的流。