一杯茶时间带你基于 Yjs 和 reactflow 构建协同流程图编辑器

通过前面几篇文章,我们深入学习了 Yjs 的核心概念、y-protocols 协议、y-websocket 网络同步和 y-indexeddb 本地持久化。现在让我们把这些知识串联起来,从零开始构建一个真实可用的协同流程图编辑器。

想象一下这样的场景:多个用户同时打开同一个流程图,拖动节点、创建连接、实时看到彼此的光标位置,就像在同一块白板上协作一样流畅。这就是我们要实现的效果。这个案例会涵盖协同编辑的核心场景:数据同步、状态管理、光标协同,以及前后端的完整实现。

项目架构

让我们先看看整体架构。这个协同流程图应用由三个核心部分组成:前端使用 React + ReactFlow 构建可视化流程图界面,后端使用 NestJS 搭建 WebSocket 服务器处理协同逻辑,Yjs 作为协同引擎在前后端之间同步数据。

前端负责用户交互和视图渲染,Yjs Doc 存储流程图的节点和连接数据,WebsocketProvider 负责与服务器建立连接并同步数据。后端的 WebSocket Server 接收来自多个客户端的消息,协调它们之间的数据同步,WSSharedDoc 存储每个房间的文档状态。

后端实现

让我们从后端开始,因为它是整个协同系统的中枢。后端的职责很清晰:接收客户端连接,转发同步消息,管理 Awareness 状态。

我们使用 NestJS 框架搭建后端服务,整个协同逻辑封装在一个 Service 中。首先创建项目结构:

依赖安装

在开始编写代码前,需要安装必要的依赖:

pnpm add yjs y-protocols ws lib0
pnpm add -D @types/ws

核心数据结构

src/yjs/yjs.service.ts 文件中,首先导入需要的模块,然后定义 WSSharedDoc 类:

import { Injectable, OnModuleInit, Logger } from "@nestjs/common";
import * as WebSocket from "ws";
import * as Y from "yjs";
import * as syncProtocol from "y-protocols/sync";
import * as awarenessProtocol from "y-protocols/awareness";
import * as encoding from "lib0/encoding";
import * as decoding from "lib0/decoding";
import * as map from "lib0/map";

const wsReadyStateOpen = 1;
const messageSync = 0;
const messageAwareness = 1;

const docs: Map<string, WSSharedDoc> = new Map();

这些导入包括:NestJS 的装饰器和日志工具,WebSocket 库用于建立连接,Yjs 核心库和协议库,lib0 提供的编解码和 Map 工具。常量定义了消息类型和连接状态。

WSSharedDoc 类继承自 Y.Doc,是服务端维护的共享文档:

class WSSharedDoc extends Y.Doc {
  name: string;
  conns: Map<WebSocket, Set<number>>;
  awareness: awarenessProtocol.Awareness;

  constructor(name: string) {
    super({ gc: true });
    this.name = name;
    this.conns = new Map();
    this.awareness = new awarenessProtocol.Awareness(this);
  }
}

这个类扩展了 Yjs 文档,添加了连接管理和 Awareness 支持。name 是文档的唯一标识(通常是房间名),conns 维护了当前所有连接到这个文档的 WebSocket 客户端,awareness 用于同步用户的临时状态(如光标位置)。

在同一文件中,文档管理使用一个全局 Map,确保同一个房间的所有客户端共享同一个文档实例:

const docs: Map<string, WSSharedDoc> = new Map();

const getYDoc = (docname: string): WSSharedDoc =>
  map.setIfUndefined(docs, docname, () => new WSSharedDoc(docname));

处理客户端连接

接下来在 YjsService 类中实现连接处理。当客户端连接时,我们需要建立 WebSocket 连接,从 URL 中提取房间名,然后设置消息处理逻辑:

onModuleInit() {
  this.wss = new WebSocket.Server({ port: 1234 });

  this.wss.on('connection', (conn: WebSocket, req: any) => {
    const url = req.url || '/';
    const docName = url.slice(1).split('?')[0] || 'default';
    this.setupWSConnection(conn, docName);
  });

  this.logger.log('WebSocket server initialized on port 1234');
}

