XREAD

用法
XREAD [ COUNT count ] [ BLOCK milliseconds ] STREAMS key [ key ... ] ID [ ID ... ]
始于
5.0.0
ACL 类别
@stream, @read, @blocking, @slow

从一个或多个流中读取数据,只返回 ID 大于调用者报告的最后接收 ID 的条目。此命令可以选择在项目不可用时阻塞,方式类似于 BRPOPBZPOPMIN 等命令。

请注意,在阅读本页之前,如果您是流的新手,我们建议您阅读我们的流介绍

非阻塞用法

如果不使用 BLOCK 选项,此命令是同步的,可以认为与 XRANGE 有些关联:它将返回流中的一系列项目,但即使我们只考虑同步用法,它与 XRANGE 相比也有两个根本区别

  • 如果我们想同时从多个键读取数据,此命令可以与多个流一起调用。这是 XREAD 的一个关键特性,因为特别是在使用 BLOCK 阻塞时,能够用单个连接监听多个键是一个至关重要的特性。
  • 虽然 XRANGE 返回 ID 范围内的项目,但 XREAD 更适合从大于我们目前所见任何其他条目的第一个条目开始消费流。因此,我们传递给 XREAD 的是,对于每个流,我们从该流接收到的最后一个元素的 ID。

例如,如果我有两个流 mystreamwriters,并且我想从这两个流中读取数据,从它们包含的第一个元素开始,我可以使用以下示例调用 XREAD

注意:我们在示例中使用了 COUNT 选项,这样对于每个流,调用将最多返回两个元素。

> XREAD COUNT 2 STREAMS mystream writers 0-0 0-0
1) 1) "mystream"
   2) 1) 1) 1526984818136-0
         2) 1) "duration"
            2) "1532"
            3) "event-id"
            4) "5"
            5) "user-id"
            6) "7782813"
      2) 1) 1526999352406-0
         2) 1) "duration"
            2) "812"
            3) "event-id"
            4) "9"
            5) "user-id"
            6) "388234"
2) 1) "writers"
   2) 1) 1) 1526985676425-0
         2) 1) "name"
            2) "Virginia"
            3) "surname"
            4) "Woolf"
      2) 1) 1526985685298-0
         2) 1) "name"
            2) "Jane"
            3) "surname"
            4) "Austen"

STREAMS 选项是强制性的,并且必须是最后一个选项,因为此选项采用以下格式的可变长度参数

STREAMS key_1 key_2 key_3 ... key_N ID_1 ID_2 ID_3 ... ID_N

因此,我们首先列出键,然后继续列出所有关联的 ID,这些 ID 代表我们从该流接收到的最后一个 ID,这样调用将只为我们提供该流中更大的 ID。

例如,在上面的示例中,我们为流 mystream 接收到的最后一个项目是 ID 1526999352406-0,而为流 writers 接收到的 ID 是 1526985685298-0

要继续迭代这两个流,我将调用

> XREAD COUNT 2 STREAMS mystream writers 1526999352406-0 1526985685298-0
1) 1) "mystream"
   2) 1) 1) 1526999626221-0
         2) 1) "duration"
            2) "911"
            3) "event-id"
            4) "7"
            5) "user-id"
            6) "9488232"
2) 1) "writers"
   2) 1) 1) 1526985691746-0
         2) 1) "name"
            2) "Toni"
            3) "surname"
            4) "Morrison"
      2) 1) 1526985712947-0
         2) 1) "name"
            2) "Agatha"
            3) "surname"
            4) "Christie"

以此类推。最终,调用将不返回任何项目,而只是一个空数组,这时我们知道我们的流中没有更多可获取的内容(我们必须重试操作,因此此命令也支持阻塞模式)。

不完整的 ID

使用不完整的 ID 是有效的,就像 XRANGE 一样。但在这里,如果 ID 的序列部分缺失,则总是被解释为零,因此命令

> XREAD COUNT 2 STREAMS mystream writers 0 0

完全等同于

> XREAD COUNT 2 STREAMS mystream writers 0-0 0-0

阻塞等待数据

在其同步形式下,只要有更多可用项目,命令就可以获取新数据。然而,在某些时候,我们必须等待数据生产者使用 XADD 将新条目推送到我们正在消费的流中。为了避免以固定或自适应间隔进行轮询,此命令能够在根据指定的流和 ID 无法返回任何数据时进行阻塞,并在请求的键之一接受数据后自动解除阻塞。

重要的是要理解,此命令会扇出给所有等待相同 ID 范围的客户端,因此每个消费者都将获得数据副本,这与使用阻塞列表弹出操作时的情况不同。

为了阻塞,使用 BLOCK 选项,以及我们希望在超时前阻塞的毫秒数。通常 Valkey 阻塞命令的超时时间以秒为单位,但此命令接受毫秒超时,尽管服务器通常的超时分辨率接近 0.1 秒。这次,在某些用例中可以阻塞更短的时间,如果服务器内部随时间改进,超时分辨率可能会提高。

当传递 BLOCK 命令,但至少在一个传递的流中有数据要返回时,命令会同步执行,就像缺少 BLOCK 选项一样

这是一个阻塞调用的示例,其中命令稍后返回一个空回复,因为超时已过而没有新数据到达

> XREAD BLOCK 1000 STREAMS mystream 1526999626221-0
(nil)

特殊的 $ ID。

阻塞时,有时我们只想接收从我们阻塞那一刻起通过 XADD 添加到流中的条目。在这种情况下,我们对已添加条目的历史不感兴趣。对于这种情况,我们必须检查流的顶部元素 ID,并在 XREAD 命令行中使用该 ID。这不简洁,并且需要调用其他命令,因此可以改为使用特殊的 $ ID 来表示我们只想要新的内容。

理解这一点**非常重要**:您应该只在第一次调用 XREAD 时使用 $ ID。之后,ID 应该是流中最后报告的项目的 ID,否则您可能会错过中间添加的所有条目。

这是消费者在第一次迭代中只消费新条目时典型的 XREAD 调用示例

> XREAD BLOCK 5000 COUNT 100 STREAMS mystream $

一旦我们收到一些回复,下一次调用将是这样的

> XREAD BLOCK 5000 COUNT 100 STREAMS mystream 1526999644174-3

以此类推。

多个客户端阻塞在单个流上如何提供服务

列表或有序集合上的阻塞列表操作具有弹出行为。基本上,元素会从列表或有序集合中移除以返回给客户端。在这种情况下,您希望项目以公平的方式被消费,这取决于客户端阻塞在给定键上的时间。通常 Valkey 在这些用例中采用先进先出(FIFO)原则。

然而请注意,对于流来说这不是问题:当客户端得到服务时,流条目不会从流中移除,因此一旦 XADD 命令向流提供数据,每个等待的客户端都将得到服务。

强烈建议阅读流介绍,以便更深入地了解流的整体行为和语义。

RESP2 回复

以下之一

  • 数组回复:一个数组,其中每个元素都是一个由两个元素组成的数组,包含键名和该键报告的条目。报告的条目是完整的流条目,包含 ID 以及所有字段和值的列表。字段和值保证按照 XADD 添加时的相同顺序报告。

  • 空回复:如果给定了 BLOCK 选项并且发生超时,或者没有可以提供服务的流。

RESP3 回复

以下之一

  • 映射回复:一个键值元素映射,其中每个元素由键名和该键报告的条目组成。报告的条目是完整的流条目,包含 ID 以及所有字段和值的列表。字段和值保证按照 XADD 添加时的相同顺序报告。

  • 空值回复:如果给定了 BLOCK 选项并且发生超时,或者没有可以提供服务的流。