使用 Payload CMS 构建简单的实时聊天应用程序
有效载荷聊天
Payload 是开源的全栈 Next.js 框架,可为您提供即时后端超能力。立即获得完整的 TypeScript 后端和管理面板。使用 Payload 作为无头 CMS 或构建强大的应用程序。
使用(服务器发送事件)SSE 在后端使用自定义端点负载将更新发送到客户端,客户端使用 EventSource API 监听更新。
这只是一个练习,看看当使用 Payload 作为构建应用程序的平台(而不仅仅是 CMS)时可以做哪些有趣的事情
视频
此代码与下面的视频配套,本博文末尾有完整源代码的链接
项目结构
解决方案的主要组成部分
服务器:消息收集设置
我不会介绍设置有效载荷应用程序的具体细节,这些内容已在文档中介绍。但要开始操作,请运行以下命令,并在系统提示您输入数据库时选择“sqlite”。
npx create-payload-app@latest -t blank
创建一个名为“Messages.ts”的新文件并将其添加到有效负载项目文件夹“src/collections”。
消息集合
export const Messages: CollectionConfig = { slug: "messages", // not focused on access in this example access: { create: () => true, read: () => true, update: () => true, delete: () => true, }, endpoints: [SSEMessages], // <-- ADDED CUSTOM ENDPOINT api/messages/sse fields: [ { name: "sender", type: "relationship", relationTo: "users", required: true, }, { name: "receiver", type: "relationship", relationTo: "users", required: true, }, { name: "content", type: "text", required: true, }, { name: "timestamp", type: "date", required: true, defaultValue: () => new Date().toISOString(), }, ], };
打开文件“src/payload-config.ts”添加一个新的导入。
import { Messages } from './collections/Messages'
在同一文件的 `buildConfig` 部分中,有一个名为 `collections` 的属性,添加 `Messages`
collections: [Users, Media, Messages],
最后,让我们设置“cors”属性,这样我们就可以确保我们的 vite 应用可以访问 Payload 服务器
cors: ['*', 'http://localhost:3000', 'http://localhost:5173'],
服务器:SSE 的自定义端点已添加到集合
客户端访问下面的代码来从服务器获取更新流。
除了函数“pollMessages”之外,大部分代码都是为连接设置的,该函数根据以前的时间戳查询消息集合中的更新消息,并将它们作为有效负载发送到侦听器连接。
此代码放入文件“src/endpoints/SSEMessages.ts”
import type { Endpoint } from "payload"; /** * Server-Sent Events (SSE) endpoint for Messages collection using TransformStream * Implements a polling mechanism to check for new messages and stream them to clients */ export const SSEMessages: Endpoint = { path: "/sse", method: "get", handler: async (req) => { try { // Create abort controller to handle connection termination const abortController = new AbortController(); const { signal } = abortController; // Set up streaming infrastructure const stream = new TransformStream(); const writer = stream.writable.getWriter(); const encoder = new TextEncoder(); // Initialize timestamp to fetch all messages from the beginning let lastTimestamp = new Date(0).toISOString(); // Send keep-alive messages every 30 seconds to maintain connection const keepAlive = setInterval(async () => { if (!signal.aborted) { await writer.write( encoder.encode("event: ping\ndata: keep-alive\n\n") ); } }, 30000); /** * Polls for new messages and sends them to connected clients * - Queries messages newer than the last received message * - Updates lastTimestamp to the newest message's timestamp * - Streams messages to client using SSE format */ const pollMessages = async () => { if (!signal.aborted) { // Query for new messages since last update const messages = await req.payload.find({ collection: "messages", where: { updatedAt: { greater_than: lastTimestamp }, }, sort: "-updatedAt", limit: 10, depth: 1, populate: { users: { email: true, }, }, }); if (messages.docs.length > 0) { // Update timestamp to latest message for next poll lastTimestamp = messages.docs[0].updatedAt; // Send messages to client in SSE format await writer.write( encoder.encode( `event: message\ndata: ${JSON.stringify(messages.docs)}\n\n` ) ); } } }; // Poll for new messages every second const messageInterval = setInterval(pollMessages, 1000); // Clean up intervals and close writer when connection is aborted signal.addEventListener("abort", () => { clearInterval(keepAlive); clearInterval(messageInterval); writer.close(); }); // Return SSE response with appropriate headers return new Response(stream.readable, { headers: { "Content-Type": "text/event-stream", "Cache-Control": "no-cache", Connection: "keep-alive", "X-Accel-Buffering": "no", // Prevents nginx from buffering the response "Access-Control-Allow-Origin": "*", // CORS header for cross-origin requests "Access-Control-Allow-Methods": "GET, OPTIONS", "Access-Control-Allow-Headers": "Content-Type", }, }); } catch (error) { console.log(error); return new Response("Error occurred", { status: 500 }); } }, };
客户
客户端应用程序是使用`vite cli`创建的反应应用程序。
`endpoint` = `[payload server]/api/messages/sse`
我们连接到端点然后监听响应。
如何连接 SSE 的服务器
useEffect(() => { // Create EventSource connection const eventSource = new EventSource( `${import.meta.env.VITE_API_URL}${endpoint}` ); // Handle incoming messages eventSource.onmessage = (event) => { const data = JSON.parse(event.data); setMessages((prev) => [...prev, data]); }; // Handle connection open eventSource.onopen = () => { console.log("SSE Connection Established"); }; // Handle errors eventSource.onerror = (error) => { console.error("SSE Error:", error); eventSource.close(); }; // Cleanup on component unmount return () => { eventSource.close(); }; }, [endpoint]);