这里从请求 URL 中提取房间名,比如 ws://localhost:1234/flow-room 会提取出 flow-room 作为文档名称。这样不同的房间使用不同的文档,互不干扰。

消息处理

setupWSConnection 是整个服务端的核心,它处理客户端的所有消息:

private setupWSConnection(conn: WebSocket, docName: string) {
  conn.binaryType = 'arraybuffer';
  const doc = getYDoc(docName);
  doc.conns.set(conn, new Set());

  conn.on('message', (message: ArrayBuffer | Buffer) => {
    const uint8Array = message instanceof ArrayBuffer
      ? new Uint8Array(message)
      : new Uint8Array(message.buffer, message.byteOffset, message.byteLength);

    const encoder = encoding.createEncoder();
    const decoder = decoding.createDecoder(uint8Array);
    const messageType = decoding.readVarUint(decoder);

    switch (messageType) {
      case messageSync:
        encoding.writeVarUint(encoder, messageSync);
        syncProtocol.readSyncMessage(decoder, encoder, doc, null);
        if (encoding.length(encoder) > 1) {
          send(doc, conn, encoding.toUint8Array(encoder));
        }
        break;
      case messageAwareness:
        awarenessProtocol.applyAwarenessUpdate(
          doc.awareness,
          decoding.readVarUint8Array(decoder),
          conn,
        );
        break;
    }
  });
}

消息分为两种类型:Sync 消息用于同步文档数据(节点、连接等持久化数据),Awareness 消息用于同步用户状态(光标位置等临时数据)。服务端收到 Sync 消息后,使用 y-protocols 的 readSyncMessage 处理,如果有响应数据就发送回客户端。收到 Awareness 消息后,应用更新并广播给其他客户端。

初始同步和状态广播

新客户端连接后,需要立即发送当前文档状态,让新用户能看到已有的内容:

setImmediate(() => {
  if (conn.readyState === wsReadyStateOpen) {
    const encoder = encoding.createEncoder();
    encoding.writeVarUint(encoder, messageSync);
    syncProtocol.writeSyncStep1(encoder, doc);
    send(doc, conn, encoding.toUint8Array(encoder));

    const awarenessStates = doc.awareness.getStates();
    if (awarenessStates.size > 0) {
      const awarenessEncoder = encoding.createEncoder();
      encoding.writeVarUint(awarenessEncoder, messageAwareness);
      encoding.writeVarUint8Array(
        awarenessEncoder,
        awarenessProtocol.encodeAwarenessUpdate(
          doc.awareness,
          Array.from(awarenessStates.keys())
        )
      );
      send(doc, conn, encoding.toUint8Array(awarenessEncoder));
    }
  }
});

使用 setImmediate 延迟发送,确保连接完全建立。首先发送 Sync Step1 消息,包含当前文档的完整状态向量,然后如果有其他用户在线,发送他们的 Awareness 状态,这样新用户能立即看到其他人的光标。

监听和广播更新

服务端需要监听文档的更新,并将更新广播给所有连接的客户端:

const updateHandler = (update: Uint8Array) => {
  const encoder = encoding.createEncoder();
  encoding.writeVarUint(encoder, messageSync);
  syncProtocol.writeUpdate(encoder, update);
  const message = encoding.toUint8Array(encoder);
  doc.conns.forEach((_, c) => send(doc, c, message));
};
doc.on("update", updateHandler);

const awarenessChangeHandler = ({ added, updated, removed }) => {
  const changedClients = added.concat(updated, removed);
  const connControlledIDs = doc.conns.get(conn);
  if (connControlledIDs !== undefined) {
    added.forEach((clientID) => connControlledIDs.add(clientID));
    removed.forEach((clientID) => connControlledIDs.delete(clientID));
  }

  const encoder = encoding.createEncoder();
  encoding.writeVarUint(encoder, messageAwareness);
  encoding.writeVarUint8Array(
    encoder,
    awarenessProtocol.encodeAwarenessUpdate(doc.awareness, changedClients)
  );
  const buff = encoding.toUint8Array(encoder);
  doc.conns.forEach((_, c) => send(doc, c, buff));
};
doc.awareness.on("update", awarenessChangeHandler);

