- 用法
-
XREAD [ COUNT count ] [ BLOCK milliseconds ] STREAMS key [ key ... ] ID [ ID ... ]
- 始于
- 5.0.0
- ACL 类别
- @stream, @read, @blocking, @slow
- 如果我们想同时从多个键读取数据,此命令可以与多个流一起调用。这是
XREAD
的一个关键特性,因为特别是在使用 BLOCK 阻塞时,能够用单个连接监听多个键是一个至关重要的特性。 - 虽然
XRANGE
返回 ID 范围内的项目,但XREAD
更适合从大于我们目前所见任何其他条目的第一个条目开始消费流。因此,我们传递给XREAD
的是,对于每个流,我们从该流接收到的最后一个元素的 ID。 -
数组回复:一个数组,其中每个元素都是一个由两个元素组成的数组,包含键名和该键报告的条目。报告的条目是完整的流条目,包含 ID 以及所有字段和值的列表。字段和值保证按照
XADD
添加时的相同顺序报告。 -
空回复:如果给定了 BLOCK 选项并且发生超时,或者没有可以提供服务的流。
从一个或多个流中读取数据,只返回 ID 大于调用者报告的最后接收 ID 的条目。此命令可以选择在项目不可用时阻塞,方式类似于 BRPOP
或 BZPOPMIN
等命令。
请注意,在阅读本页之前,如果您是流的新手,我们建议您阅读我们的流介绍。
非阻塞用法
如果不使用 BLOCK 选项,此命令是同步的,可以认为与 XRANGE
有些关联:它将返回流中的一系列项目,但即使我们只考虑同步用法,它与 XRANGE
相比也有两个根本区别
例如,如果我有两个流 mystream
和 writers
,并且我想从这两个流中读取数据,从它们包含的第一个元素开始,我可以使用以下示例调用 XREAD
。
注意:我们在示例中使用了 COUNT 选项,这样对于每个流,调用将最多返回两个元素。
> XREAD COUNT 2 STREAMS mystream writers 0-0 0-0
1) 1) "mystream"
2) 1) 1) 1526984818136-0
2) 1) "duration"
2) "1532"
3) "event-id"
4) "5"
5) "user-id"
6) "7782813"
2) 1) 1526999352406-0
2) 1) "duration"
2) "812"
3) "event-id"
4) "9"
5) "user-id"
6) "388234"
2) 1) "writers"
2) 1) 1) 1526985676425-0
2) 1) "name"
2) "Virginia"
3) "surname"
4) "Woolf"
2) 1) 1526985685298-0
2) 1) "name"
2) "Jane"
3) "surname"
4) "Austen"
STREAMS 选项是强制性的,并且必须是最后一个选项,因为此选项采用以下格式的可变长度参数
STREAMS key_1 key_2 key_3 ... key_N ID_1 ID_2 ID_3 ... ID_N
因此,我们首先列出键,然后继续列出所有关联的 ID,这些 ID 代表我们从该流接收到的最后一个 ID,这样调用将只为我们提供该流中更大的 ID。
例如,在上面的示例中,我们为流 mystream
接收到的最后一个项目是 ID 1526999352406-0
,而为流 writers
接收到的 ID 是 1526985685298-0
。
要继续迭代这两个流,我将调用
> XREAD COUNT 2 STREAMS mystream writers 1526999352406-0 1526985685298-0
1) 1) "mystream"
2) 1) 1) 1526999626221-0
2) 1) "duration"
2) "911"
3) "event-id"
4) "7"
5) "user-id"
6) "9488232"
2) 1) "writers"
2) 1) 1) 1526985691746-0
2) 1) "name"
2) "Toni"
3) "surname"
4) "Morrison"
2) 1) 1526985712947-0
2) 1) "name"
2) "Agatha"
3) "surname"
4) "Christie"
以此类推。最终,调用将不返回任何项目,而只是一个空数组,这时我们知道我们的流中没有更多可获取的内容(我们必须重试操作,因此此命令也支持阻塞模式)。
不完整的 ID
使用不完整的 ID 是有效的,就像 XRANGE
一样。但在这里,如果 ID 的序列部分缺失,则总是被解释为零,因此命令
> XREAD COUNT 2 STREAMS mystream writers 0 0
完全等同于
> XREAD COUNT 2 STREAMS mystream writers 0-0 0-0
阻塞等待数据
在其同步形式下,只要有更多可用项目,命令就可以获取新数据。然而,在某些时候,我们必须等待数据生产者使用 XADD
将新条目推送到我们正在消费的流中。为了避免以固定或自适应间隔进行轮询,此命令能够在根据指定的流和 ID 无法返回任何数据时进行阻塞,并在请求的键之一接受数据后自动解除阻塞。
重要的是要理解,此命令会扇出给所有等待相同 ID 范围的客户端,因此每个消费者都将获得数据副本,这与使用阻塞列表弹出操作时的情况不同。
为了阻塞,使用 BLOCK 选项,以及我们希望在超时前阻塞的毫秒数。通常 Valkey 阻塞命令的超时时间以秒为单位,但此命令接受毫秒超时,尽管服务器通常的超时分辨率接近 0.1 秒。这次,在某些用例中可以阻塞更短的时间,如果服务器内部随时间改进,超时分辨率可能会提高。
当传递 BLOCK 命令,但至少在一个传递的流中有数据要返回时,命令会同步执行,就像缺少 BLOCK 选项一样。
这是一个阻塞调用的示例,其中命令稍后返回一个空回复,因为超时已过而没有新数据到达
> XREAD BLOCK 1000 STREAMS mystream 1526999626221-0
(nil)
特殊的 $
ID。
阻塞时,有时我们只想接收从我们阻塞那一刻起通过 XADD
添加到流中的条目。在这种情况下,我们对已添加条目的历史不感兴趣。对于这种情况,我们必须检查流的顶部元素 ID,并在 XREAD
命令行中使用该 ID。这不简洁,并且需要调用其他命令,因此可以改为使用特殊的 $
ID 来表示我们只想要新的内容。
理解这一点**非常重要**:您应该只在第一次调用 XREAD
时使用 $
ID。之后,ID 应该是流中最后报告的项目的 ID,否则您可能会错过中间添加的所有条目。
这是消费者在第一次迭代中只消费新条目时典型的 XREAD
调用示例
> XREAD BLOCK 5000 COUNT 100 STREAMS mystream $
一旦我们收到一些回复,下一次调用将是这样的
> XREAD BLOCK 5000 COUNT 100 STREAMS mystream 1526999644174-3
以此类推。
多个客户端阻塞在单个流上如何提供服务
列表或有序集合上的阻塞列表操作具有弹出行为。基本上,元素会从列表或有序集合中移除以返回给客户端。在这种情况下,您希望项目以公平的方式被消费,这取决于客户端阻塞在给定键上的时间。通常 Valkey 在这些用例中采用先进先出(FIFO)原则。
然而请注意,对于流来说这不是问题:当客户端得到服务时,流条目不会从流中移除,因此一旦 XADD
命令向流提供数据,每个等待的客户端都将得到服务。
强烈建议阅读流介绍,以便更深入地了解流的整体行为和语义。
RESP2 回复
以下之一
RESP3 回复
以下之一