文档:模块和阻塞命令

Valkey 在其内置命令集中包含一些阻塞命令。其中最常用的是 BLPOP(或对称的 BRPOP),它会阻塞直到列表中有元素到达。

关于阻塞命令的有趣之处在于,它们不会阻塞整个服务器,而只会阻塞调用它们的客户端。通常,阻塞的原因是我们期望某些外部事件发生:这可以是 Valkey 数据结构中的某些更改,例如 BLPOP 的情况,线程中正在进行的长时间计算,从网络接收数据等等。

Valkey 模块也能够实现阻塞命令,本文档展示了 API 的工作原理,并描述了一些可用于建模阻塞命令的模式。

阻塞和恢复的工作原理。

注意: 您可能需要查看 Valkey 源代码树中 src/modules 目录下的 helloblock.c 示例,以了解如何应用阻塞 API 的简单易懂的示例。

在 Valkey 模块中,命令由回调函数实现,当用户调用特定命令时,Valkey 核心会调用这些回调函数。通常,回调函数会通过向客户端发送回复来终止其执行。但如果使用以下函数,实现模块命令的函数可以请求将客户端置于阻塞状态

ValkeyModuleBlockedClient *ValkeyModule_BlockClient(ValkeyModuleCtx *ctx,
                                                    ValkeyModuleCmdFunc reply_callback,
                                                    ValkeyModuleCmdFunc timeout_callback,
                                                    void (*free_privdata)(void*),
                                                    long long timeout_ms);

该函数返回一个 ValkeyModuleBlockedClient 对象,该对象稍后用于解除客户端的阻塞。参数的含义如下

  • ctx 是命令执行上下文,与 API 的其余部分通常一样。
  • reply_callback 是一个回调函数,它具有与普通命令函数相同的原型,并在客户端解除阻塞时被调用,以便向客户端返回回复。
  • timeout_callback 是一个回调函数,它具有与普通命令函数相同的原型,并在客户端达到 ms 超时时被调用。
  • free_privdata 是用于释放私有数据的回调函数。私有数据是一个指针,指向在解除客户端阻塞的 API 和向客户端发送回复的回调函数之间传递的一些数据。我们将在本文档的后面部分看到此机制的工作原理。
  • ms 是以毫秒为单位的超时时间。当达到超时时,会调用超时回调函数,并且客户端会自动中止。

一旦客户端被阻塞,可以使用以下 API 解除阻塞

int ValkeyModule_UnblockClient(ValkeyModuleBlockedClient *bc, void *privdata);

该函数将之前调用 ValkeyModule_BlockClient() 返回的阻塞客户端对象作为参数,并解除客户端的阻塞。在客户端解除阻塞之前,会立即调用在客户端被阻塞时指定的 reply_callback 函数:此函数将能够访问此处使用的 privdata 指针。

重要提示:上述函数是线程安全的,可以在执行某些工作以实现阻塞客户端的命令的线程内部调用。

当客户端解除阻塞时,privdata 数据将使用 free_privdata 回调函数自动释放。这很有用,因为如果客户端超时或与服务器断开连接,回复回调函数可能永远不会被调用,因此如果需要,由外部函数负责释放传递的数据非常重要。

为了更好地理解 API 的工作原理,我们可以想象编写一个命令,它会阻塞客户端一秒钟,然后发送“Hello!”作为回复。

注意:为了使示例简单,此命令中未实现参数检查和其他不重要的事情。

int Example_ValkeyCommand(ValkeyModuleCtx *ctx, ValkeyModuleString **argv,
                         int argc)
{
    ValkeyModuleBlockedClient *bc =
        ValkeyModule_BlockClient(ctx,reply_func,timeout_func,NULL,0);

    pthread_t tid;
    pthread_create(&tid,NULL,threadmain,bc);

    return VALKEYMODULE_OK;
}

void *threadmain(void *arg) {
    ValkeyModuleBlockedClient *bc = arg;

    sleep(1); /* Wait one second and unblock. */
    ValkeyModule_UnblockClient(bc,NULL);
}

上述命令会立即阻塞客户端,并生成一个线程,该线程将等待一秒钟并解除客户端的阻塞。让我们检查回复和超时回调函数,在我们的例子中它们非常相似,因为它们只是以不同的回复类型回复客户端。

int reply_func(ValkeyModuleCtx *ctx, ValkeyModuleString **argv,
               int argc)
{
    return ValkeyModule_ReplyWithSimpleString(ctx,"Hello!");
}

int timeout_func(ValkeyModuleCtx *ctx, ValkeyModuleString **argv,
               int argc)
{
    return ValkeyModule_ReplyWithNull(ctx);
}

回复回调函数只是将“Hello!”字符串发送给客户端。这里的重点是,当客户端从线程中解除阻塞时,会调用回复回调函数。

超时命令返回 NULL,这在实际的 Valkey 阻塞命令超时时经常发生。

解除阻塞时传递回复数据

上述示例易于理解,但缺少实际阻塞命令实现的一个重要实际方面:回复函数通常需要知道要回复客户端什么,并且此信息通常在客户端解除阻塞时提供。