当文档有更新时(比如用户移动了节点),将增量更新编码后广播给所有客户端。当 Awareness 状态变化时(比如用户移动了光标),也广播给所有客户端。注意这里会追踪每个连接控制的客户端 ID,用于后续清理。

连接清理

当客户端断开连接时,需要清理相关资源,移除其 Awareness 状态,如果房间空了就销毁文档:

conn.on("close", () => {
  const controlledIds = doc.conns.get(conn);
  doc.conns.delete(conn);
  doc.awareness.off("update", awarenessChangeHandler);
  doc.off("update", updateHandler);

  if (controlledIds) {
    awarenessProtocol.removeAwarenessStates(
      doc.awareness,
      Array.from(controlledIds),
      null
    );
  }

  if (doc.conns.size === 0) {
    doc.destroy();
    docs.delete(docName);
  }
});

这里移除事件监听器防止内存泄漏,清除该客户端的 Awareness 状态,如果是最后一个客户端离开,销毁文档释放内存。

这些逻辑都编写完成之后,我们就可以执行如下名来来启动项目:

pnpm start:dev

前端实现

前端的实现更加复杂一些,因为要处理用户交互、UI 渲染和协同同步。我们使用 ReactFlow 作为流程图渲染引擎,Yjs 处理数据同步。

前端项目结构如下:

前端依赖安装

前端需要安装以下依赖:

pnpm add yjs y-websocket reactflow

React Flow 是一个用于构建交互式图形和流程图的库,提供了丰富的功能,如拖拽节点、连接线、缩放、平移等,常用于可视化编辑器、流程设计器等应用中。它支持高度定制,可以很容易地与其他前端库和框架集成。

初始化协同环境

src/components/Canvas.tsx 文件中,首先导入需要的库:

import { useCallback, useEffect, useRef, useState } from "react";
import {
  ReactFlow,
  Controls,
  Background,
  useNodesState,
  useEdgesState,
  addEdge,
  BackgroundVariant,
  ConnectionMode,
  useReactFlow,
  ReactFlowProvider,
  type Connection,
  type Edge,
  type Node,
  NodeChange,
  EdgeChange,
} from "reactflow";
import "reactflow/dist/style.css";
import * as Y from "yjs";
import { WebsocketProvider } from "y-websocket";
import Cursor from "./Cursor";

然后在 Canvas 组件中创建 Yjs 文档和 WebSocket 连接:

useEffect(() => {
  const doc = new Y.Doc();
  const wsProvider = new WebsocketProvider(
    "ws://localhost:1234",
    "flow-room",
    doc,
    {
      connect: false,
      resyncInterval: -1,
    }
  );

  setTimeout(() => {
    wsProvider.connect();
  }, 50);

  const nodesMap = doc.getMap("nodes");
  const edgesMap = doc.getMap("edges");

  ydoc.current = doc;
  provider.current = wsProvider;

  // ... 后续逻辑
}, []);

这里延迟连接是为了避免过快重连的警告,使用 resyncInterval: -1 禁用自动重新同步。我们使用两个 Y.Map 分别存储节点和连接,这样可以独立管理它们的增删改。

初始化 Awareness

在同一个 useEffect 中,设置本地用户的 Awareness 状态,包括随机生成的颜色和客户端 ID:

const clientId = wsProvider.awareness.clientID;
const userColor = useRef(generateRandomColor());

wsProvider.awareness.setLocalState({
  cursor: null,
  color: userColor.current,
  clientId: clientId,
});

wsProvider.awareness.on("change", () => {
  const states = new Map(wsProvider.awareness.getStates());
  setCursors(states);
});

监听 Awareness 变化,当其他用户的光标移动时,更新本地状态并重新渲染光标。

同步节点数据

