使用 Payload CMS 构建简单的实时聊天应用程序

有效载荷聊天

Payload 是开源的全栈 Next.js 框架,可为您提供即时后端超能力。立即获得完整的 TypeScript 后端和管理面板。使用 Payload 作为无头 CMS 或构建强大的应用程序。

使用(服务器发送事件)SSE 在后端使用自定义端点负载将更新发送到客户端,客户端使用 EventSource API 监听更新。

这只是一个练习,看看当使用 Payload 作为构建应用程序的平台(而不仅仅是 CMS)时可以做哪些有趣的事情

视频

此代码与下面的视频配套,本博文末尾有完整源代码的链接

项目结构

  • client/- 使用 Vite 构建的 React 前端
  • server/-有效负载后端服务器
  • 解决方案的主要组成部分

    服务器:消息收集设置

    我不会介绍设置有效载荷应用程序的具体细节,这些内容已在文档中介绍。但要开始操作,请运行以下命令,并在系统提示您输入数据库时​​选择“sqlite”。

    npx create-payload-app@latest -t blank

    创建一个名为“Messages.ts”的新文件并将其添加到有效负载项目文件夹“src/collections”。

    消息集合

    export const Messages: CollectionConfig = {
      slug: "messages",
      // not focused on access in this example
      access: {
        create: () => true,
        read: () => true,
        update: () => true,
        delete: () => true,
      },
      endpoints: [SSEMessages], // <-- ADDED CUSTOM ENDPOINT api/messages/sse
      fields: [
        {
          name: "sender",
          type: "relationship",
          relationTo: "users",
          required: true,
        },
        {
          name: "receiver",
          type: "relationship",
          relationTo: "users",
          required: true,
        },
        {
          name: "content",
          type: "text",
          required: true,
        },
        {
          name: "timestamp",
          type: "date",
          required: true,
          defaultValue: () => new Date().toISOString(),
        },
      ],
    };

    打开文件“src/payload-config.ts”添加一个新的导入。

    import { Messages } from './collections/Messages'

    在同一文件的 `buildConfig` 部分中,有一个名为 `collections` 的属性,添加 `Messages`

    collections: [Users, Media, Messages],

    最后,让我们设置“cors”属性,这样我们就可以确保我们的 vite 应用可以访问 Payload 服务器

    cors: ['*', 'http://localhost:3000', 'http://localhost:5173'],

    服务器:SSE 的自定义端点已添加到集合

    客户端访问下面的代码来从服务器获取更新流。

    除了函数“pollMessages”之外,大部分代码都是为连接设置的,该函数根据以前的时间戳查询消息集合中的更新消息,并将它们作为有效负载发送到侦听器连接。

    此代码放入文件“src/endpoints/SSEMessages.ts”

    import type { Endpoint } from "payload";
    
    /**
     * Server-Sent Events (SSE) endpoint for Messages collection using TransformStream
     * Implements a polling mechanism to check for new messages and stream them to clients
     */
    export const SSEMessages: Endpoint = {
      path: "/sse",
      method: "get",
      handler: async (req) => {
        try {
          // Create abort controller to handle connection termination
          const abortController = new AbortController();
          const { signal } = abortController;
    
          // Set up streaming infrastructure
          const stream = new TransformStream();
          const writer = stream.writable.getWriter();
          const encoder = new TextEncoder();
    
          // Initialize timestamp to fetch all messages from the beginning
          let lastTimestamp = new Date(0).toISOString();
    
          // Send keep-alive messages every 30 seconds to maintain connection
          const keepAlive = setInterval(async () => {
            if (!signal.aborted) {
              await writer.write(
                encoder.encode("event: ping\ndata: keep-alive\n\n")
              );
            }
          }, 30000);
    
          /**
           * Polls for new messages and sends them to connected clients
           * - Queries messages newer than the last received message
           * - Updates lastTimestamp to the newest message's timestamp
           * - Streams messages to client using SSE format
           */
          const pollMessages = async () => {
            if (!signal.aborted) {
              // Query for new messages since last update
              const messages = await req.payload.find({
                collection: "messages",
                where: {
                  updatedAt: { greater_than: lastTimestamp },
                },
                sort: "-updatedAt",
                limit: 10,
                depth: 1,
                populate: {
                  users: {
                    email: true,
                  },
                },
              });
    
              if (messages.docs.length > 0) {
                // Update timestamp to latest message for next poll
                lastTimestamp = messages.docs[0].updatedAt;
                // Send messages to client in SSE format
                await writer.write(
                  encoder.encode(
                    `event: message\ndata: ${JSON.stringify(messages.docs)}\n\n`
                  )
                );
              }
            }
          };
    
          // Poll for new messages every second
          const messageInterval = setInterval(pollMessages, 1000);
    
          // Clean up intervals and close writer when connection is aborted
          signal.addEventListener("abort", () => {
            clearInterval(keepAlive);
            clearInterval(messageInterval);
            writer.close();
          });
    
          // Return SSE response with appropriate headers
          return new Response(stream.readable, {
            headers: {
              "Content-Type": "text/event-stream",
              "Cache-Control": "no-cache",
              Connection: "keep-alive",
              "X-Accel-Buffering": "no", // Prevents nginx from buffering the response
              "Access-Control-Allow-Origin": "*", // CORS header for cross-origin requests
              "Access-Control-Allow-Methods": "GET, OPTIONS",
              "Access-Control-Allow-Headers": "Content-Type",
            },
          });
        } catch (error) {
          console.log(error);
          return new Response("Error occurred", { status: 500 });
        }
      },
    };

    客户

    客户端应用程序是使用`vite cli`创建的反应应用程序。

    `endpoint` = `[payload server]/api/messages/sse`

    我们连接到端点然后监听响应。

    如何连接 SSE 的服务器

    useEffect(() => {
      // Create EventSource connection
      const eventSource = new EventSource(
        `${import.meta.env.VITE_API_URL}${endpoint}`
      );
    
      // Handle incoming messages
      eventSource.onmessage = (event) => {
        const data = JSON.parse(event.data);
        setMessages((prev) => [...prev, data]);
      };
    
      // Handle connection open
      eventSource.onopen = () => {
        console.log("SSE Connection Established");
      };
    
      // Handle errors
      eventSource.onerror = (error) => {
        console.error("SSE Error:", error);
        eventSource.close();
      };
    
      // Cleanup on component unmount
      return () => {
        eventSource.close();
      };
    }, [endpoint]);

    完整源代码

  • https://github.com/aaronksaunders/payload-chat