使用 Payload CMS 构建简单的实时聊天应用程序
有效载荷聊天
Payload 是开源的全栈 Next.js 框架,可为您提供即时后端超能力。立即获得完整的 TypeScript 后端和管理面板。使用 Payload 作为无头 CMS 或构建强大的应用程序。
使用(服务器发送事件)SSE 在后端使用自定义端点负载将更新发送到客户端,客户端使用 EventSource API 监听更新。
这只是一个练习,看看当使用 Payload 作为构建应用程序的平台(而不仅仅是 CMS)时可以做哪些有趣的事情
视频
此代码与下面的视频配套,本博文末尾有完整源代码的链接
项目结构
解决方案的主要组成部分
服务器:消息收集设置
我不会介绍设置有效载荷应用程序的具体细节,这些内容已在文档中介绍。但要开始操作,请运行以下命令,并在系统提示您输入数据库时选择“sqlite”。
npx create-payload-app@latest -t blank
创建一个名为“Messages.ts”的新文件并将其添加到有效负载项目文件夹“src/collections”。
消息集合
export const Messages: CollectionConfig = {
slug: "messages",
// not focused on access in this example
access: {
create: () => true,
read: () => true,
update: () => true,
delete: () => true,
},
endpoints: [SSEMessages], // <-- ADDED CUSTOM ENDPOINT api/messages/sse
fields: [
{
name: "sender",
type: "relationship",
relationTo: "users",
required: true,
},
{
name: "receiver",
type: "relationship",
relationTo: "users",
required: true,
},
{
name: "content",
type: "text",
required: true,
},
{
name: "timestamp",
type: "date",
required: true,
defaultValue: () => new Date().toISOString(),
},
],
};打开文件“src/payload-config.ts”添加一个新的导入。
import { Messages } from './collections/Messages'在同一文件的 `buildConfig` 部分中,有一个名为 `collections` 的属性,添加 `Messages`
collections: [Users, Media, Messages],
最后,让我们设置“cors”属性,这样我们就可以确保我们的 vite 应用可以访问 Payload 服务器
cors: ['*', 'http://localhost:3000', 'http://localhost:5173'],
服务器:SSE 的自定义端点已添加到集合
客户端访问下面的代码来从服务器获取更新流。
除了函数“pollMessages”之外,大部分代码都是为连接设置的,该函数根据以前的时间戳查询消息集合中的更新消息,并将它们作为有效负载发送到侦听器连接。
此代码放入文件“src/endpoints/SSEMessages.ts”
import type { Endpoint } from "payload";
/**
* Server-Sent Events (SSE) endpoint for Messages collection using TransformStream
* Implements a polling mechanism to check for new messages and stream them to clients
*/
export const SSEMessages: Endpoint = {
path: "/sse",
method: "get",
handler: async (req) => {
try {
// Create abort controller to handle connection termination
const abortController = new AbortController();
const { signal } = abortController;
// Set up streaming infrastructure
const stream = new TransformStream();
const writer = stream.writable.getWriter();
const encoder = new TextEncoder();
// Initialize timestamp to fetch all messages from the beginning
let lastTimestamp = new Date(0).toISOString();
// Send keep-alive messages every 30 seconds to maintain connection
const keepAlive = setInterval(async () => {
if (!signal.aborted) {
await writer.write(
encoder.encode("event: ping\ndata: keep-alive\n\n")
);
}
}, 30000);
/**
* Polls for new messages and sends them to connected clients
* - Queries messages newer than the last received message
* - Updates lastTimestamp to the newest message's timestamp
* - Streams messages to client using SSE format
*/
const pollMessages = async () => {
if (!signal.aborted) {
// Query for new messages since last update
const messages = await req.payload.find({
collection: "messages",
where: {
updatedAt: { greater_than: lastTimestamp },
},
sort: "-updatedAt",
limit: 10,
depth: 1,
populate: {
users: {
email: true,
},
},
});
if (messages.docs.length > 0) {
// Update timestamp to latest message for next poll
lastTimestamp = messages.docs[0].updatedAt;
// Send messages to client in SSE format
await writer.write(
encoder.encode(
`event: message\ndata: ${JSON.stringify(messages.docs)}\n\n`
)
);
}
}
};
// Poll for new messages every second
const messageInterval = setInterval(pollMessages, 1000);
// Clean up intervals and close writer when connection is aborted
signal.addEventListener("abort", () => {
clearInterval(keepAlive);
clearInterval(messageInterval);
writer.close();
});
// Return SSE response with appropriate headers
return new Response(stream.readable, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
"X-Accel-Buffering": "no", // Prevents nginx from buffering the response
"Access-Control-Allow-Origin": "*", // CORS header for cross-origin requests
"Access-Control-Allow-Methods": "GET, OPTIONS",
"Access-Control-Allow-Headers": "Content-Type",
},
});
} catch (error) {
console.log(error);
return new Response("Error occurred", { status: 500 });
}
},
};客户
客户端应用程序是使用`vite cli`创建的反应应用程序。
`endpoint` = `[payload server]/api/messages/sse`
我们连接到端点然后监听响应。
如何连接 SSE 的服务器
useEffect(() => {
// Create EventSource connection
const eventSource = new EventSource(
`${import.meta.env.VITE_API_URL}${endpoint}`
);
// Handle incoming messages
eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
setMessages((prev) => [...prev, data]);
};
// Handle connection open
eventSource.onopen = () => {
console.log("SSE Connection Established");
};
// Handle errors
eventSource.onerror = (error) => {
console.error("SSE Error:", error);
eventSource.close();
};
// Cleanup on component unmount
return () => {
eventSource.close();
};
}, [endpoint]);