继续在 useEffect 中,如果是首次打开,初始化一些默认节点,然后监听节点变化,将 Yjs 数据同步到 React 状态:

if (nodesMap.size === 0) {
  initialNodes.forEach((node) => {
    nodesMap.set(node.id, JSON.parse(JSON.stringify(node)));
  });
}

nodesMap.observe(() => {
  const yNodes = Array.from(nodesMap.values());
  const validNodes = yNodes.map((node) => ({
    id: node.id,
    type: node.type || "default",
    data: node.data,
    position: {
      x: node.position.x,
      y: node.position.y,
    },
  }));
  setNodes(validNodes);
});

使用 JSON.parse(JSON.stringify()) 确保存储的是纯数据对象,避免 Yjs 无法序列化 React 组件。observe 方法监听 Y.Map 的变化,任何修改都会触发回调,我们将 Yjs 数据转换为 ReactFlow 需要的格式。

处理节点拖动

当用户拖动节点时,需要将位置更新同步到 Yjs 文档:

const handleNodesChange = useCallback(
  (changes: NodeChange[]) => {
    onNodesChange(changes);

    if (!ydoc.current) return;

    changes.forEach((change) => {
      if (change.type === "position") {
        const node = nodes.find((n) => n.id === change.id);
        if (node) {
          const updatedNode = {
            ...node,
            position: change.position || node.position,
          };
          ydoc.current
            ?.getMap("nodes")
            .set(change.id, JSON.parse(JSON.stringify(updatedNode)));
        }
      }
    });
  },
  [nodes, onNodesChange]
);

ReactFlow 使用 changes 数组描述节点的变化,我们过滤出位置变化,更新到 Yjs 文档。这个更新会触发 observe 回调,同时通过 WebSocket 发送给其他客户端。

创建连接

当用户连接两个节点时,创建一条边并同步到 Yjs:

const onConnect = useCallback(
  (connection: Connection) => {
    if (!connection.source || !connection.target) return;

    const newEdge = {
      id: `e${connection.source}-${connection.target}`,
      source: connection.source,
      target: connection.target,
      sourceHandle: connection.sourceHandle || undefined,
      targetHandle: connection.targetHandle || undefined,
    };

    if (ydoc.current) {
      ydoc.current.getMap("edges").set(newEdge.id, newEdge);
    }

    setEdges((eds) => addEdge(connection, eds));
  },
  [setEdges]
);

生成唯一的边 ID,同时更新本地状态和 Yjs 文档。本地状态用于立即响应,Yjs 文档用于同步给其他客户端。

光标同步

光标同步是协同编辑中很重要的用户体验细节,让用户能看到彼此的位置:

const handleMouseMove = useCallback(
  (e: React.MouseEvent) => {
    if (!provider.current?.awareness || !flowRef.current) return;

    const bounds = flowRef.current.getBoundingClientRect();
    const x = e.clientX - bounds.left;
    const y = e.clientY - bounds.top;

    const flowPosition = reactFlowInstance.screenToFlowPosition({ x, y });

    provider.current.awareness.setLocalState({
      cursor: flowPosition,
      color: userColor.current,
      clientId: provider.current.awareness.clientID,
    });
  },
  [reactFlowInstance]
);

const handleMouseLeave = useCallback(() => {
  if (!provider.current?.awareness) return;

  provider.current.awareness.setLocalState({
    cursor: null,
    color: userColor.current,
    clientId: provider.current.awareness.clientID,
  });
}, []);

获取鼠标相对于容器的坐标,使用 ReactFlow 的 screenToFlowPosition 转换为画布坐标(考虑缩放和平移),然后更新到 Awareness。当鼠标离开画布时,将光标设置为 null,其他用户就看不到这个光标了。

渲染协同光标

最后在 Canvas 组件的 JSX 中渲染其他用户的光标,使用不同颜色区分:

{
  Array.from(cursors.entries()).map(([clientId, state]) => {
    if (!state.cursor || clientId === currentClientId) {
      return null;
    }

    const screenPosition = reactFlowInstance.flowToScreenPosition({
      x: state.cursor.x,
      y: state.cursor.y,
    });

    return (
      <Cursor
        key={clientId}
        x={screenPosition.x}
        y={screenPosition.y}
        color={state.color}
      />
    );
  });
}