我们可以修改上述示例,使线程等待一秒钟后生成一个随机数。您可以将其视为某种实际的耗时操作。然后可以将此随机数传递给回复函数,以便我们将其返回给命令调用者。为了使其工作,我们修改函数如下

void *threadmain(void *arg) {
    ValkeyModuleBlockedClient *bc = arg;

    sleep(1); /* Wait one second and unblock. */

    long *mynumber = ValkeyModule_Alloc(sizeof(long));
    *mynumber = rand();
    ValkeyModule_UnblockClient(bc,mynumber);
}

如您所见,现在解除阻塞调用正在向回复回调函数传递一些私有数据,即 mynumber 指针。为了获取此私有数据,回复回调函数将使用以下函数

void *ValkeyModule_GetBlockedClientPrivateData(ValkeyModuleCtx *ctx);

因此,我们的回复回调函数修改如下

int reply_func(ValkeyModuleCtx *ctx, ValkeyModuleString **argv,
               int argc)
{
    long *mynumber = ValkeyModule_GetBlockedClientPrivateData(ctx);
    /* IMPORTANT: don't free mynumber here, but in the
     * free privdata callback. */
    return ValkeyModule_ReplyWithLongLong(ctx,mynumber);
}

请注意,当使用 ValkeyModule_BlockClient() 阻塞客户端时,我们还需要传递一个 free_privdata 函数,因为分配的长整型值必须被释放。我们的回调函数将如下所示

void free_privdata(void *privdata) {
    ValkeyModule_Free(privdata);
}

注意:需要强调的是,最好在 free_privdata 回调函数中释放私有数据,因为如果客户端断开连接或超时,回复函数可能不会被调用。

另请注意,私有数据也可以从超时回调函数中访问,始终使用 GetBlockedClientPrivateData() API。

中止客户端的阻塞

有时出现的一个问题是,我们需要分配资源才能实现非阻塞命令。因此,我们阻塞客户端,然后例如尝试创建一个线程,但线程创建函数返回错误。在这种情况下如何恢复?我们不想让客户端保持阻塞状态,也不想调用 UnblockClient(),因为这会触发回复回调函数的调用。

在这种情况下,最好的做法是使用以下函数

int ValkeyModule_AbortBlock(ValkeyModuleBlockedClient *bc);

实际上,使用方法如下

int Example_ValkeyCommand(ValkeyModuleCtx *ctx, ValkeyModuleString **argv,
                         int argc)
{
    ValkeyModuleBlockedClient *bc =
        ValkeyModule_BlockClient(ctx,reply_func,timeout_func,NULL,0);

    pthread_t tid;
    if (pthread_create(&tid,NULL,threadmain,bc) != 0) {
        ValkeyModule_AbortBlock(bc);
        ValkeyModule_ReplyWithError(ctx,"Sorry can't create a thread");
    }

    return VALKEYMODULE_OK;
}

客户端将被解除阻塞,但回复回调函数不会被调用。

使用单个函数实现命令、回复和超时回调

以下函数可用于使用实现主要命令函数的相同函数来实施回复和回调。

int ValkeyModule_IsBlockedReplyRequest(ValkeyModuleCtx *ctx);
int ValkeyModule_IsBlockedTimeoutRequest(ValkeyModuleCtx *ctx);

因此,我可以重写示例命令,而无需使用单独的回复和超时回调函数

int Example_ValkeyCommand(ValkeyModuleCtx *ctx, ValkeyModuleString **argv,
                         int argc)
{
    if (ValkeyModule_IsBlockedReplyRequest(ctx)) {
        long *mynumber = ValkeyModule_GetBlockedClientPrivateData(ctx);
        return ValkeyModule_ReplyWithLongLong(ctx,mynumber);
    } else if (ValkeyModule_IsBlockedTimeoutRequest) {
        return ValkeyModule_ReplyWithNull(ctx);
    }

    ValkeyModuleBlockedClient *bc =
        ValkeyModule_BlockClient(ctx,reply_func,timeout_func,NULL,0);

    pthread_t tid;
    if (pthread_create(&tid,NULL,threadmain,bc) != 0) {
        ValkeyModule_AbortBlock(bc);
        ValkeyModule_ReplyWithError(ctx,"Sorry can't create a thread");
    }

    return VALKEYMODULE_OK;
}

功能上是相同的,但有些人会更喜欢不那么冗长的实现,将大部分命令逻辑集中在一个函数中。

在线程内部处理数据副本

一个有趣的工作模式是,在使用实现命令慢速部分的线程时,处理数据的副本,这样当某个操作在键中执行时,用户仍然可以看到旧版本。但是,当线程完成其工作时,表示会进行交换,并使用新的、经过处理的版本。

这种方法的一个例子是 Neural Redis 模块,其中神经网络在不同的线程中进行训练,而用户仍然可以执行和检查它们的旧版本。

线程安全上下文

有关如何从线程安全调用 Valkey 模块 API 的信息,请参阅模块 API 参考中的线程安全上下文