Hooks(钩子)

介绍

Hocuspocus 提供了钩子来扩展其功能并将其集成到现有应用程序中。钩子配置为简单的方法,方式与其他配置选项相同。

钩子接受一个钩子负载作为第一个参数。负载是一个包含可以使用和操作的数据的对象,允许你基于这个简单机制构建复杂的东西,比如扩展

钩子需要返回一个 Promise;最简单的方法是将函数标记为 async(自 v4 起需要 Node.js 22+)。通过这种方式,你可以执行诸如 API 请求、数据库查询、触发 webhook 等操作,或做任何你需要做的事情,以便将其集成到你的应用程序中。

自 v4 起,钩子负载使用 Web 标准的 RequestHeaders 对象,而不是 Node.js 的 IncomingMessageIncomingHttpHeaders。读取请求头时应使用 requestHeaders.get('authorization'),而不是 requestHeaders['authorization']onUpgradeonRequest 钩子是例外——它们仍然使用 Node.js 类型,因为它们是在 WebSocket 升级之前的 HTTP 层面上运行的。

生命周期

钩子将在 Hocuspocus 生命周期的不同阶段被调用。例如,调用服务器实例的 listen() 方法时,会调用 onListen 钩子。

部分钩子不仅允许你响应这些事件,还能拦截它们。例如,当新连接建立到底层 websocket 服务器时,会触发 onConnect 钩子。通过在钩子中拒绝 Promise(或者在使用 async 时抛出异常),你可以终止连接并停止调用链。

钩子链

扩展使用钩子为 Hocuspocus 添加额外的功能。它们将按照注册顺序依次调用,你的配置位于链的最后部分。

如果某个钩子中的 Promise 被拒绝,则后续扩展或你的配置中的钩子将不会被调用。这类似于请求必须通过的一组中间件。使用钩子时请牢记这一点。

举例来说,如果用户不允许连接:只需在 onAuthenticate() 钩子中抛出一个错误。很简单,不是吗?

总结表

钩子描述链接
beforeHandleMessage在处理消息之前阅读更多
onConnect建立连接时阅读更多
connected连接成功建立后阅读更多
onAuthenticate需要身份验证时阅读更多
onAwarenessUpdate状态更新时阅读更多
onLoadDocument创建新文档期间阅读更多
afterLoadDocument文档创建完成后阅读更多
onChange文档发生变化时阅读更多
onDisconnect连接关闭时阅读更多
onListen服务器初始化时阅读更多
onDestroy服务器即将销毁时阅读更多
onConfigure服务器配置完成时阅读更多
onRequestHTTP 请求到达时阅读更多
onStoreDocument文档发生改变时阅读更多
onUpgradeWebSocket 连接升级时阅读更多
onStateless收到无状态消息时阅读更多
beforeBroadcastStateless广播无状态消息之前阅读更多
afterUnloadDocument文档在服务器关闭时阅读更多

使用示例

import { Server } from "@hocuspocus/server";

const server = new Server({
  async onAuthenticate({ documentName, token }) {
    // 可以是 API 调用、数据库查询等...
    // 如果用户通过身份验证,接口应返回 200 OK,否则返回 HTTP 错误。
    return axios.get("/user", {
      headers: {
        Authorization: `Bearer ${token}`,
      },
    });
  },
});

server.listen();

钩子详解

beforeHandleMessage

当服务器接收到消息时,在处理/应用该消息之前会调用 beforeHandleMessage 钩子。该钩子可用于拒绝消息(例如当身份验证令牌过期时),甚至可根据自定义规则检查更新消息并拒绝或接受它。如果在钩子中抛出错误,连接将被关闭。你可以通过抛出实现了 CloseEvent 的错误来自定义关闭代码和原因(详见下例)。

钩子负载

传递给 beforeHandleMessage 钩子的 data 对象包含以下属性:

import { URLSearchParams } from "url";
import { Doc } from "yjs";
import { CloseEvent } from "@hocuspocus/common";

const data = {
  clientsCount: number,
  context: any,
  document: Doc,
  documentName: string,
  instance: Hocuspocus,
  requestHeaders: Headers,
  requestParameters: URLSearchParams,
  update: Uint8Array,
  socketId: string,
};

context 包含之前 onConnect 钩子提供的数据。

示例

import { debounce } from "debounce";
import { Server } from "@hocuspocus/server";

let debounced;

const server = new Server({
  beforeHandleMessage(data) {
    if (data.context.tokenExpiresAt <= new Date()) {
      const error: CloseEvent = {
        reason: "令牌已过期",
      };

      throw error;
    }
  },
});

server.listen();

connected

connected 钩子在新连接成功建立后调用。

示例

import { Server } from "@hocuspocus/server";

const server = new Server({
  async connected() {
    console.log("当前连接数:", server.getConnectionsCount());
  },
});

server.listen();

onAuthenticate

当服务器接收到客户端提供者发送的身份验证请求时,会调用 onAuthenticate 钩子。它必须返回一个 Promise。抛出异常或拒绝 Promise 会终止连接。

注意,若未提供 token 给 HocuspocusProvider,客户端不会发送 Auth 消息,因而不会调用 onAuthenticate

钩子负载

传递给 onAuthenticate 钩子的 data 对象包含以下属性:

import { URLSearchParams } from "url";
import { Doc } from "yjs";

const data = {
  documentName: string,
  instance: Hocuspocus,
  requestHeaders: Headers,
  requestParameters: URLSearchParams,
  request: Request,
  socketId: string,
  token: string,
  providerVersion: string | undefined,
  connection: {
    readOnly: boolean,
  },
};

示例

import { Server } from "@hocuspocus/server";

const server = new Server({
  async onAuthenticate(data) {
    const { token } = data;

    // 例如:用请求参数测试用户是否认证成功
    if (token !== "super-secret-token") {
      throw new Error("未授权!");
    }

    // 示例:为当前用户设置只读文档,
    // 这样更改将不会被接受或同步给其他客户端
    if (someCondition === true) {
      data.connection.readOnly = true;
    }

    // 你可以设置上下文数据,在其他钩子中使用
    return {
      user: {
        id: 1234,
        name: "John",
      },
    };
  },
});

server.listen();

onTokenSync

当服务器从连接的提供者接收令牌同步请求时会调用 onTokenSync 钩子,允许服务器在活动会话中无需重新连接即可验证用户令牌。

钩子负载

传递给 onTokenSync 钩子的 data 对象包含以下属性:

const data = {
  context: any,
  document: Doc,
  documentName: string,
  instance: Hocuspocus,
  requestHeaders: Headers,
  requestParameters: URLSearchParams,
  socketId: string,
  token: string,
  connectionConfig: {
    readOnly: boolean,
  },
  connection: Connection,
};

示例

import { Server } from "@hocuspocus/server";

const server = new Server({
  async onTokenSync({ token, context, connection }) {
    // 验证当前令牌
    const isValid = await validateToken(token);

    if (!isValid) {
      throw new Error("令牌已过期或无效");
    }

    // 如果权限发生变化,更新权限
    const permissions = await getUserPermissions(context.userId);
    if (permissions.readOnly !== connection.readOnly) {
      connection.readOnly = permissions.readOnly;
    }

    return { lastTokenSync: new Date() };
  },
});

server.listen();

提供者端用法

// 提供者向服务器发送令牌
provider.sendToken();

服务器端用法

// 服务器向提供者请求令牌
const connection = document.connections.values().next().value?.connection;
connection.requestToken();

onAwarenessUpdate

当状态发生变化时调用 onAwarenessUpdate 钩子(参考Provider Awareness API)。

钩子负载

传递给 onAwarenessUpdate 钩子的 data 对象包含以下属性:

import { Awareness } from 'y-protocols/awareness'
import type { Connection, TransactionOrigin } from '@hocuspocus/server'

const data = {
  document: Document,
  documentName: string,
  instance: Hocuspocus,
  update: Uint8Array,
  added: number[],
  updated: number[],
  removed: number[],
  awareness: Awareness,
  states: { clientId: number, [key: string | number]: any }[],
  // Awareness 更新的结构化来源(自 v4 起)
  transactionOrigin: TransactionOrigin,
  // 可选:触发此更新的连接(自 v4 起)
  connection?: Connection,
}

自 v4 起,该负载已简化。特定连接字段(contextrequestHeadersrequestParameterssocketId)已被移除。请通过可选的 connection 访问它们(例如 connection?.context)。

示例

const provider = new HocuspocusProvider({
  url: "ws://127.0.0.1:1234",
  name: "example-document",
  document: ydoc,
  onAwarenessUpdate: ({ states }) => {
    currentStates = states;
  },
});

onChange

当文档本身发生变化时调用 onChange 钩子。它应返回 Promise。

需要理解的是,该钩子每个文档只会调用一次。你可以利用它根据特定连接做出反应,因为负载中提供了 contextupdate(见下文)。

强烈建议对较大开销的操作进行防抖,因为该钩子每秒可能调用多次。

钩子负载

传递给 onChange 钩子的 data 对象包含以下属性:

import { URLSearchParams } from "url";
import { Doc } from "yjs";
import type { TransactionOrigin } from "@hocuspocus/server";

const data = {
  clientsCount: number,
  context: any,
  document: Doc,
  documentName: string,
  instance: Hocuspocus,
  requestHeaders: Headers,
  requestParameters: URLSearchParams,
  update: Uint8Array,
  socketId: string,
  // 结构化来源(自 v4 起)— 使用 isTransactionOrigin() 并基于 .source 进行分支
  transactionOrigin: TransactionOrigin,
};

context 包含之前 onConnect 钩子提供的数据。

示例

:::warning 使用主存储 以下示例并非意在用作主存储,因为 JSON 序列化与反序列化只会存储最终的文档,而不会保存协作历史步骤。此示例仅用于存储你的应用视图所需的文档结果。如需主存储,请参考数据库扩展。 :::

import { debounce } from "debounce";
import { Server } from "@hocuspocus/server";
import { TiptapTransformer } from "@hocuspocus/transformer";
import { writeFile } from "fs";

let debounced;

const server = new Server({
  async onChange(data) {
    const save = () => {
      // 将 y-doc 转换为你可以在视图中使用的格式。
      // 本例中,我们使用 TiptapTransformer 从 ydoc 获取 JSON。
      const prosemirrorJSON = TiptapTransformer.fromYdoc(data.document);

      // 保存文档。在真实应用中,这可以是数据库查询、
      // webhook 或其他操作。
      writeFile(`/path/to/your/documents/${data.documentName}.json`, prosemirrorJSON);

      // 也许你想储存是谁修改了文档?
      // 你可以访问之前 onConnect 钩子中的上下文信息。
      console.log(`文档 ${data.documentName} 被 ${data.context.user.name} 修改`);
    };

    debounced?.clear();
    debounced = debounce(save, 4000);
    debounced();
  },
});

server.listen();

onConfigure

在服务器创建后调用 onConfigure 钩子。它应返回 Promise。

默认配置

如果未调用 configure(),可以通过导入获取默认配置:

import { defaultConfiguration } from "@hocuspocus/server";

钩子负载

传递给 onConfigure 钩子的 data 包含以下属性:

import { Configuration } from "@hocuspocus/server";

const data = {
  configuration: Configuration,
  version: string,
  instance: Hocuspocus,
};

示例

import { Server } from "@hocuspocus/server";

const server = new Server({
  async onConfigure(data) {
    // 输出信息
    console.log(`服务器已配置!`);
  },
});

server.listen();

onConnect

当新连接建立时调用 onConnect 钩子。它应返回 Promise。抛出异常或拒绝 Promise 会终止连接。

钩子负载

传递给 onConnect 钩子的 data 包含以下属性:

import { URLSearchParams } from "url";
import { Doc } from "yjs";

const data = {
  documentName: string,
  instance: Hocuspocus,
  request: Request,
  requestHeaders: Headers,
  requestParameters: URLSearchParams,
  socketId: string,
  providerVersion: string | undefined,
  connection: {
    readOnly: boolean,
  },
};

