XCLAIM

用法
XCLAIM key group consumer min-idle-time ID [ ID ... ] [ IDLE ms ] [ TIME unix-time-milliseconds ] [ RETRYCOUNT count ] [ FORCE ] [ JUSTID ] [ LASTID lastid ]
复杂度
O(log N),其中 N 是消费者组 PEL 中的消息数量。
始于
5.0.0
ACL 类别
@stream, @write, @fast

在流消费者组的上下文中,此命令更改待处理消息的所有权,以便新所有者是命令参数中指定的消费者。通常情况下,会发生以下情况:

  1. 存在一个关联了消费者组的流。
  2. 消费者 A 在该消费者组的上下文中,通过 XREADGROUP 从流中读取一条消息。
  3. 作为副作用,消费者组的待处理条目列表(PEL)中会创建一个待处理消息条目:这意味着消息已传递给特定消费者,但尚未通过 XACK 进行确认。
  4. 然后该消费者突然永久性失败。
  5. 其他消费者可以使用 XPENDING 命令检查已闲置一段时间的待处理消息列表。为了继续处理这些消息,它们使用 XCLAIM 来获取消息的所有权并继续处理。消费者还可以使用 XAUTOCLAIM 命令自动扫描并认领闲置的待处理消息。

这种动态在流简介文档中有清晰解释。

请注意,只有当消息的闲置时间大于我们在调用 XCLAIM 时指定的最小闲置时间时,消息才会被认领。因为作为副作用,XCLAIM 还会重置闲置时间(因为这是处理消息的新尝试),所以两个消费者同时尝试认领一条消息将永远不会都成功:只有一个会成功认领消息。这避免了我们以一种简单的方式多次处理给定消息(然而在一般情况下,多次处理是可能且不可避免的)。

此外,作为副作用,XCLAIM 将增加消息的尝试传递次数,除非指定了 JUSTID 选项(该选项只返回消息 ID,而不返回消息本身)。通过这种方式,由于某些原因(例如消费者在尝试处理消息时崩溃)而无法处理的消息将开始拥有更大的计数器,并可以在系统内部被检测到。

在以下情况下,XCLAIM 不会认领消息:

  1. 消息不存在于组 PEL 中(即从未被任何消费者读取过)。
  2. 消息存在于组 PEL 中,但不存在于流本身中(即消息已被读取但从未确认,然后被从流中删除,无论是通过修剪还是通过 XDEL)。

在上述两种情况下,回复将不包含与该消息对应的条目(即回复数组的长度可能小于提供给 XCLAIM 的 ID 数量)。在后一种情况下,消息也将从其所在的 PEL 中删除。

命令选项

该命令有多个选项,但大多数主要用于内部,以将 XCLAIM 或其他命令的效果传输到 AOF 文件并将相同的效果传播到副本,因此对于普通用户可能不会有用。

  1. IDLE <ms>: 设置消息的闲置时间(上次传递时间)。如果未指定 IDLE,则假定 IDLE 为 0,即时间计数被重置,因为消息现在有了一个新的所有者尝试处理它。
  2. TIME <ms-unix-time>: 这与 IDLE 相同,但不是相对的毫秒数,而是将闲置时间设置为特定的 Unix 时间(以毫秒为单位)。这对于重写生成 XCLAIM 命令的 AOF 文件非常有用。
  3. RETRYCOUNT <count>: 将重试计数器设置为指定值。每次消息再次传递时,此计数器都会增加。通常,XCLAIM 不会更改此计数器,该计数器仅在调用 XPENDING 命令时提供给客户端:通过这种方式,客户端可以检测异常情况,例如在大量传递尝试后由于某种原因从未处理过的消息。
  4. FORCE: 即使某些指定的 ID 尚未分配给其他客户端,也会在 PEL 中创建待处理消息条目。但是,消息必须存在于流中,否则将忽略不存在的消息的 ID。
  5. JUSTID: 只返回成功认领的消息 ID 数组,不返回实际消息。使用此选项意味着重试计数器不会增加。

示例

> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

在上面的示例中,我们认领 ID 为 1526569498055-0 的消息,但仅当消息已闲置至少一小时,且原始消费者或其他消费者未取得进展(确认或认领它),并将其所有权分配给消费者 Alice

RESP2/RESP3 回复

以下任一情况:

  • 数组回复:当指定 JUSTID 选项时,返回一个成功认领的消息 ID 数组。

  • 数组回复:一个流条目数组,每个条目都包含一个由两个元素组成的数组,即条目 ID 和条目数据本身。