- 用法
-
XCLAIM key group consumer min-idle-time ID [ ID ... ] [ IDLE ms ] [ TIME unix-time-milliseconds ] [ RETRYCOUNT count ] [ FORCE ] [ JUSTID ] [ LASTID lastid ]
- 复杂度
- O(log N),其中 N 是消费者组 PEL 中的消息数量。
- 始于
- 5.0.0
- ACL 类别
- @stream, @write, @fast
- 存在一个关联了消费者组的流。
- 消费者 A 在该消费者组的上下文中,通过
XREADGROUP
从流中读取一条消息。 - 作为副作用,消费者组的待处理条目列表(PEL)中会创建一个待处理消息条目:这意味着消息已传递给特定消费者,但尚未通过
XACK
进行确认。 - 然后该消费者突然永久性失败。
- 其他消费者可以使用
XPENDING
命令检查已闲置一段时间的待处理消息列表。为了继续处理这些消息,它们使用XCLAIM
来获取消息的所有权并继续处理。消费者还可以使用XAUTOCLAIM
命令自动扫描并认领闲置的待处理消息。 - 消息不存在于组 PEL 中(即从未被任何消费者读取过)。
- 消息存在于组 PEL 中,但不存在于流本身中(即消息已被读取但从未确认,然后被从流中删除,无论是通过修剪还是通过
XDEL
)。 IDLE <ms>
: 设置消息的闲置时间(上次传递时间)。如果未指定 IDLE,则假定 IDLE 为 0,即时间计数被重置,因为消息现在有了一个新的所有者尝试处理它。TIME <ms-unix-time>
: 这与 IDLE 相同,但不是相对的毫秒数,而是将闲置时间设置为特定的 Unix 时间(以毫秒为单位)。这对于重写生成XCLAIM
命令的 AOF 文件非常有用。RETRYCOUNT <count>
: 将重试计数器设置为指定值。每次消息再次传递时,此计数器都会增加。通常,XCLAIM
不会更改此计数器,该计数器仅在调用 XPENDING 命令时提供给客户端:通过这种方式,客户端可以检测异常情况,例如在大量传递尝试后由于某种原因从未处理过的消息。FORCE
: 即使某些指定的 ID 尚未分配给其他客户端,也会在 PEL 中创建待处理消息条目。但是,消息必须存在于流中,否则将忽略不存在的消息的 ID。JUSTID
: 只返回成功认领的消息 ID 数组,不返回实际消息。使用此选项意味着重试计数器不会增加。
在流消费者组的上下文中,此命令更改待处理消息的所有权,以便新所有者是命令参数中指定的消费者。通常情况下,会发生以下情况:
这种动态在流简介文档中有清晰解释。
请注意,只有当消息的闲置时间大于我们在调用 XCLAIM
时指定的最小闲置时间时,消息才会被认领。因为作为副作用,XCLAIM
还会重置闲置时间(因为这是处理消息的新尝试),所以两个消费者同时尝试认领一条消息将永远不会都成功:只有一个会成功认领消息。这避免了我们以一种简单的方式多次处理给定消息(然而在一般情况下,多次处理是可能且不可避免的)。
此外,作为副作用,XCLAIM
将增加消息的尝试传递次数,除非指定了 JUSTID
选项(该选项只返回消息 ID,而不返回消息本身)。通过这种方式,由于某些原因(例如消费者在尝试处理消息时崩溃)而无法处理的消息将开始拥有更大的计数器,并可以在系统内部被检测到。
在以下情况下,XCLAIM
不会认领消息:
在上述两种情况下,回复将不包含与该消息对应的条目(即回复数组的长度可能小于提供给 XCLAIM
的 ID 数量)。在后一种情况下,消息也将从其所在的 PEL 中删除。
命令选项
该命令有多个选项,但大多数主要用于内部,以将 XCLAIM
或其他命令的效果传输到 AOF 文件并将相同的效果传播到副本,因此对于普通用户可能不会有用。
示例
> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
2) 1) "message"
2) "orange"
在上面的示例中,我们认领 ID 为 1526569498055-0
的消息,但仅当消息已闲置至少一小时,且原始消费者或其他消费者未取得进展(确认或认领它),并将其所有权分配给消费者 Alice
。
RESP2/RESP3 回复
以下任一情况: