流是一种数据结构,其作用类似于仅追加日志,但也实现了多项操作来克服典型仅追加日志的一些限制。这些操作包括 O(1) 时间的随机访问和复杂的消费策略,例如消费者组。您可以使用流来记录并同时实时联合事件。流用例的示例包括
- 事件溯源(例如,跟踪用户行为、点击等)
- 传感器监控(例如,现场设备的读数)
- 通知(例如,将每个用户的通知记录存储在单独的流中)
Valkey 为每个流条目生成一个唯一的 ID。您可以使用这些 ID 稍后检索其关联的条目,或者读取和处理流中所有后续条目。请注意,由于这些 ID 与时间相关,此处显示的 ID 可能会有所不同,并将与您在自己的 Valkey 实例中看到的 ID 不同。
流支持多种修剪策略(以防止流无限增长)和不止一种消费策略(参见 XREAD
、XREADGROUP
和 XRANGE
)。
基本命令
XADD
向流中添加一个新条目。XREAD
从给定位置开始向前读取一个或多个条目。XRANGE
返回两个提供的条目 ID 之间的一系列条目。XLEN
返回流的长度。
参见流命令的完整列表。
示例
- 当我们的赛车手通过检查点时,我们为每个赛车手添加一个流条目,其中包括赛车手的姓名、速度、位置和位置 ID
127.0.0.1:6379> XADD race:france * rider Castilla speed 30.2 position 1 location_id 1
"1692632086370-0"
127.0.0.1:6379> XADD race:france * rider Norem speed 28.8 position 3 location_id 1
"1692632094485-0"
127.0.0.1:6379> XADD race:france * rider Prickett speed 29.7 position 2 location_id 1
"1692632102976-0"
- 从 ID
1692632086370-0
开始读取两个流条目
127.0.0.1:6379> XRANGE race:france 1692632086370-0 + COUNT 2
1) 1) "1692632086370-0"
2) 1) "rider"
2) "Castilla"
3) "speed"
4) "30.2"
5) "position"
6) "1"
7) "location_id"
8) "1"
2) 1) "1692632094485-0"
2) 1) "rider"
2) "Norem"
3) "speed"
4) "28.8"
5) "position"
6) "3"
7) "location_id"
8) "1"
- 从流末尾开始读取最多 100 个新流条目,如果没有条目写入,则阻塞最长 300 毫秒
127.0.0.1:6379> XREAD COUNT 100 BLOCK 300 STREAMS race:france $
(nil)
性能
向流中添加条目的时间复杂度是 O(1)。访问任何单个条目的时间复杂度是 O(n),其中 n 是 ID 的长度。由于流 ID 通常很短且长度固定,这实际上可以简化为常数时间查找。有关原因的详细信息,请注意流是作为基数树实现的。
简单来说,流提供了高效的插入和读取。有关详细信息,请参阅每个命令的时间复杂度。
流基础知识
流是仅追加的数据结构。基本写入命令,称为 XADD
,将一个新条目追加到指定的流中。
每个流条目包含一个或多个字段-值对,有点像字典或哈希
127.0.0.1:6379> XADD race:france * rider Castilla speed 29.9 position 1 location_id 2
"1692632147973-0"
上面对 XADD
命令的调用将条目 rider: Castilla, speed: 29.9, position: 1, location_id: 2
添加到键 race:france
处的流中,使用自动生成的条目 ID,即命令返回的 ID,具体是 1692632147973-0
。它将键名 race:france
作为第一个参数,第二个参数是标识流中每个条目的条目 ID。但是,在这种情况下,我们传递了 *
,因为我们希望服务器为我们生成一个新的 ID。每个新的 ID 都将单调递增,所以更简单地说,每个新添加的条目都将拥有比所有过去条目更高的 ID。服务器自动生成 ID 几乎总是您想要的,明确指定 ID 的原因非常罕见。我们稍后将详细讨论这一点。每个流条目都有一个 ID 这一事实是与日志文件的另一个相似之处,日志文件中的行号或文件中字节偏移量可以用来标识给定条目。回到我们的 XADD
示例,在键名和 ID 之后,接下来的参数是构成我们流条目的字段-值对。
只需使用 XLEN
命令即可获取流中的项目数
127.0.0.1:6379> XLEN race:france
(integer) 4
条目 ID
由 XADD
命令返回的、唯一标识给定流中每个条目的条目 ID 由两部分组成
<millisecondsTime>-<sequenceNumber>
毫秒时间部分实际上是生成流 ID 的本地 Valkey 节点中的本地时间,但是如果当前毫秒时间小于前一个条目时间,则使用前一个条目时间代替,因此即使时钟回跳,单调递增的 ID 属性仍然保持不变。序列号用于在同一毫秒内创建的条目。由于序列号是 64 位宽,所以在实践中,同一毫秒内可以生成的条目数量没有限制。
这种 ID 格式乍一看可能很奇怪,读者可能会想为什么时间是 ID 的一部分。原因在于流支持按 ID 的范围查询。由于 ID 与条目生成的时间相关,这使得能够基本免费地查询时间范围。我们很快就会在介绍 XRANGE
命令时看到这一点。
如果由于某种原因用户需要与时间无关但实际与另一个外部系统 ID 相关联的增量 ID,如前所述,XADD
命令可以接受显式 ID 而不是触发自动生成的 *
通配符 ID,如下例所示
127.0.0.1:6379> XADD race:usa 0-1 racer Castilla
0-1
127.0.0.1:6379> XADD race:usa 0-2 racer Norem
0-2
请注意,在这种情况下,最小 ID 是 0-1,并且命令不接受等于或小于先前 ID 的 ID
127.0.0.1:6379> XADD race:usa 0-1 racer Prickett
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item
如果您正在运行 Redis OSS 7 或更高版本,您还可以提供仅包含毫秒部分的显式 ID。在这种情况下,ID 的序列部分将自动生成。要执行此操作,请使用以下语法
127.0.0.1:6379> XADD race:usa 0-* racer Prickett
0-3
从流中获取数据
现在我们终于可以通过 XADD
向流中追加条目了。然而,向流中追加数据是相当明显的,但查询流以提取数据的方式却不那么明显。如果我们继续以日志文件类比,一种显而易见的方法是模仿我们通常使用 Unix 命令 tail -f
所做的事情,即我们可以开始监听以获取追加到流中的新消息。请注意,与 Valkey 的阻塞列表操作不同,在阻塞列表操作中,给定元素将到达单个客户端,该客户端以“弹出式”操作(如 BLPOP
)进行阻塞,而流中我们希望多个消费者能够看到追加到流中的新消息(就像许多 tail -f
进程可以看到添加到日志中的内容一样)。用传统术语来说,我们希望流能够将消息“扇出”到多个客户端。
然而,这只是一种潜在的访问模式。我们也可以用一种截然不同的方式看待流:不是作为一个消息系统,而是一个时间序列存储。在这种情况下,获取新追加的消息可能也很有用,但另一种自然的查询模式是按时间范围获取消息,或者使用游标迭代消息以增量检查所有历史记录。这无疑是另一种有用的访问模式。
最后,如果我们从消费者的角度看流,我们可能希望以另一种方式访问流,即作为可以分配给多个处理这些消息的消费者的消息流,以便消费者组只能看到单个流中到达的消息的子集。通过这种方式,可以跨不同的消费者扩展消息处理,而无需单个消费者处理所有消息:每个消费者将只获得不同的消息来处理。这基本上是 Kafka (TM) 对消费者组所做的事情。通过消费者组读取消息是另一种有趣的从流中读取消息的模式。
流支持通过不同的命令实现上述所有三种查询模式。接下来的部分将展示所有这些命令,从最简单直接的范围查询开始。
按范围查询:XRANGE 和 XREVRANGE
要按范围查询流,我们只需要指定两个 ID,start 和 end。返回的范围将包含 ID 为 start 或 end 的元素,因此该范围是包含的。两个特殊 ID -
和 +
分别表示可能的最小 ID 和最大 ID。
127.0.0.1:6379> XRANGE race:france - +
1) 1) "1692632086370-0"
2) 1) "rider"
2) "Castilla"
3) "speed"
4) "30.2"
5) "position"
6) "1"
7) "location_id"
8) "1"
2) 1) "1692632094485-0"
2) 1) "rider"
2) "Norem"
3) "speed"
4) "28.8"
5) "position"
6) "3"
7) "location_id"
8) "1"
3) 1) "1692632102976-0"
2) 1) "rider"
2) "Prickett"
3) "speed"
4) "29.7"
5) "position"
6) "2"
7) "location_id"
8) "1"
4) 1) "1692632147973-0"
2) 1) "rider"
2) "Castilla"
3) "speed"
4) "29.9"
5) "position"
6) "1"
7) "location_id"
8) "2"
每个返回的条目都是一个包含两个项的数组:ID 和字段-值对列表。我们已经说过,条目 ID 与时间有关,因为 -
字符左侧的部分是创建流条目时本地节点的 Unix 时间(毫秒)(但是请注意,流是使用完全指定的 XADD
命令复制的,因此副本将具有与主节点相同的 ID)。这意味着我可以使用 XRANGE
查询一个时间范围。但是,为了做到这一点,我可能需要省略 ID 的序列部分:如果省略,在范围的开始处,它将假定为 0,而在结束处,它将假定为可用的最大序列号。通过这种方式,仅使用两个毫秒级的 Unix 时间进行查询,我们可以以包含的方式获取在该时间范围内生成的所有条目。例如,如果我想查询一个两毫秒的周期,我可以使用
127.0.0.1:6379> XRANGE race:france 1692632086369 1692632086371
1) 1) "1692632086370-0"
2) 1) "rider"
2) "Castilla"
3) "speed"
4) "30.2"
5) "position"
6) "1"
7) "location_id"
8) "1"
在此范围内我只有一个条目。然而在真实数据集中,我可能会查询数小时的范围,或者在短短两毫秒内可能有许多项目,返回的结果可能非常大。因此,XRANGE
在末尾支持一个可选的 COUNT 选项。通过指定一个计数,我只能获取前 N 个项目。如果我想要更多,我可以获取返回的最后一个 ID,将其序列部分加一,然后再次查询。让我们在下面的示例中看看。假设流 race:france
已填充了 4 个项目。要开始我的迭代,每次命令获取 2 个项目,我从完整的范围开始,但计数为 2。
127.0.0.1:6379> XRANGE race:france - + COUNT 2
1) 1) "1692632086370-0"
2) 1) "rider"
2) "Castilla"
3) "speed"
4) "30.2"
5) "position"
6) "1"
7) "location_id"
8) "1"
2) 1) "1692632094485-0"
2) 1) "rider"
2) "Norem"
3) "speed"
4) "28.8"
5) "position"
6) "3"
7) "location_id"
8) "1"
要继续迭代接下来的两个项目,我必须选择返回的最后一个 ID,即 1692632094485-0
,并在其前面加上 (
。由此产生的排他范围区间,在本例中为 (1692632094485-0
,现在可以用作下一次 XRANGE
调用的新 start 参数。
127.0.0.1:6379> XRANGE race:france (1692632094485-0 + COUNT 2
1) 1) "1692632102976-0"
2) 1) "rider"
2) "Prickett"
3) "speed"
4) "29.7"
5) "position"
6) "2"
7) "location_id"
8) "1"
2) 1) "1692632147973-0"
2) 1) "rider"
2) "Castilla"
3) "speed"
4) "29.9"
5) "position"
6) "1"
7) "location_id"
8) "2"
现在我们已经从一个只有 4 个条目的流中检索了 4 个条目,如果我们尝试检索更多条目,我们将得到一个空数组
127.0.0.1:6379> XRANGE race:france (1692632147973-0 + COUNT 2
(empty array)
由于 XRANGE
的复杂度在查找时是 O(log(N)),然后返回 M 个元素是 O(M),所以当计数较小时,该命令具有对数时间复杂度,这意味着迭代的每一步都很快。因此 XRANGE
也是事实上的流迭代器,并且不需要 XSCAN 命令。
命令 XREVRANGE
等同于 XRANGE
,但以相反顺序返回元素,因此 XREVRANGE
的一个实际用途是检查流中的最后一个项目是什么
127.0.0.1:6379> XREVRANGE race:france + - COUNT 1
1) 1) "1692632147973-0"
2) 1) "rider"
2) "Castilla"
3) "speed"
4) "29.9"
5) "position"
6) "1"
7) "location_id"
8) "2"
请注意,XREVRANGE
命令的 start 和 stop 参数的顺序是相反的。
使用 XREAD 监听新项目
当我们不想通过流中的范围访问项目时,通常我们想做的是订阅流中到达的新项目。这个概念可能与 Valkey Pub/Sub 相关,在那里您订阅一个通道,或者与 Valkey 阻塞列表相关,在那里您等待一个键获取新元素以获取,但是消费流的方式存在根本区别
- 一个流可以有多个客户端(消费者)等待数据。默认情况下,每个新项目都将传递给等待给定流中数据的每个消费者。这种行为与阻塞列表不同,在阻塞列表中,每个消费者将获得一个不同的元素。然而,扇出到多个消费者的能力类似于 Pub/Sub。
- 虽然在 Pub/Sub 中,消息是“即发即弃”且从不存储,并且在使用阻塞列表时,当客户端收到消息时,它会从列表中被“弹出”(实际移除),但流的工作方式从根本上不同。所有消息都无限期地追加到流中(除非用户明确要求删除条目):不同的消费者会通过记住收到的最后一条消息的 ID 来了解从其角度来看什么是新消息。
- 流消费者组提供了 Pub/Sub 或阻塞列表无法实现的控制级别,具有同一流的不同组,已处理项目的明确确认,检查待处理项目的能力,未处理消息的认领,以及每个客户端连贯的历史可见性,即只能看到其私有的过去消息历史。
提供监听流中新消息能力的命令称为 XREAD
。它比 XRANGE
稍微复杂一些,所以我们将从简单的形式开始展示,稍后将提供完整的命令布局。
127.0.0.1:6379> XREAD COUNT 2 STREAMS race:france 0
1) 1) "race:france"
2) 1) 1) "1692632086370-0"
2) 1) "rider"
2) "Castilla"
3) "speed"
4) "30.2"
5) "position"
6) "1"
7) "location_id"
8) "1"
2) 1) "1692632094485-0"
2) 1) "rider"
2) "Norem"
3) "speed"
4) "28.8"
5) "position"
6) "3"
7) "location_id"
8) "1"
以上是 XREAD
的非阻塞形式。请注意,COUNT 选项不是强制性的,实际上,命令唯一强制性的选项是 STREAMS 选项,它指定了一个键列表以及调用消费者为每个流已经看到的最大 ID,以便该命令只向客户端提供 ID 大于我们指定的 ID 的消息。
在上面的命令中,我们写了 STREAMS race:france 0
,所以我们想要流 race:france
中所有 ID 大于 0-0
的消息。正如你在上面的例子中看到的,命令返回键名,因为实际上可以用多个键调用此命令,以便同时从不同的流中读取。例如,我可以写:STREAMS race:france race:italy 0 0
。请注意,在 STREAMS 选项之后,我们需要提供键名,然后是 ID。因此,STREAMS 选项必须始终是最后一个选项。任何其他选项必须在 STREAMS 选项之前。
除了 XREAD
可以一次访问多个流,并且我们能够指定我们拥有的最后一个 ID 以只获取新消息之外,在这种简单形式下,该命令与 XRANGE
的功能并没有太大差异。然而,有趣的部分是我们可以通过指定 BLOCK 参数轻松地将 XREAD
转换为阻塞命令
> XREAD BLOCK 0 STREAMS race:france $
请注意,在上面的示例中,除了删除 COUNT 之外,我还指定了新的 BLOCK 选项,超时时间为 0 毫秒(这意味着永不超时)。此外,我没有为流 race:france
传递正常的 ID,而是传递了特殊 ID $
。这个特殊 ID 意味着 XREAD
应该将流 race:france
中已存储的最大 ID 作为最后一个 ID,这样我们将只接收新消息,从我们开始监听的时间开始。这在某种程度上类似于 Unix 命令 tail -f
。
请注意,使用 BLOCK 选项时,我们不必使用特殊 ID $
。我们可以使用任何有效 ID。如果命令能够立即处理我们的请求而无需阻塞,它就会这样做,否则它将阻塞。通常,如果我们想从新条目开始消费流,我们从 ID $
开始,之后我们继续使用收到的最后一条消息的 ID 进行下一次调用,依此类推。
XREAD
的阻塞形式也能够监听多个流,只需指定多个键名即可。如果请求能够同步处理,因为至少有一个流的元素大于我们指定的相应 ID,则它将返回结果。否则,命令将阻塞并返回第一个获得新数据(根据指定的 ID)的流的项目。
与阻塞列表操作类似,阻塞流读取从等待数据的客户端的角度来看是公平的,因为语义是 FIFO(先进先出)式的。第一个为给定流阻塞的客户端将是当新项目可用时第一个被解除阻塞的客户端。
XREAD
除了 COUNT 和 BLOCK 之外没有其他选项,因此它是一个相当基本的命令,其特定目的是将消费者连接到一个或多个流。使用消费者组 API 可以获得更强大的流消费功能,然而,通过消费者组读取是通过一个名为 XREADGROUP
的不同命令实现的,本指南的下一节将介绍该命令。
消费者组
当手头的任务是从不同的客户端消费同一个流时,XREAD
已经提供了一种“扇出”到 N 个客户端的方式,甚至可能利用副本以提供更多的读取可伸缩性。然而,在某些问题中,我们想做的不是向许多客户端提供相同的消息流,而是从同一个流中向许多客户端提供不同的消息子集。一个明显有用例是处理缓慢的消息:拥有 N 个不同的工作器接收流的不同部分的能力使我们能够扩展消息处理,通过将不同的消息路由到准备好进行更多工作的不同工作器。
实际操作中,如果我们想象有三个消费者 C1、C2、C3,以及一个包含消息 1、2、3、4、5、6、7 的流,那么我们希望按照下图所示的方式提供消息
1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1
为了实现这一点,Valkey 使用了一种称为消费者组的概念。理解 Valkey 消费者组在实现层面上与 Kafka (TM) 消费者组无关是非常重要的。然而,它们的功能相似,所以我决定保留 Kafka (TM) 的术语,因为它最初普及了这一思想。
消费者组就像一个从流中获取数据的伪消费者,实际上为多个消费者提供服务,并提供某些保证
- 每条消息都提供给不同的消费者,因此不可能将同一条消息传递给多个消费者。
- 在消费者组内,消费者通过名称进行标识,这是一个区分大小写的字符串,由实现消费者的客户端选择。这意味着即使在断开连接后,流消费者组也会保留所有状态,因为客户端会再次声称是同一个消费者。但是,这也意味着客户端有责任提供唯一的标识符。
- 每个消费者组都有从未消费过的第一个 ID的概念,因此,当消费者请求新消息时,它只能提供以前未传递的消息。
- 然而,消费一条消息需要使用特定命令进行显式确认。Valkey 将确认解释为:此消息已正确处理,因此可以从消费者组中驱逐。
- 消费者组会跟踪所有当前待处理的消息,即已传递给消费者组中某个消费者但尚未确认为已处理的消息。由于此功能,当访问流的消息历史记录时,每个消费者将只看到已传递给它的消息。
在某种程度上,消费者组可以被想象成关于流的某种状态量
+----------------------------------------+
| consumer_group_name: mygroup |
| consumer_group_stream: somekey |
| last_delivered_id: 1292309234234-92 |
| |
| consumers: |
| "consumer-1" with pending messages |
| 1292309234234-4 |
| 1292309234232-8 |
| "consumer-42" with pending messages |
| ... (and so forth) |
+----------------------------------------+
如果你从这个角度来看,就很容易理解消费者组能做什么,它如何能够只向消费者提供他们的待处理消息历史,以及消费者请求新消息时将只获得大于 last_delivered_id
的消息 ID。同时,如果你将消费者组视为流的辅助数据结构,那么一个流可以有多个消费者组,它们拥有不同的消费者集,这是显而易见的。实际上,同一个流甚至可以有客户端通过 XREAD
而不使用消费者组进行读取,以及客户端通过 XREADGROUP
在不同的消费者组中进行读取。
现在是时候深入了解消费者组的基本命令了。它们如下
XGROUP
用于创建、销毁和管理消费者组。XREADGROUP
用于通过消费者组从流中读取。XACK
是允许消费者将待处理消息标记为已正确处理的命令。
创建消费者组
假设我有一个名为 race:france
的流类型键已经存在,为了创建一个消费者组,我只需要执行以下操作
127.0.0.1:6379> XGROUP CREATE race:france france_riders $
OK
正如您在上面的命令中看到的,在创建消费者组时,我们必须指定一个 ID,在此示例中仅为 $
。这是必需的,因为消费者组(除其他状态外)必须知道在第一个消费者连接时要提供哪条消息,也就是说,当组刚创建时,最后一条消息 ID 是什么。如果我们像我们一样提供 $
,那么从现在起流中到达的新消息将只提供给组中的消费者。如果我们指定 0
,则消费者组将从流历史中的所有消息开始消费。当然,您可以指定任何其他有效的 ID。您知道的是,消费者组将开始传递大于您指定的 ID 的消息。因为 $
意味着流中当前最大的 ID,所以指定 $
将产生只消费新消息的效果。
如果流不存在,XGROUP CREATE
还支持使用可选的 MKSTREAM
子命令作为最后一个参数来自动创建流
127.0.0.1:6379> XGROUP CREATE race:italy italy_riders $ MKSTREAM
OK
现在消费者组已创建,我们可以立即尝试使用 XREADGROUP
命令通过消费者组读取消息。我们将从名为 Alice 和 Bob 的消费者那里读取,看看系统将如何向 Alice 或 Bob 返回不同的消息。
XREADGROUP
与 XREAD
非常相似,并提供相同的 BLOCK 选项,否则它是一个同步命令。但是,有一个强制性选项必须始终指定,即 GROUP,它有两个参数:消费者组的名称和尝试读取的消费者的名称。COUNT 选项也受支持,并且与 XREAD
中的选项相同。
我们将向 race:italy
流添加车手并尝试使用消费者组读取内容:注意:这里的 rider 是字段名,name 是关联的值。请记住,流项目是小型字典。
127.0.0.1:6379> XADD race:italy * rider Castilla
"1692632639151-0"
127.0.0.1:6379> XADD race:italy * rider Royce
"1692632647899-0"
127.0.0.1:6379> XADD race:italy * rider Sam-Bodden
"1692632662819-0"
127.0.0.1:6379> XADD race:italy * rider Prickett
"1692632670501-0"
127.0.0.1:6379> XADD race:italy * rider Norem
"1692632678249-0"
127.0.0.1:6379> XREADGROUP GROUP italy_riders Alice COUNT 1 STREAMS race:italy >
1) 1) "race:italy"
2) 1) 1) "1692632639151-0"
2) 1) "rider"
2) "Castilla"
XREADGROUP
的回复就像 XREAD
的回复一样。但是请注意上面提供的 GROUP <group-name> <consumer-name>
。它声明我希望使用消费者组 mygroup
从流中读取,并且我是消费者 Alice
。每次消费者使用消费者组执行操作时,它都必须指定其名称,唯一地标识此消费者在组内。
上面的命令行中还有一个非常重要的细节,在强制性的 STREAMS 选项之后,为键 race:italy
请求的 ID 是特殊 ID >
。这个特殊 ID 仅在消费者组的上下文中有效,它表示:迄今为止从未传递给其他消费者的消息。
这几乎总是您想要的,但是也可以指定一个真实的 ID,例如 0
或任何其他有效 ID,在这种情况下,我们从 XREADGROUP
请求只向我们提供待处理消息的历史记录,并且在这种情况下,将永远不会在组中看到新消息。所以基本上 XREADGROUP
根据我们指定的 ID 具有以下行为
- 如果 ID 是特殊 ID
>
,则命令将只返回迄今为止从未传递给其他消费者的新消息,并且作为副作用,将更新消费者组的last ID。 - 如果 ID 是任何其他有效的数字 ID,那么命令将允许我们访问我们的待处理消息历史记录。也就是说,传递给此指定消费者(由提供的名称标识)且迄今为止从未通过
XACK
确认的消息集。
我们可以立即通过指定 ID 为 0 来测试此行为,不带任何 COUNT 选项:我们将只看到唯一待处理的消息,即关于 Castilla 的消息
127.0.0.1:6379> XREADGROUP GROUP italy_riders Alice STREAMS race:italy 0
1) 1) "race:italy"
2) 1) 1) "1692632639151-0"
2) 1) "rider"
2) "Castilla"
然而,如果我们确认消息已处理,它将不再是待处理消息历史的一部分,因此系统将不再报告任何内容。
127.0.0.1:6379> XACK race:italy italy_riders 1692632639151-0
(integer) 1
127.0.0.1:6379> XREADGROUP GROUP italy_riders Alice STREAMS race:italy 0
1) 1) "race:italy"
2) (empty array)
别担心您还不知道 XACK
的工作原理,这里的想法是已处理的消息不再是我们可以访问的历史记录的一部分。
现在轮到 Bob 来读点东西了
127.0.0.1:6379> XREADGROUP GROUP italy_riders Bob COUNT 2 STREAMS race:italy >
1) 1) "race:italy"
2) 1) 1) "1692632647899-0"
2) 1) "rider"
2) "Royce"
2) 1) "1692632662819-0"
2) 1) "rider"
2) "Sam-Bodden"
Bob 最多请求了两条消息,并且通过相同的组 mygroup
进行读取。因此,Valkey 仅报告新消息。正如您所看到的,“Castilla”消息没有被传递,因为它已经被传递给 Alice,所以 Bob 得到了 Royce 和 Sam-Bodden 等。
通过这种方式,Alice、Bob 和组中的任何其他消费者都能够从同一流中读取不同的消息,读取他们尚未处理的消息的历史,或将消息标记为已处理。这允许创建不同的拓扑和语义来从流中消费消息。
有几点需要记住
- 消费者在第一次被提及时会自动创建,无需显式创建。
- 即使使用
XREADGROUP
,您也可以同时从多个键读取,但要实现这一点,您需要在每个流中创建同名的消费者组。这不是一个常见的需求,但值得一提的是,该功能在技术上是可用的。 XREADGROUP
是一个写入命令,因为即使它从流中读取,消费者组也会因读取而作为副作用被修改,因此只能在主实例上调用。
一个使用消费者组实现的消费者示例,用 Ruby 语言编写,如下所示。即使不了解 Ruby,这段 Ruby 代码也旨在让任何有经验的程序员都能读懂
require 'redis'
if ARGV.length == 0
puts "Please specify a consumer name"
exit 1
end
ConsumerName = ARGV[0]
GroupName = "mygroup"
r = Redis.new
def process_message(id,msg)
puts "[#{ConsumerName}] #{id} = #{msg.inspect]: "
end
$lastid = '0-0'
puts "Consumer #{ConsumerName} starting..."
check_backlog = true
while true
# Pick the ID based on the iteration: the first time we want to
# read our pending messages, in case we crashed and are recovering.
# Once we consumed our history, we can start getting new messages.
if check_backlog
myid = $lastid
else
myid = '>'
end
items = r.xreadgroup('GROUP',GroupName,ConsumerName,'BLOCK','2000','COUNT','10','STREAMS',:my_stream_key,myid)
if items == nil
puts "Timeout!"
next
end
# If we receive an empty reply, it means we were consuming our history
# and that the history is now empty. Let's start to consume new messages.
check_backlog = false if items[0][1].length == 0
items[0][1].each{|i|
id,fields = i
# Process the message
process_message(id,fields)
# Acknowledge the message as processed
r.xack(:my_stream_key,GroupName,id)
$lastid = id
}
end
正如你所看到的,这里的想法是首先消费历史记录,也就是我们的待处理消息列表。这很有用,因为消费者可能之前崩溃了,所以在重新启动时,我们希望重新读取已传递给我们但未被确认的消息。请注意,我们可能会多次或一次处理一条消息(至少在消费者故障的情况下是这样,但也涉及 Valkey 持久性和复制的限制,请参阅关于此主题的特定部分)。
一旦历史记录被消费,并且我们得到一个空的消息列表,我们就可以切换到使用 >
特殊 ID 来消费新消息。
从永久故障中恢复
上面的示例允许我们编写参与同一消费者组的消费者,每个消费者处理一部分消息,并在从故障中恢复时重新读取仅传递给它们本身的待处理消息。然而,在现实世界中,消费者可能会永久性失败,并且永不恢复。在消费者因任何原因停止后,那些永不恢复的消费者的待处理消息会发生什么?
Valkey 消费者组在这种情况下提供了一个功能,用于认领给定消费者的待处理消息,以便这些消息将更改所有权并重新分配给不同的消费者。该功能非常明确。消费者必须检查待处理消息列表,并且必须使用特殊命令认领特定消息,否则服务器将使消息永远待处理并分配给旧消费者。通过这种方式,不同的应用程序可以选择是否使用此功能以及如何精确使用它。
此过程的第一步只是一个命令,它提供了消费者组中待处理条目的可观察性,名为 XPENDING
。这是一个只读命令,调用它始终是安全的,并且不会更改任何消息的所有权。以最简单的形式调用时,该命令接受两个参数,即流的名称和消费者组的名称。
127.0.0.1:6379> XPENDING race:italy italy_riders
1) (integer) 2
2) "1692632647899-0"
3) "1692632662819-0"
4) 1) 1) "Bob"
2) "2"
以这种方式调用时,命令会输出消费者组中待处理消息的总数(本例中为两条),待处理消息中的最低和最高消息 ID,最后是一个消费者列表以及它们各自拥有的待处理消息数量。我们只有 Bob 有两条待处理消息,因为 Alice 请求的那条消息已经使用 XACK
确认。
我们可以通过向 XPENDING
提供更多参数来请求更多信息,因为完整的命令签名如下
XPENDING <key> <groupname> [[IDLE <min-idle-time>] <start-id> <end-id> <count> [<consumer-name>]]
通过提供开始和结束 ID(可以是 XRANGE
中的 -
和 +
)以及一个计数来控制命令返回的信息量,我们可以了解更多关于待处理消息的信息。可选的最后一个参数,消费者名称,用于如果我们要将输出限制为仅针对给定消费者待处理的消息,但在下面的示例中不会使用此功能。
127.0.0.1:6379> XPENDING race:italy italy_riders - + 10
1) 1) "1692632647899-0"
2) "Bob"
3) (integer) 74642
4) (integer) 1
2) 1) "1692632662819-0"
2) "Bob"
3) (integer) 74642
4) (integer) 1
现在我们有了每条消息的详细信息:ID、消费者名称、以毫秒为单位的空闲时间(即自上次将消息传递给某个消费者以来已过去多少毫秒),最后是给定消息被传递的次数。Bob 有两条消息,它们空闲了 60000+ 毫秒,大约一分钟。
请注意,没有人阻止我们通过简单地使用 XRANGE
来检查第一条消息的内容。
127.0.0.1:6379> XRANGE race:italy 1692632647899-0 1692632647899-0
1) 1) "1692632647899-0"
2) 1) "rider"
2) "Royce"
我们只需在参数中重复相同的 ID 两次。现在我们有了些想法,Alice 可能决定在消息处理停滞 1 分钟后,Bob 可能不会很快恢复,是时候认领这些消息并代替 Bob 恢复处理了。为此,我们使用 XCLAIM
命令。
这个命令在完整形式下非常复杂且选项众多,因为它用于复制消费者组的更改,但我们只使用通常需要的参数。在这种情况下,它很简单,如下所示
XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2> ... <ID-N>
基本上我们是说,对于这个特定的键和组,我希望指定的邮件 ID 更改所有权,并被分配给指定的消费者名称 <consumer>
。但是,我们还提供了一个最小空闲时间,这样操作只有在所述邮件的空闲时间大于指定的空闲时间时才有效。这很有用,因为可能有两个客户端同时尝试认领一个邮件
Client 1: XCLAIM race:italy italy_riders Alice 60000 1692632647899-0
Client 2: XCLAIM race:italy italy_riders Lora 60000 1692632647899-0
然而,作为副作用,认领消息会重置其空闲时间并增加其交付计数器,因此第二个客户端将无法认领它。通过这种方式,我们避免了消息的重复处理(即使在一般情况下,您无法获得精确一次处理)。
这是命令执行的结果
127.0.0.1:6379> XCLAIM race:italy italy_riders Alice 60000 1692632647899-0
1) 1) "1692632647899-0"
2) 1) "rider"
2) "Royce"
该消息已成功被 Alice 认领,她现在可以处理该消息并确认,即使原始消费者未恢复,也能继续推进。
从上面的例子中可以清楚地看出,作为成功认领给定消息的副作用,XCLAIM
命令也返回了该消息。然而,这不是强制性的。可以使用 JUSTID 选项,以便只返回成功认领的消息的 ID。如果您想减少客户端和服务器之间使用的带宽(以及命令的性能),并且您对消息不感兴趣,因为您的消费者是以定期重新扫描待处理消息历史的方式实现的,那么这很有用。
认领也可以由一个独立的进程实现:它只检查待处理消息列表,并将空闲消息分配给看起来活跃的消费者。活跃消费者可以通过流的其中一个可观察性功能获得。这是下一节的主题。
自动认领
XAUTOCLAIM
命令(在 Redis OSS 6.2 中添加)实现了我们上面描述的认领过程。XPENDING
和 XCLAIM
为不同类型的恢复机制提供了基本的构建块。此命令通过让 Valkey 管理认领过程来优化通用过程,并为大多数恢复需求提供了简单的解决方案。
XAUTOCLAIM
识别空闲的待处理消息并将其所有权转移给消费者。该命令的签名如下
XAUTOCLAIM <key> <group> <consumer> <min-idle-time> <start> [COUNT count] [JUSTID]
因此,在上面的示例中,我可以使用自动认领来认领一条消息,如下所示
127.0.0.1:6379> XAUTOCLAIM race:italy italy_riders Alice 60000 0-0 COUNT 1
1) "0-0"
2) 1) 1) "1692632662819-0"
2) 1) "rider"
2) "Sam-Bodden"
像 XCLAIM
一样,该命令返回一个已认领消息的数组,但它也返回一个流 ID,允许迭代待处理条目。流 ID 是一个游标,我可以在下次调用中使用它来继续认领空闲的待处理消息。
127.0.0.1:6379> XAUTOCLAIM race:italy italy_riders Lora 60000 (1692632662819-0 COUNT 1
1) "1692632662819-0"
2) 1) 1) "1692632647899-0"
2) 1) "rider"
2) "Royce"
当 XAUTOCLAIM
返回“0-0”流 ID 作为游标时,这意味着它已到达消费者组待处理条目列表的末尾。这并不意味着没有新的空闲待处理消息,因此该过程通过从流的开头调用 XAUTOCLAIM
来继续。
认领和交付计数器
您在 XPENDING
输出中观察到的计数器是每条消息的交付次数。计数器以两种方式递增:当消息通过 XCLAIM
成功认领时,或者当使用 XREADGROUP
调用访问待处理消息的历史记录时。
发生故障时,消息多次传递是正常的,但最终它们通常会被处理并确认。但是,处理某些特定消息可能会出现问题,因为它已损坏或以某种方式触发了处理代码中的错误。在这种情况下,消费者将持续无法处理此特定消息。由于我们有交付尝试计数器,我们可以使用该计数器来检测由于某种原因无法处理的消息。因此,一旦交付计数器达到您选择的某个较大数字,更明智的做法可能是将此类消息放入另一个流中并向系统管理员发送通知。这基本上是流实现死信概念的方式。
流可观测性
缺乏可观测性的消息系统很难使用。不知道谁在消费消息,哪些消息待处理,给定流中活跃的消费者组集合,这使得一切都变得不透明。因此,流和消费者组有不同的方法来观察正在发生的事情。我们已经介绍了 XPENDING
,它允许我们检查给定时刻正在处理的消息列表,以及它们的空闲时间和交付次数。
然而,我们可能希望做得更多,XINFO
命令是一个可观测性接口,可以通过子命令获取有关流或消费者组的信息。
此命令使用子命令来显示有关流及其消费者组状态的不同信息。例如,XINFO STREAM
127.0.0.1:6379> XINFO STREAM race:italy
1) "length"
2) (integer) 5
3) "radix-tree-keys"
4) (integer) 1
5) "radix-tree-nodes"
6) (integer) 2
7) "last-generated-id"
8) "1692632678249-0"
9) "groups"
10) (integer) 1
11) "first-entry"
12) 1) "1692632639151-0"
2) 1) "rider"
2) "Castilla"
13) "last-entry"
14) 1) "1692632678249-0"
2) 1) "rider"
2) "Norem"
输出显示了流在内部是如何编码的信息,还显示了流中的第一条和最后一条消息。另一个可用的信息是与此流关联的消费者组的数量。我们可以进一步深入,请求更多关于消费者组的信息。
127.0.0.1:6379> XINFO GROUPS race:italy
1) 1) "name"
2) "italy_riders"
3) "consumers"
4) (integer) 3
5) "pending"
6) (integer) 2
7) "last-delivered-id"
8) "1692632662819-0"
正如您在此输出和之前的输出中看到的,XINFO
命令输出一系列字段-值对。因为它是一个可观测性命令,这使得人类用户能够立即理解报告的信息,并且允许命令将来通过添加更多字段来报告更多信息,而不会破坏与旧客户端的兼容性。其他必须更注重带宽效率的命令,如 XPENDING
,只报告信息而不带字段名。
上面示例的输出,其中使用了 GROUPS 子命令,通过观察字段名称应该很清楚。我们可以通过检查组中注册的消费者来更详细地检查特定消费者组的状态。
127.0.0.1:6379> XINFO CONSUMERS race:italy italy_riders
1) 1) "name"
2) "Alice"
3) "pending"
4) (integer) 1
5) "idle"
6) (integer) 177546
2) 1) "name"
2) "Bob"
3) "pending"
4) (integer) 0
5) "idle"
6) (integer) 424686
3) 1) "name"
2) "Lora"
3) "pending"
4) (integer) 1
5) "idle"
6) (integer) 72241
如果您不记得命令的语法,只需向命令本身请求帮助
> XINFO HELP
1) XINFO <subcommand> [<arg> [value] [opt] ...]. Subcommands are:
2) CONSUMERS <key> <groupname>
3) Show consumers of <groupname>.
4) GROUPS <key>
5) Show the stream consumer groups.
6) STREAM <key> [FULL [COUNT <count>]
7) Show information about the stream.
8) HELP
9) Prints this help.
与 Kafka (TM) 分区的区别
流中的消费者组在某种程度上可能类似于 Kafka (TM) 基于分区的消费者组,但请注意,流在实际中非常不同。分区只是逻辑的,消息只放在一个 Valkey 键中,因此不同客户端的服务方式是基于谁准备好处理新消息,而不是客户端从哪个分区读取。例如,如果消费者 C3 在某个时候永久失败,Valkey 将继续向 C1 和 C2 提供所有新到达的消息,就好像现在只有两个逻辑分区一样。
同样,如果某个消费者处理消息的速度比其他消费者快得多,那么该消费者将在相同的时间单位内接收到按比例更多的消息。这之所以可能,是因为 Valkey 会明确跟踪所有未确认的消息,并记住谁收到了哪条消息以及从未传递给任何消费者的第一条消息的 ID。
然而,这也意味着在 Valkey 中,如果你真的想将同一流中的消息分区到多个 Valkey 实例,你必须使用多个键和一些分片系统,例如 Valkey 集群或其他应用程序特定的分片系统。单个流不会自动分区到多个实例。
我们可以说,示意性地,以下是正确的
- 如果您使用 1 个流 -> 1 个消费者,您正在按顺序处理消息。
- 如果您使用 N 个流和 N 个消费者,以便只有一个给定消费者命中 N 个流的一个子集,您可以扩展上述 1 个流 -> 1 个消费者的模型。
- 如果您使用 1 个流 -> N 个消费者,您正在向 N 个消费者进行负载均衡,但在这种情况下,关于相同逻辑项目的消息可能会乱序消费,因为给定消费者处理消息 3 的速度可能比另一个消费者处理消息 4 的速度快。
因此,基本上 Kafka 分区更类似于使用 N 个不同的 Valkey 键,而 Valkey 消费者组是从给定流到 N 个不同消费者的服务器端负载均衡系统。
有上限的流
许多应用程序不希望永远将数据收集到流中。有时,流中最大项目数给定值很有用,而有时一旦达到给定大小,将数据从 Valkey 移动到非内存且速度不快但适合存储历史记录(可能长达数十年)的存储系统就很有用。流对此提供了一些支持。其中一个是 XADD
命令的 MAXLEN 选项。此选项使用起来非常简单
127.0.0.1:6379> XADD race:italy MAXLEN 2 * rider Jones
"1692633189161-0"
127.0.0.1:6379> XADD race:italy MAXLEN 2 * rider Wood
"1692633198206-0"
127.0.0.1:6379> XADD race:italy MAXLEN 2 * rider Henshaw
"1692633208557-0"
127.0.0.1:6379> XLEN race:italy
(integer) 2
127.0.0.1:6379> XRANGE race:italy - +
1) 1) "1692633198206-0"
2) 1) "rider"
2) "Wood"
2) 1) "1692633208557-0"
2) 1) "rider"
2) "Henshaw"
使用 MAXLEN,当达到指定长度时,旧条目会自动被逐出,从而使流保持恒定大小。目前没有选项可以告诉流只保留不早于给定时间段的项目,因为这样的命令为了持续运行,可能会长时间阻塞以逐出项目。例如,想象一下如果出现插入峰值,然后长时间暂停,然后再次插入,所有这些都具有相同的最大时间,会发生什么。流将阻塞以逐出在暂停期间变得太旧的数据。因此,用户需要进行一些规划并了解所需的流最大长度。此外,虽然流的长度与使用的内存成正比,但按时间修剪更难控制和预测:它取决于插入速率,而插入速率通常随时间变化(当它不变时,按大小修剪是微不足道的)。
然而,使用 MAXLEN 进行修剪可能会很昂贵:流由基数树中的宏节点表示,以实现非常高的内存效率。更改单个宏节点(由几十个元素组成)并不是最优的。因此,可以使用以下特殊形式的命令
XADD race:italy MAXLEN ~ 1000 * ... entry fields here ...
MAXLEN 选项和实际计数之间的 ~
参数表示,我不需要精确的 1000 个项目。它可以是 1000、1010 或 1030,只要确保至少保留 1000 个项目即可。有了这个参数,修剪操作仅在我们可以移除整个节点时执行。这使得效率大大提高,而且通常这就是您想要的。您会注意到,客户端库有各种实现。例如,Python 客户端默认使用近似值,并且必须明确设置为精确长度。
还有一个 XTRIM
命令,它执行与上面 MAXLEN 选项非常相似的操作,只不过它可以单独运行
127.0.0.1:6379> XTRIM race:italy MAXLEN 10
(integer) 0
或者,如同 XADD
选项一样
127.0.0.1:6379> XTRIM race:italy MAXLEN ~ 10
(integer) 0
然而,XTRIM
旨在接受不同的修剪策略。另一种修剪策略是 MINID,它会逐出 ID 低于指定 ID 的条目。
由于 XTRIM
是一个显式命令,因此用户需要了解不同修剪策略可能存在的缺点。
未来可能添加到 XTRIM
的另一个有用的驱逐策略是按 ID 范围删除,以简化 XRANGE
和 XTRIM
的使用,以便在需要时将数据从 Valkey 移动到其他存储系统。
流 API 中的特殊 ID
您可能已经注意到,Valkey API 中有几个可以使用的特殊 ID。这里有一个简短的回顾,以便它们将来更有意义。
前两个特殊 ID 是 -
和 +
,用于 XRANGE
命令的范围查询。这两个 ID 分别表示可能的最小 ID(即基本上是 0-1
)和可能的最大 ID(即 18446744073709551615-18446744073709551615
)。正如您所看到的,写 -
和 +
比写那些数字要简洁得多。
然后,有些 API 中我们希望表示流中具有最大 ID 的项目的 ID。这就是 $
的含义。因此,例如,如果我只想用 XREADGROUP
获取新条目,我使用此 ID 来表示我已拥有所有现有条目,但没有将来插入的新条目。同样,当我创建或设置消费者组的 ID 时,我可以将最后交付的项目设置为 $
,以便只向组中的消费者交付新条目。
正如您所看到的,$
并不意味着 +
,它们是两个不同的东西,因为 +
是所有可能流中最大的 ID,而 $
是包含给定条目的给定流中最大的 ID。此外,API 通常只理解 +
或 $
,但避免一个符号具有多重含义仍然很有用。
另一个特殊 ID 是 >
,它仅与消费者组相关,并且仅在使用 XREADGROUP
命令时有效。此特殊 ID 意味着我们只希望从未传递给其他消费者的条目。因此,基本上,>
ID 是消费者组的最后交付 ID。
最后,特殊 ID *
只能与 XADD
命令一起使用,表示为新条目自动选择一个 ID。
所以我们有 -
、+
、$
、>
和 *
,它们都有不同的含义,而且大多数情况下,可以在不同的上下文中使用。
持久性、复制和消息安全性
流,像任何其他 Valkey 数据结构一样,会异步复制到副本并持久化到 AOF 和 RDB 文件中。然而,可能不太明显的是,消费者组的完整状态也会传播到 AOF、RDB 和副本中,所以如果主节点中有一条消息处于待处理状态,副本也会有相同的信息。同样,在重启后,AOF 会恢复消费者组的状态。
然而,请注意,流和消费者组是使用 Valkey 默认复制进行持久化和复制的,所以
- 如果消息的持久性在您的应用程序中很重要,则必须使用强 fsync 策略来使用 AOF。
- 默认情况下,异步复制不保证
XADD
命令或消费者组状态更改会被复制:故障转移后,根据副本从主节点接收数据的能力,可能会丢失一些数据。 - 可以使用
WAIT
命令强制将更改传播到一组副本。但是请注意,虽然这使得数据丢失的可能性大大降低,但由 Sentinel 或 Valkey Cluster 操作的 Valkey 故障转移过程只会尽力检查故障转移到最新更新的副本,并且在某些特定的故障条件下可能会提升一个缺少数据的副本。
因此,在设计使用流和消费者组的应用程序时,请务必了解您的应用程序在故障期间应具备的语义属性,并相应地配置各项,评估其对您的用例是否足够安全。
从流中删除单个项目
流还有一个特殊的命令,用于仅通过 ID 从流中间删除项目。通常对于仅追加数据结构,这可能看起来是一个奇怪的功能,但实际上它对于涉及隐私法规等应用程序非常有用。该命令称为 XDEL
,它接收流的名称,后跟要删除的 ID。
127.0.0.1:6379> XRANGE race:italy - + COUNT 2
1) 1) "1692633198206-0"
2) 1) "rider"
2) "Wood"
2) 1) "1692633208557-0"
2) 1) "rider"
2) "Henshaw"
127.0.0.1:6379> XDEL race:italy 1692633208557-0
(integer) 1
127.0.0.1:6379> XRANGE race:italy - + COUNT 2
1) 1) "1692633198206-0"
2) 1) "rider"
2) "Wood"
然而,在当前实现中,内存实际上不会被回收,直到一个宏节点完全为空,所以不应该滥用此功能。
零长度流
流与其他 Valkey 数据结构之间的一个区别是,当其他数据结构不再有任何元素时,作为调用删除元素命令的副作用,键本身将被删除。例如,当调用 ZREM
删除有序集合中的最后一个元素时,该有序集合将被完全删除。另一方面,流允许保持零元素,这既可以是使用计数为零的 MAXLEN 选项(XADD
和 XTRIM
命令)的结果,也可以是因为调用了 XDEL
。
这种不对称存在的原因是流可能与消费者组相关联,我们不希望仅仅因为流中不再有任何项目而丢失消费者组定义的状态。目前,即使流没有关联的消费者组,也不会被删除。
消费消息的总延迟
非阻塞流命令,如 XRANGE
和 XREAD
或不带 BLOCK 选项的 XREADGROUP
,像任何其他 Valkey 命令一样同步提供服务,因此讨论这些命令的延迟是没有意义的:更有趣的是检查 Valkey 文档中命令的时间复杂度。只需说明流命令在提取范围时至少与有序集合命令一样快,并且 XADD
非常快,如果使用流水线,在普通机器上可以轻松每秒插入五十万到一百万个项目。
然而,如果我们想了解处理消息的延迟,在消费者组中阻塞的消费者的情况下,从通过 XADD
生成消息的那一刻起,到消费者因为 XREADGROUP
返回消息而获得消息的那一刻为止,延迟就成了一个有趣的参数。
如何服务阻塞的消费者
在提供测试结果之前,了解 Valkey 用于路由流消息的模型(以及通常如何管理任何等待数据的阻塞操作)是很有趣的。
- 被阻塞的客户端在一个哈希表中被引用,该哈希表将至少有一个阻塞消费者的键映射到等待该键的消费者列表。通过这种方式,给定一个已接收数据的键,我们可以解析所有等待该数据的客户端。
- 当发生写入时,在本例中是调用
XADD
命令时,它会调用signalKeyAsReady()
函数。此函数会将键放入一个需要处理的键列表中,因为这些键可能包含针对阻塞消费者的新数据。请注意,这些就绪键将在稍后处理,因此在同一事件循环周期中,键可能会接收其他写入。 - 最后,在返回事件循环之前,就绪键最终会被处理。对于每个键,会扫描等待数据的客户端列表,如果适用,这些客户端将收到新到达的数据。在流的情况下,数据是消费者请求的适用范围内的消息。
正如你所看到的,基本上,在返回事件循环之前,调用 XADD
的客户端和阻塞以消费消息的客户端都会在输出缓冲区中收到回复,因此 XADD
的调用者应该在消费者收到新消息的同时收到 Valkey 的回复。
这种模型是推模式,因为向消费者缓冲区添加数据将直接通过调用 XADD
的操作来执行,所以延迟往往非常可预测。
延迟测试结果
为了检查这些延迟特性,进行了一项测试,该测试使用多个 Ruby 程序实例,这些程序推送包含计算机毫秒时间作为附加字段的消息,以及 Ruby 程序从消费者组读取并处理这些消息。消息处理步骤包括将当前计算机时间与消息时间戳进行比较,以了解总延迟。
获得的结果
Processed between 0 and 1 ms -> 74.11%
Processed between 1 and 2 ms -> 25.80%
Processed between 2 and 3 ms -> 0.06%
Processed between 3 and 4 ms -> 0.01%
Processed between 4 and 5 ms -> 0.02%
因此,99.9% 的请求延迟 <= 2 毫秒,且异常值仍非常接近平均值。
向流中添加数百万条未确认消息并不会改变基准测试的要旨,大多数查询仍然以很短的延迟进行处理。
几点备注
- 这里我们每次迭代处理了多达 1 万条消息,这意味着
XREADGROUP
的COUNT
参数设置为 10000。这会增加大量延迟,但为了让慢速消费者能够跟上消息流,这是必需的。因此,您可以预期实际延迟会小得多。 - 用于此基准测试的系统与当今标准相比非常慢。