示例

import { Server } from "@hocuspocus/server";

const server = new Server({
  async onConnect(data) {
    // 输出信息
    console.log(`新的 websocket 连接`);
  },
});

server.listen();

onDestroy

调用 destroy 方法关闭服务器后,调用 onDestroy 钩子。它应返回 Promise。

钩子负载

传递给 onDestroy 钩子的 data 含有:

const data = {
  instance: Hocuspocus,
};

示例

import { Server } from "@hocuspocus/server";

const server = new Server({
  async onDestroy(data) {
    // 输出信息
    console.log(`服务器已关闭!`);
  },
});

server.listen();

onDisconnect

连接断开时调用 onDisconnect 钩子。它应返回 Promise。

钩子负载

传递给 onDisconnect 钩子的 data 包含:

import { URLSearchParams } from "url";
import { Doc } from "yjs";

const data = {
  clientsCount: number,
  context: any,
  document: Document,
  documentName: string,
  instance: Hocuspocus,
  requestHeaders: Headers,
  requestParameters: URLSearchParams,
  socketId: string,
};

context 包含之前 onConnect 钩子提供的数据。

示例

import { Server } from "@hocuspocus/server";

const server = new Server({
  async onDisconnect(data) {
    // 输出信息
    console.log(`"${data.context.user.name}" 已断开连接。`);
  },
});

server.listen();

onListen

服务器启动并开始接受连接后调用 onListen 钩子。它应返回 Promise。

钩子负载

传递给 onListen 钩子的 data 具有以下属性:

const data = {
  port: number,
};

示例

import { Server } from "@hocuspocus/server";

const server = new Server({
  async onListen(data) {
    // 输出信息
    console.log(`服务器正在监听端口 "${data.port}"!`);
  },
});

server.listen();

onLoadDocument

调用此钩子来从存储中加载已有数据。你可能习惯加载 JSON/HTML 文档,但这不是 Y.js 的方式。为了能使 Y.js 正常工作,我们需要存储变更历史。只有这样,来自多个来源的变更才能合并。

你仍可以存储 JSON/HTML 文档,但应视其为数据的“视图”,而非数据源。

从 JSON/HTML 创建 Y.js 文档(一次性)

你可以基于现有数据(例如 JSON)创建 Y.js 文档。这应仅用于迁移数据,而非持久保存数据。

示例如下,针对 Tiptap 兼容的 JSON:

import { TiptapTransformer } from "@hocuspocus/transformer";
import Document from "@tiptap/extension-document";
import Paragraph from "@tiptap/extension-paragraph";
import Text from "@tiptap/extension-text";

const ydoc = TiptapTransformer.toYdoc(
  // 实际的 JSON 数据
  json,
  // Tiptap 中使用的字段,不清楚的使用 'default'
  "default",
  // Tiptap 扩展,确保有效的 schema
  [Document, Paragraph, Text]
);

若要导入 HTML,需要先转为 Tiptap 兼容的 JSON

但我们期望你从 onLoadDocument 钩子返回一个 Y.js 文档(或者自 v4 起返回原始的 Uint8Array Yjs 更新),无论它来自何处。

import { Server } from '@hocuspocus/server'

const server = new Server({
  async onLoadDocument(data) {
    // 从某处获取 Y.js 文档
    const ydoc =

    return ydoc
  },
})

server.listen()

自 v4 起,你也可以直接返回原始的 Uint8Array 格式 Yjs 更新,这对存储扩展很方便:

const server = new Server({
  async onLoadDocument({ documentName }) {
    const update = await loadUpdateFromStorage(documentName)
    return update // Uint8Array
  },
})

获取你的 Y.js 文档(推荐)

你可以用多种方式存储 Y.js 文档(及历史)。通常,你应使用 onStoreDocument 钩子,它被防抖并且几秒内会多次调用,用于存储修改文档。该钩子提供当前的 Y.js 文档,由你决定将其存至何处。放心,我们为你提供便捷的方式。