过滤掉自己的光标,将画布坐标转换回屏幕坐标(因为光标是绝对定位在屏幕上的),使用每个用户的颜色渲染光标组件。

协同光标组件

src/components/Cursor.tsx 中实现光标组件,我们使用的 SVG 的方式来实现这个光标组件:

interface CursorProps {
  x: number;
  y: number;
  color: string;
}

export default function Cursor({ x, y, color }: CursorProps) {
  return (
    <div
      style={{
        position: "absolute",
        left: x,
        top: y,
        pointerEvents: "none",
        zIndex: 9999,
      }}
    >
      <svg width="24" height="24" viewBox="0 0 24 24">
        <path
          d="M5.65376 12.3673L5.46026 12.4668L5.70974 12.8619L11.3877 21.5659L12.6377 23.192L13.1897 21.2687L16.855 8.71373L19.0002 0.939087L10.5578 3.70771L0.458496 7.88092L-1.56467 8.73732L0.0837585 10.0768L5.65376 12.3673Z"
          fill={color}
        />
      </svg>
    </div>
  );
}

这个组件接收坐标和颜色,使用 SVG 绘制一个指针形状,通过 pointerEvents: "none" 确保不会干扰用户的鼠标操作。

协同流程详解

让我们通过一个完整的场景来理解整个协同过程。假设用户 A 拖动了一个节点,数据是如何同步到用户 B 的?

整个过程分为三个阶段:

  1. 第一阶段,用户 A 拖动节点,触发 handleNodesChange,更新 Yjs 文档,Yjs 文档生成增量更新。
  2. 第二阶段,WebsocketProvider 监听到 update 事件,将增量编码为二进制消息,通过 WebSocket 发送给服务器,服务器应用更新到服务端文档。
  3. 第三阶段,服务器将更新广播给所有其他客户端,用户 B 的 WebsocketProvider 收到消息,解码后应用到本地文档,触发 observe 回调,更新 React 状态,ReactFlow 重新渲染,用户 B 看到节点移动了。

这个过程非常高效,因为传输的是增量数据而不是完整文档,一次节点移动可能只需要传输几十个字节。Yjs 的 CRDT 算法保证了即使有并发修改,最终所有客户端的状态也会一致。

因为我们项目使用的是 Monorepo 项目,所以我们只需要在终端上启动如下命令即可:

pnpm dev

如果看到这样的输出,说明我们的前后端都启动成功了:

现在我们需要在不同的浏览器都打开相同的链接:

http://localhost:5173/

因为这是用 vite 创建的项目,所以默认启动的是 5173(我要吃饭端口)

最终你能看到两端鼠标实时同步过来,并且拖动节点能同步到另外一段浏览器,这就是我们实现的最简单的协同编辑器。

基于这个简单的协同编辑器架构,我们可以进一步扩展和丰富其功能。例如,可以增加用户身份、编辑历史回溯、权限管理、节点间的关系建立等更多功能,进一步提升协作体验和工具的实用性。

Yjs 和 WebSocket 的分工

在这个架构中,Yjs 和 WebSocket 各司其职,共同完成协同编辑。

Yjs 的职责是维护文档状态和处理冲突。它使用 CRDT 算法确保多个客户端并发编辑时数据最终一致,生成增量更新而不是完整快照,大大减少了网络传输量,提供了 Y.Map、Y.Array 等数据结构,让我们能方便地组织复杂的文档数据。Yjs 不关心数据如何传输,它只负责生成和应用更新。

WebSocket 的职责是网络通信。它建立客户端和服务器之间的双向连接,传输 Yjs 生成的二进制消息,处理连接断开、重连等网络问题,维护房间隔离,确保不同房间的消息不会混淆。WebSocket 不关心消息的内容,它只负责可靠地传输数据。

y-websocket 将两者连接起来,监听 Yjs 文档的更新,自动发送给服务器,接收服务器的消息,自动应用到 Yjs 文档,处理同步协议(State Vector、Update)的编解码。这种职责分离的设计非常优雅,我们可以轻松替换传输层(比如用 WebRTC 代替 WebSocket)而不影响 Yjs 的使用。

使用 Hocuspocus 简化开发

虽然手动实现 WebSocket 服务器能让我们深入理解协同原理,但在实际项目中,我们通常会使用更成熟的解决方案。Hocuspocus 就是 Yjs 官方推荐的协同后端框架,它提供了开箱即用的功能。

Hocuspocus 的核心优势在于功能完整和易于扩展。它内置了持久化支持,可以将文档保存到数据库(支持 Redis、PostgreSQL、SQLite 等),提供了身份验证和权限控制的钩子,支持文档历史和版本管理,内置了扩展系统,可以轻松添加自定义逻辑,还提供了监控和日志功能,便于调试和运维。

使用 Hocuspocus 搭建服务器非常简单:

import { Server } from "@hocuspocus/server";

const server = Server.configure({
  port: 1234,

  async onAuthenticate({ token, documentName }) {
    // 验证用户身份
    return { user: { id: 1, name: "John" } };
  },

  async onLoadDocument({ documentName }) {
    // 从数据库加载文档
    return Y.encodeStateAsUpdate(doc);
  },

  async onStoreDocument({ documentName, state }) {
    // 保存文档到数据库
    await db.save(documentName, state);
  },
});

server.listen();

前端使用 HocuspocusProvider 替代 WebsocketProvider:

import { HocuspocusProvider } from "@hocuspocus/provider";

const provider = new HocuspocusProvider({
  url: "ws://localhost:1234",
  name: "flow-room",
  document: doc,
  token: "user-auth-token",
});

Hocuspocus 还提供了很多实用的扩展,比如 Webhooks 扩展可以在文档变化时触发 HTTP 请求,Logger 扩展可以记录所有操作日志,Throttle 扩展可以限制更新频率防止滥用,Database 扩展提供了多种数据库的开箱即用支持。

对于生产环境的应用,建议使用 Hocuspocus 而不是自己实现 WebSocket 服务器,这样可以节省大量开发时间,获得更稳定可靠的服务。

总结

通过这个实战案例,我们完整地实现了一个协同流程图编辑器,涵盖了协同编辑的核心要素。

首先是架构设计。前端使用 React + ReactFlow 构建 UI,后端使用 NestJS + WebSocket 处理协同,Yjs 作为协同引擎在前后端之间同步数据。这种架构清晰地分离了视图层、业务逻辑层和传输层。

其次是实现细节。后端管理文档实例和连接,处理 Sync 和 Awareness 消息,监听文档更新并广播给所有客户端。前端初始化 Yjs 文档和 WebSocket 连接,使用 Y.Map 存储节点和连接,监听用户操作并更新 Yjs 文档,渲染其他用户的协同光标。

再看协同流程。用户操作触发 Yjs 文档更新,生成增量数据,通过 WebSocket 发送给服务器,服务器广播给其他客户端,其他客户端应用更新并重新渲染。整个过程高效且可靠,得益于 Yjs 的 CRDT 算法和增量同步机制。

最后是工程化建议。在理解了底层原理后,实际项目中建议使用 Hocuspocus 简化开发,它提供了持久化、权限控制、监控等生产级功能,让我们能专注于业务逻辑而不是底层实现。

通过这个案例,我们不仅学会了如何构建协同应用,更重要的是理解了协同编辑的核心思想:通过 CRDT 算法保证最终一致性,通过增量同步减少网络传输,通过 Awareness 实现实时协作体验。这些理念可以应用到各种协同场景中,无论是文档编辑、画板协作还是数据表格。

参考资料

#一人推荐一个值得做的项目##实习,不懂就问##26年哪些行业会变好/更差##卷__卷不过你们,只能卷__了##去年的flag与今年的小目标#
全部评论

相关推荐

评论
1
1
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务