使用 KubeMQ 增强 GenAI 应用程序:高效扩展检索增强生成 (RAG)
随着生成式人工智能 (GenAI) 在各行各业的采用不断激增,组织越来越多地利用检索增强生成 (RAG) 技术,通过实时、上下文丰富的数据来增强其人工智能模型。管理此类应用程序中的复杂信息流带来了重大挑战,尤其是在处理大规模连续生成的数据时。KubeMQ 是一个强大的消息代理,它作为一种解决方案出现,可以简化多个 RAG 流程的路由,确保在 GenAI 应用程序中高效处理数据。
为了进一步提高 RAG 工作流的效率和可扩展性,集成 FalkorDB 等高性能数据库至关重要。FalkorDB 为 RAG 系统所依赖的动态知识库提供了可靠且可扩展的存储解决方案,确保快速数据检索并与 KubeMQ 等消息传递系统无缝集成。
了解 GenAI 工作流中的 RAG
RAG 是一种通过集成检索机制来增强生成式 AI 模型的范式,允许模型在推理过程中访问外部知识库。这种方法通过基于最新和最相关的可用信息来显著提高生成的响应的准确性、相关性和及时性。
在使用 RAG 的典型 GenAI 工作流程中,该过程涉及多个步骤:
扩展这些步骤,特别是在数据不断生成和更新的环境中,需要在 RAG 管道的各个组件之间建立高效可靠的数据流机制。
KubeMQ 在 RAG 处理中的关键作用
大规模处理连续数据流
在物联网、社交媒体平台或实时分析系统等场景中,新数据不断产生,AI 模型必须快速适应以整合这些信息。传统的请求-响应架构在高吞吐量条件下可能成为瓶颈,导致延迟问题和性能下降。
KubeMQ 通过提供可扩展且强大的基础架构来管理高吞吐量消息传递场景,以实现服务之间的高效数据路由。通过将 KubeMQ 集成到 RAG 管道中,每个新数据点都会发布到消息队列或流中,确保检索组件可以立即访问最新信息,而不会使系统不堪重负。这种实时数据处理能力对于保持 GenAI 输出的相关性和准确性至关重要。
充当最佳路由器
KubeMQ 提供多种消息传递模式,包括队列、流、发布-订阅 (pub/sub) 和远程过程调用 (RPC),使其成为 RAG 管道内多功能且功能强大的路由器。其低延迟和高性能特性可确保及时传递消息,这对于实时 GenAI 应用程序至关重要,因为延迟会严重影响用户体验和系统效率。
此外,KubeMQ 处理复杂路由逻辑的能力允许制定复杂的数据分发策略。这确保了 AI 系统的不同组件能够在需要时准确地接收所需的数据,而不会出现不必要的重复或延迟。
集成FalkorDB以增强数据管理
虽然 KubeMQ 可以有效地在服务之间路由消息,但 **FalkorDB** 通过提供可扩展且高性能的图形数据库解决方案来存储和检索 RAG 流程所需的大量数据,从而对其进行了补充。这种集成可确保当新数据流经 KubeMQ 时,它会无缝地存储在 FalkorDB 中,从而可以随时进行检索操作,而不会引入延迟或瓶颈。
增强可扩展性和可靠性
随着 GenAI 应用程序的用户群和数据量不断增长,可扩展性成为首要问题。KubeMQ 可扩展,支持水平扩展以无缝适应增加的负载。它确保随着 RAG 进程数量的增加或数据生成的加速,消息传递基础架构保持稳健且响应迅速。
此外,KubeMQ 还提供消息持久性和容错功能。如果发生系统故障或网络中断,KubeMQ 可确保消息不会丢失,并且系统可以正常恢复。这种可靠性对于维护用户依赖的 AI 应用程序的完整性至关重要,以便及时准确地获取信息。
无需专用路由服务
为 RAG 管道中的数据处理实现自定义路由服务可能非常耗费资源且非常复杂。构建、维护和扩展这些服务通常需要大量的开发工作,从而分散对核心 AI 应用程序开发的关注。
通过采用 KubeMQ,组织无需创建定制的路由解决方案。KubeMQ 提供开箱即用的功能,可满足 RAG 流程的路由需求,包括复杂的路由模式、消息过滤和优先级处理。这不仅可以减少开发和维护开销,还可以加快 GenAI 解决方案的上市时间。
通过 REST 和 SDK 进行统一访问
KubeMQ 提供了多个接口用于与其消息代理功能进行交互:
这种灵活性使开发人员能够根据其特定用例选择最合适的方法,从而简化架构并加快开发周期。数据路由的单一接触点简化了 RAG 管道不同组件之间的通信,增强了整体系统的一致性。
在 RAG 管道中实现 KubeMQ:详细示例
该代码示例展示了如何通过将 KubeMQ 集成到 RAG 管道来构建电影信息检索系统。它设置了一个服务器,该服务器从烂番茄中提取电影 URL,以使用 GPT-4 构建知识图谱。用户可以通过聊天客户端与该系统交互,发送与电影相关的查询并接收 AI 生成的响应。此用例演示了如何在实际应用中处理持续数据提取和实时查询处理,利用 KubeMQ 在电影环境中进行高效的消息处理和服务间通信。
架构概述
设置KubeMQ
确保 KubeMQ 可以正常运行,可以通过使用 Docker 进行部署来实现:
docker run -d --rm \ -p 8080:8080 \ -p 50000:50000 \ -p 9090:9090 \ -e KUBEMQ_TOKEN="your token"
此命令启动 KubeMQ,并为 REST 和 gRPC 通信公开必要的端口。
RAG 服务器端
此代码(GitHub repo)实现了一个 RAG 服务器,该服务器处理聊天查询并使用 KubeMQ 管理知识源以进行消息处理。
# server.py
import json
import threading
from typing import List
from dotenv import load_dotenv
load_dotenv()
import time
from kubemq.common import CancellationToken
from kubemq.cq import Client as CQClient, QueryMessageReceived, QueryResponseMessage, QueriesSubscription
from kubemq.queues import Client as QueuesClient
from graphrag_sdk.models.openai import OpenAiGenerativeModel
from graphrag_sdk.model_config import KnowledgeGraphModelConfig
from graphrag_sdk import KnowledgeGraph, Ontology
from graphrag_sdk.source import URL
class RAGServer:
def __init__(self):
self.cq_client = CQClient(address="localhost:50000")
self.queues_client = QueuesClient(address="localhost:50000")
model = OpenAiGenerativeModel(model_name="gpt-4o")
with open("ontology.json", "r") as f:
ontology = json.load(f)
ontology = Ontology.from_json(ontology)
self.kg = KnowledgeGraph(
name="movies",
model_config=KnowledgeGraphModelConfig.with_model(model),
ontology=ontology)
self.chat = self.kg.chat_session()
self.shutdown_event = threading.Event()
self.threads: List[threading.Thread] = []
def handle_chat(self, request: QueryMessageReceived):
try:
message = request.body.decode('utf-8')
print(f"Received chat message: {message}")
result= self.chat.send_message(message)
answer = result.get("response","No answer")
print(f"Chat response: {answer}")
response = QueryResponseMessage(
query_received=request,
is_executed=True,
body=answer.encode('utf-8')
)
self.cq_client.send_response_message(response)
except Exception as e:
print(f"Error processing chat message: {str(e)}")
self.cq_client.send_response_message(QueryResponseMessage(
query_received=request,
is_executed=False,
error=str(e)
))
def pull_from_queue(self):
while not self.shutdown_event.is_set():
try:
result = self.queues_client.pull("rag-sources-queue", 10, 1)
if result.is_error:
print(f"Error pulling message from queue: {result.error}")
continue
sources = []
for message in result.messages:
source = message.body.decode('utf-8')
print(f"Received source: {source}, adding to knowledge graph")
sources.append(URL(message.body.decode('utf-8')))
if sources:
self.kg.process_sources(sources)
except Exception as e:
if not self.shutdown_event.is_set(): # Only log if not shutting down
print(f"Error processing sources: {str(e)}")
def subscribe_to_chat_queries(self):
def on_error(err: str):
if not self.shutdown_event.is_set(): # Only log if not shutting down
print(f"Error: {err}")
cancellation_token = CancellationToken()
try:
self.cq_client.subscribe_to_queries(
subscription=QueriesSubscription(
channel="rag-chat-query",
on_receive_query_callback=self.handle_chat,
on_error_callback=on_error,
),
cancel=cancellation_token
)
# Wait for shutdown signal
while not self.shutdown_event.is_set():
time.sleep(0.1)
# Cancel subscription when shutdown is requested
cancellation_token.cancel()
except Exception as e:
if not self.shutdown_event.is_set():
print(f"Error in subscription thread: {str(e)}")
def run(self):
chat_thread = threading.Thread(target=self.subscribe_to_chat_queries)
queue_thread = threading.Thread(target=self.pull_from_queue)
self.threads.extend([chat_thread, queue_thread])
for thread in self.threads:
thread.daemon = True # Make threads daemon so they exit when main thread exits
thread.start()
print("RAG server started")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("\nShutting down gracefully...")
self.shutdown()
self.cq_client.close()
self.queues_client.close()
def shutdown(self):
print("Initiating shutdown sequence...")
self.shutdown_event.set() # Signal all threads to stop
for thread in self.threads:
thread.join(timeout=5.0) # Wait up to 5 seconds for each thread
if thread.is_alive():
print(f"Warning: Thread {thread.name} did not shutdown cleanly")
print("Shutdown complete")
if __name__ == "__main__":
rag_server = RAGServer()
rag_server.run()服务器运行两个主线程:一个通过名为“rag-chat-query”的频道订阅聊天查询,并使用带有 GPT-4 的知识图谱处理它们,另一个从名为“rag-sources-queue”的队列中不断提取,以将新源添加到知识图谱中。知识图谱使用从 JSON 文件加载的自定义本体进行初始化,并使用 OpenAI 的 GPT-4 模型进行处理。服务器实现了正常关闭处理和错误管理,确保服务器停止时所有线程都正确终止。
将源数据发送到 RAG 知识图谱中
# sources_client.py
from kubemq.queues import *
class SourceClient:
def __init__(self, address="localhost:50000"):
self.client = Client(address=address)
def send_source(self, message: str) :
send_result = self.client.send_queues_message(
QueueMessage(
channel="rag-sources-queue",
body=message.encode("utf-8"),
)
)
if send_result.is_error:
print(f"message send error, error:{send_result.error}")
if __name__ == "__main__":
client = SourceClient()
urls = ["https://www.rottentomatoes.com/m/side_by_side_2012",
"https://www.rottentomatoes.com/m/matrix",
"https://www.rottentomatoes.com/m/matrix_revolutions",
"https://www.rottentomatoes.com/m/matrix_reloaded",
"https://www.rottentomatoes.com/m/speed_1994",
"https://www.rottentomatoes.com/m/john_wick_chapter_4"]
for url in urls:
client.send_source(url)
print("done")此代码实现了一个简单的客户端,该客户端通过 KubeMQ 的队列系统将电影 URL 发送到 RAG 服务器。具体来说,它创建一个 `SourceClient` 类,该类连接到 KubeMQ 并将消息发送到“rag-sources-queue”通道,该通道与 RAG 服务器监控的队列相同。当作为主程序运行时,它会发送烂番茄电影 URL 列表(包括《黑客帝国》电影、《疾速追杀》和《生死时速》),以供 RAG 服务器处理并添加到知识图谱中。
发送和接收问题和答案
#chat_client.py
from kubemq.cq import *
class ChatClient:
def __init__(self, address="localhost:50000"):
self.client = Client(address=address)
def send_message(self, message: str) -> str:
response = self.client.send_query_request(QueryMessage(
channel="rag-chat-query",
body=message.encode('utf-8'),
timeout_in_seconds=30
))
return response.body.decode('utf-8')
if __name__ == "__main__":
client = ChatClient()
print("Sending first question: Who is the director of the movie The Matrix?")
response = client.send_message("Who is the director of the movie The Matrix?")
print(f"Response: {response}")
print("Sending second question: How this director connected to Keanu Reeves?")
response = client.send_message("How this director connected to Keanu Reeves?")
print(f"Response: {response}")此代码实现了一个聊天客户端,该客户端通过 KubeMQ 的查询系统与 RAG 服务器进行通信。`ChatClient` 类将消息发送到“rag-chat-query”通道并等待响应,每个查询的超时时间为 30 秒。当作为主程序运行时,它通过发送两个有关《黑客帝国》导演及其与基努·里维斯的关系的相关问题来演示客户端的功能,并在收到每个响应时打印它们。
代码存储库
所有代码示例都可以在我对原始 GitHub 存储库的 fork 中找到。
结论
将 KubeMQ 集成到 GenAI 应用程序的 RAG 管道中,提供了一种可扩展、可靠且高效的机制来处理连续数据流和复杂的进程间通信。通过充当具有多种消息传递模式的统一路由器,KubeMQ 简化了整体架构,减少了对自定义路由解决方案的需求,并加快了开发周期。
此外,通过提供与 KubeMQ 无缝集成的高性能知识库,整合 FalkorDB 可增强数据管理。这种组合可确保优化数据检索和存储,支持 RAG 流程的动态需求。
处理高吞吐量场景的能力,结合持久性和容错性等特性,确保 GenAI 应用程序即使在高负载或系统中断的情况下也能保持响应和可靠。
通过利用 KubeMQ 和 FalkorDB,组织可以专注于增强他们的 AI 模型并提供有价值的见解和服务,并相信他们的数据路由基础设施是强大的并且能够满足现代 AI 工作流程的需求。