如果想快速实现,请查看本地开发的 SQLite 扩展,以及用于获取和存储文档的通用 Database 扩展。

钩子负载

传递给 onLoadDocument 钩子的 data 包含:

import { Doc } from "yjs";

const data = {
  context: any,
  document: Doc,
  documentName: string,
  instance: Hocuspocus,
  requestHeaders: Headers,
  requestParameters: URLSearchParams,
  socketId: string,
};

context 包含之前 onConnect 钩子提供的数据。

afterLoadDocument

在文档成功加载后调用 afterLoadDocument 钩子。这不同于 onLoadDocument ,后者是文档创建过程的一部分,可能失败(比如数据库中未找到文档)。

afterLoadDocument 在所有 onLoadDocument 钩子成功后运行,因此这时文档已被服务器标记为打开。

钩子负载

传递给 afterLoadDocument 钩子的 data 包含:

import { Doc } from "yjs";

const data = {
  context: any,
  document: Doc,
  documentName: string,
  instance: Hocuspocus,
  requestHeaders: Headers,
  requestParameters: URLSearchParams,
  socketId: string,
};

onRequest

当 Hocuspocus 的 HTTP 服务器接收到新请求时调用 onRequest 钩子。它应返回 Promise。如果你抛出异常或拒绝 Promise,则后续钩子不执行,你可以自行响应请求。类似于请求中间件机制。

这对想要在 Hocuspocus 运行的同一端口创建自定义路由非常有用。

钩子负载

传递给 onRequest 钩子的 data 包含:

import { IncomingMessage, ServerResponse } from "http";

const data = {
  request: IncomingMessage,
  response: ServerResponse,
  instance: Hocuspocus,
};

示例

import { Server } from "@hocuspocus/server";

const server = new Server({
  onRequest(data) {
    return new Promise((resolve, reject) => {
      const { request, response } = data;

      // 如果请求匹配自定义路由
      if (request.url?.split("/")[1] === "custom-route") {
        // 响应自定义内容
        response.writeHead(200, { "Content-Type": "text/plain" });
        response.end("这是我的自定义回应,耶!");

        // 拒绝 Promise 将停止链,后续 onRequest 不被调用
        return reject();
      }

      resolve();
    });
  },
});

server.listen();

onStoreDocument

当文档发生变化后(紧接 onChange 钩子)调用 onStoreDocument,可用来将变更后的文档存入持久存储。调用该钩子默认带防抖(配置选项 debouncemaxDebounce)。

最简单的实现方式是继承 extension-database 扩展并实现 fetch()store() 方法,正如 extension-sqlite 中的做法。你也可以直接用钩子实现,只要确保在存储时正确应用/编码 yDoc 状态。

自 v4 起,onStoreDocument 会在任何文档变更时触发——不再仅限于 WebSocket 来源。由于该钩子现在也可能由不与特定连接绑定的来源触发(例如直接连接、Redis 复制),因此其负载已重新组织。如果钩子抛出异常,文档会保留在内存中并重试以避免数据丢失,且 Server.destroy() 会在关闭时刷新所有待处理的防抖存储。

如果你想让某个本地修改跳过存储钩子,可以在打开直接连接时为 LocalTransactionOrigin 设置 skipStoreHooks。扩展也可以抛出 SkipFurtherHooksError(来自 @hocuspocus/common),以表明持久化已经处理完毕,后续钩子应被跳过。

Hook 负载

传递给 onStoreDocument 钩子的 data 包含:

import { Doc } from "yjs";
import type { TransactionOrigin } from "@hocuspocus/server";

const data = {
  clientsCount: number,
  document: Doc,
  documentName: string,
  instance: Hocuspocus,
  // 触发该钩子的最后一个连接的上下文(在 v3 中名为 `context`)
  lastContext: any,
  // 最后一次事务的结构化来源(在 v3 中名为 `transactionOrigin`)
  lastTransactionOrigin: TransactionOrigin,
};

在 v3 中,该负载还包含 contextrequestHeadersrequestParameterssocketIdtransactionOrigin。在 v4 中,这些字段已被移除或重命名——如有需要,请通过 lastContext 访问与连接相关的数据。

onUpgrade

当 Hocuspocus 的 HTTP 服务器接收到新升级请求时调用 onUpgrade 钩子。它应返回 Promise。抛出异常或拒绝 Promise 会阻止后续钩子执行,使你可以自行响应并升级请求。类似请求中间件机制。

这适合在 Hocuspocus 运行的同一端口创建自定义 websocket 路由。

钩子负载

传递给 onUpgrade 钩子的 data 包含:

import { IncomingMessage } from "http";
import { Socket } from "net";

const data = {
  head: any,
  request: IncomingMessage,
  socket: Socket,
  instance: Hocuspocus,
};

示例

import { Server } from "@hocuspocus/server";
import WebSocket, { WebSocketServer } from "ws";

const server = new Server({
  onUpgrade(data) {
    return new Promise((resolve, reject) => {
      const { request, socket, head } = data;

      // 如果请求匹配自定义路由
      if (request.url?.split("/")[1] === "custom-route") {
        // 创建自己的 websocket 服务器处理升级,确保 noServer 设置为 true
        const websocketServer = new WebSocketServer({ noServer: true });
        websocketServer.on("connection", (connection: WebSocket, request: IncomingMessage) => {
          // 这里处理新连接和消息订阅的逻辑
          console.log("已连接到自定义的 websocket 服务器!");
        });

        // 在自己的 websocket 服务器中处理升级请求
        websocketServer.handleUpgrade(request, socket, head, (ws) => {
          websocketServer.emit("connection", ws, request);
        });

        // 拒绝 Promise 将终止链,后续 onUpgrade 不调用
        return reject();
      }

      resolve();
    });
  },
});

server.listen();

onStateless

服务器接收到无状态消息后调用 onStateless 钩子。它应返回 Promise。

钩子负载

传递给 onStateless 钩子的 data 包含:

const data = {
  connection: Connection,
  documentName: string,
  document: Document,
  payload: string,
}

示例

import { Server } from '@hocuspocus/server'

const server = new Server({
  async onStateless({ payload, document, connection }) {
    // 输出信息
    console.log(`服务器已收到无状态消息 "${payload}"!`)
    // 向基于文档的所有连接广播无状态消息
    document.broadcastStateless('这是广播消息。')
    // 向特定连接发送无状态消息
    connection.sendStateless('这是特定消息。')
  },
})

server.listen()

beforeSync

在处理同步消息前调用此钩子。适合检查将要应用到文档的同步消息。

注意:此钩子不是异步的

钩子负载

const data = {
  documentName: string,
  document: Document,
  // y-protocols/sync 消息类型
  type: number,
  // y-protocols/sync 消息内容
  payload: Uint8Array,
}

示例

import { Server } from '@hocuspocus/server'

const server = new Server({
  beforeSync({ payload, document, documentName, type }) {
    console.log(`服务器将处理同步消息: "${payload}"!`)
  },
})

server.listen()

beforeBroadcastStateless

服务器在广播无状态消息之前调用该钩子。

钩子负载

传递给 beforeBroadcastStateless 钩子的 data 包含:

import { Doc } from 'yjs'

const data = {
  documentName: string,
  document: Doc,
  payload: string,
}

示例

import { Server } from '@hocuspocus/server'

const server = new Server({
  beforeBroadcastStateless({ payload }) {
    console.log(`服务器将广播无状态消息: "${payload}"!`)
  },
})

server.listen()

afterUnloadDocument

文档在服务器关闭后调用该钩子。此时文档已被销毁,无法访问,但你可以通知所有订阅了该文档的内容。

钩子负载

传递给 afterUnloadDocument 钩子的 data 包含:

const data = {
  instance: Hocuspocus,
  documentName: string,
};

示例

import { Server } from "@hocuspocus/server";

const server = new Server({
  async afterUnloadDocument(data) {
    // 输出信息
    console.log(`文档 ${data.documentName} 已关闭`);
  },
});

server.listen();