LangGraph Human-in-the-Loop with Sockets

We've seen that LangGraph's interrupt feature lets a human step in while an agent is running.

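But if you look at the examples, you'll notice they all skip the actual interaction with a human. How do I actually get confirmation from the user? There seem to be three main approaches.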

Using the LangGraph API Server

After starting the LangGraph API server in Docker with the LangGraph CLI, you can use the LangGraph SDK to run a graph, change its state, and resume it.

You have to use these pieces the way LangGraph intends. It seems to require a lot of setup and may be hard to integrate with my own code.

Managing Graphs on Your Own Server

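This approach implements only the parts of the LangGraph API server you actually need inside your own custom server. For example, when a graph runs, you must save which client is running it along with the graph's checkpoint; after the user confirms, you reload the graph, update its state based on the user's response, and resume it.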
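To make that bookkeeping concrete, here is a minimal in-memory sketch of what such a server has to remember between the interrupt and the user's answer. PausedRun, PausedRunRegistry and its methods are hypothetical names, and a real server would persist this data rather than keep it in memory:

```typescript
// Hypothetical record of a run that stopped at an interrupt.
interface PausedRun {
  clientId: string; // the client that must answer the confirmation
  threadId: string; // key under which the graph checkpoint was saved
  pausedAt: string; // node the graph stopped before, e.g. "step3"
}

// Minimal in-memory registry (assumption: a real server would persist this).
class PausedRunRegistry {
  private runs = new Map<string, PausedRun>();

  // Remember a run when the graph is interrupted.
  pause(run: PausedRun): void {
    this.runs.set(run.threadId, run);
  }

  // Called when a user answers; returns the run to resume, or undefined.
  take(threadId: string, clientId: string): PausedRun | undefined {
    const run = this.runs.get(threadId);
    // Only the client that owns the run may confirm it.
    if (!run || run.clientId !== clientId) return undefined;
    this.runs.delete(threadId);
    return run;
  }

  size(): number {
    return this.runs.size;
  }
}

const registry = new PausedRunRegistry();
registry.pause({ clientId: "client-a", threadId: "thread-1", pausedAt: "step3" });
```

Once take() returns a run, the server would hand its thread id back to LangGraph to update the state and resume from the saved checkpoint (for example with the compiled graph's updateState followed by stream(null, config)).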

There are probably many details to handle.

Socket Connection

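When an agent is run, the client opens a socket, and the agent interacts with the user over that socket. It works by simply adding, to the existing example code, a step that receives the user's confirmation over the socket connection.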
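The change to the example code stays small because only the "ask the user" step depends on the transport. A sketch of that separation, with hypothetical names (AskUser, runWithApproval): the same driver works whether ask reads from a terminal or waits for a reply on a socket.

```typescript
// Any transport that can ask the user a yes/no question (terminal, socket, ...).
type AskUser = (question: string) => Promise<boolean>;

// Hypothetical driver: run to the interrupt, ask, then resume or cancel.
async function runWithApproval(
  runUntilInterrupt: () => Promise<void>,
  resume: () => Promise<void>,
  ask: AskUser
): Promise<"executed" | "cancelled"> {
  await runUntilInterrupt();
  const approved = await ask("Do you want to proceed with this action?");
  if (!approved) return "cancelled";
  await resume();
  return "executed";
}
```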

On the other hand, implementing typing-style streaming this way can be tricky.
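One reason typing-style output is fiddlier: instead of whole messages, the server emits many small pieces, and the client has to merge each piece into the right message rather than appending it as a new entry. A minimal, dependency-free sketch of that client-side merge, assuming a hypothetical chunk shape { id, delta } that neither LangGraph nor Socket.IO defines:

```typescript
// Hypothetical chunk the server could emit for typing-style output.
interface StreamChunk {
  id: string; // which message this piece belongs to
  delta: string; // newly generated text
}

interface ChatMessage {
  id: string;
  text: string;
}

// Pure function: append the delta to its message, or start a new message.
function applyChunk(messages: ChatMessage[], chunk: StreamChunk): ChatMessage[] {
  const index = messages.findIndex((m) => m.id === chunk.id);
  if (index === -1) {
    return [...messages, { id: chunk.id, text: chunk.delta }];
  }
  return messages.map((m, i) =>
    i === index ? { ...m, text: m.text + chunk.delta } : m
  );
}
```

Keeping the merge pure makes it easy to use inside a React state setter, e.g. setMessages((prev) => applyChunk(prev, chunk)).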

Implementing It with a Socket Connection

First, I wanted to keep complexity to a minimum, so I implemented it with a socket connection.

The server uses NestJS and the client uses Next.js.

Server

First, create a gateway for the WebSocket connection. Clients connect on the /agent/start path, and the agent starts as soon as the connection is made.

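import { Logger } from "@nestjs/common";
import {
  ConnectedSocket,
  MessageBody,
  OnGatewayConnection,
  OnGatewayDisconnect,
  SubscribeMessage,
  WebSocketGateway,
  WebSocketServer,
} from "@nestjs/websockets";
import { Server, Socket } from "socket.io";
// AgentFactory and AgentName are defined elsewhere in the project (not shown)

@WebSocketGateway({
  namespace: "/",
  transports: ["websocket", "polling"],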
  path: "/agent/start",
  cors: {
    origin: "*",
    methods: ["GET", "POST"],
    credentials: true,
  },
})
export class AgentGateway implements OnGatewayConnection, OnGatewayDisconnect {
  @WebSocketServer()
  server: Server;
  protected readonly logger = new Logger(this.constructor.name);

  constructor(
    private readonly agentFactory: AgentFactory
  ) {}

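  // One pending resolver per client id, waiting for that client's answer
  private pendingConfirmations = new Map<string, (confirmed: boolean) => void>();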

  // Handle new connections
  handleConnection(client: Socket) {
    console.log(`Client connected: ${client.id}`);

    // Option 1: Get actionData from query parameters
    const actionData: { agent: AgentName } = client.handshake.query.actionData
      ? JSON.parse(client.handshake.query.actionData as string)
      : null;

    if (actionData) {
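      // startAgentProcess is async; report failures instead of leaving the promise unhandled
      this.startAgentProcess(client, actionData).catch((error) => {
        this.logger.error(error);
        client.emit("error", "Agent run failed");
        client.disconnect();
      });
    } else {
      // Without actionData you could wait for an event instead; here the connection is rejected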
      client.emit("error", "No action data provided");
      client.disconnect();
    }
  }

  // Handle disconnections
  handleDisconnect(client: Socket) {
    console.log(`Client disconnected: ${client.id}`);
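    // Resolve a pending confirmation as "declined" so startAgentProcess does not wait forever
    this.pendingConfirmations.get(client.id)?.(false);
    this.pendingConfirmations.delete(client.id);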
  }

  // Send confirmation request
  async sendConfirmationRequest(clientId: string, data: any): Promise<boolean> {
    return new Promise((resolve) => {
      this.pendingConfirmations.set(clientId, resolve);
      this.server.to(clientId).emit("confirmation_request", data);

      // Optional timeout for response
      setTimeout(() => {
        if (this.pendingConfirmations.has(clientId)) {
          this.pendingConfirmations.delete(clientId);
          resolve(false); // Default to 'false' if timeout occurs
        }
      }, 3000000); // 3000 seconds timeout
    });
  }

  // Handle client's confirmation response
  @SubscribeMessage("confirmation_response")
  handleClientResponse(
    @MessageBody() data: { confirmed: boolean },
    @ConnectedSocket() client: Socket
  ) {
    const resolve = this.pendingConfirmations.get(client.id);
    if (resolve) {
      resolve(data.confirmed);
      this.pendingConfirmations.delete(client.id);
    }
  }

  // Start the agent process
  private async startAgentProcess(
    client: Socket,
    actionData: { agent: AgentName }
  ) {
    const graph = await this.agentFactory.create({
      agentName: actionData.agent,
    });

    const initialInput = { input: "hello world" };

    // Thread: one checkpoint thread per socket connection, so concurrent clients do not collide
    const graphStateConfig = {
      configurable: { thread_id: client.id },
      streamMode: "values" as const,
    };

    // Run the graph until the first interruption
    for await (const event of await graph.stream(
      initialInput,
      graphStateConfig
    )) {
      this.logAndEmit(client, `--- ${event.input} ---`);
    }

    // Will log when the graph is interrupted, after step 2.
    this.logAndEmit(client, "---GRAPH INTERRUPTED---");

    const userConfirmed = await this.sendConfirmationRequest(client.id, {
      message: "Do you want to proceed with this action?",
      actionData,
    });

    if (userConfirmed) {
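      // If approved, continue the graph execution. We must pass `null` as
      // the input here, or the graph will treat it as new input and start
      // over from the beginning instead of resuming from the checkpoint.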
      for await (const event of await graph.stream(null, graphStateConfig)) {
        this.logAndEmit(client, `--- ${event.input} ---`);
      }
      this.logAndEmit(client, "---ACTION EXECUTED---");
    } else {
      this.logAndEmit(client, "---ACTION CANCELLED---");
    }

    // Optionally disconnect the client
    client.disconnect();
  }

  private logAndEmit(client: Socket, message: string) {
    console.log(message);
    client.emit("message", { message });
  }
}

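The gist is simple. As soon as the socket connects, an agent is created and run. When execution is interrupted, the server sends a confirmation request to the client and waits. Once the confirmation resolves, the graph continues; if the user declines or the request times out, the action is cancelled.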
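The waiting logic in the gateway can be isolated into a small helper. This is a hypothetical ConfirmationBroker, not part of NestJS or Socket.IO: it keeps one pending resolver per client, defaults to false on timeout, and clears the timer once an answer arrives.

```typescript
// Hypothetical helper isolating the gateway's "ask and wait" logic.
class ConfirmationBroker {
  private pending = new Map<
    string,
    { resolve: (confirmed: boolean) => void; timer: ReturnType<typeof setTimeout> }
  >();

  constructor(private readonly timeoutMs: number) {}

  // Registers a waiter for clientId; `send` delivers the request to the client.
  request(clientId: string, send: () => void): Promise<boolean> {
    // An earlier unanswered request for the same client is treated as declined.
    this.settle(clientId, false);
    return new Promise<boolean>((resolve) => {
      const timer = setTimeout(() => this.settle(clientId, false), this.timeoutMs);
      this.pending.set(clientId, { resolve, timer });
      send();
    });
  }

  // Resolves the waiter (from the response handler, a disconnect, or the timeout).
  settle(clientId: string, confirmed: boolean): void {
    const entry = this.pending.get(clientId);
    if (!entry) return;
    clearTimeout(entry.timer);
    this.pending.delete(clientId);
    entry.resolve(confirmed);
  }
}
```

In the gateway, handleClientResponse would call settle(client.id, data.confirmed) and handleDisconnect would call settle(client.id, false), so a run whose client has left ends as cancelled instead of hanging.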

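The agent used in the code above is the example from the LangGraph docs that runs steps 1, 2, and 3 in order, with an interrupt before step 3.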

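import {
  Annotation,
  END,
  MemorySaver,
  START,
  StateGraph,
} from "@langchain/langgraph";

// Hypothetical function name: builds the graph that AgentFactory.create() returns for this agent
export const createStepGraph = () => {
  // State with a single string channel, as in the LangGraph docs example
  const GraphState = Annotation.Root({
    input: Annotation<string>,
  });

  const step1 = (state: typeof GraphState.State) => {
    console.log("---Step 1---");
    return state;
  };

  const step2 = (state: typeof GraphState.State) => {
    console.log("---Step 2---");
    return state;
  };

  const step3 = (state: typeof GraphState.State) => {
    console.log("---Step 3---");
    return state;
  };

  const builder = new StateGraph(GraphState)
    .addNode("step1", step1)
    .addNode("step2", step2)
    .addNode("step3", step3)
    .addEdge(START, "step1")
    .addEdge("step1", "step2")
    .addEdge("step2", "step3")
    .addEdge("step3", END);

  // Set up memory so the run can be resumed from its checkpoint
  const graphStateMemory = new MemorySaver();

  // Pause before step3 so the user can confirm the action
  const graph = builder.compile({
    checkpointer: graphStateMemory,
    interruptBefore: ["step3"],
  });
  return graph;
};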
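To see why resuming passes null as the input, here is a toy, dependency-free model of the same three-step graph. ToyLinearGraph is a hypothetical illustration of interruptBefore plus a per-thread checkpoint, not how LangGraph is implemented:

```typescript
type Step<S> = (state: S) => S;

// Toy linear runner that mimics interruptBefore and resuming from a checkpoint.
class ToyLinearGraph<S> {
  // threadId -> index of the next node to run and the state at that point
  private checkpoints = new Map<string, { next: number; state: S }>();

  constructor(
    private readonly nodes: [string, Step<S>][],
    private readonly interruptBefore: string[]
  ) {}

  // A non-null input starts a fresh run; null resumes from the thread's checkpoint.
  run(input: S | null, threadId: string): { state: S; visited: string[]; interrupted: boolean } {
    let next = 0;
    let state: S;
    let resuming = false;
    if (input === null) {
      const cp = this.checkpoints.get(threadId);
      if (!cp) throw new Error(`no checkpoint for thread ${threadId}`);
      next = cp.next;
      state = cp.state;
      resuming = true;
    } else {
      state = input;
    }
    const visited: string[] = [];
    for (let i = next; i < this.nodes.length; i++) {
      const [name, step] = this.nodes[i];
      // Pause before an interrupt node, except the node we are resuming into.
      if (this.interruptBefore.includes(name) && !(resuming && i === next)) {
        this.checkpoints.set(threadId, { next: i, state });
        return { state, visited, interrupted: true };
      }
      state = step(state);
      visited.push(name);
    }
    // Record that this thread has finished, so a later resume runs nothing.
    this.checkpoints.set(threadId, { next: this.nodes.length, state });
    return { state, visited, interrupted: false };
  }
}

const toyGraph = new ToyLinearGraph<{ input: string }>(
  [
    ["step1", (s) => s],
    ["step2", (s) => s],
    ["step3", (s) => s],
  ],
  ["step3"]
);
const first = toyGraph.run({ input: "hello world" }, "1"); // runs step1, step2, then pauses
const second = toyGraph.run(null, "1"); // resumes and runs step3
```

Calling run with a fresh input instead of null would start again from step1 and stop before step3 once more, which is why the gateway passes null when it resumes.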

Client

On the client, I created a hook that starts the agent and manages its state.

import { useRef, useState } from "react";
import io, { Socket } from "socket.io-client";

export const useAgentSocket = () => {
  const socketRef = useRef<Socket | null>(null);
  const [confirmationRequest, setConfirmationRequest] = useState<any>(null);
  const [messages, setMessages] = useState<string[]>([]);

  const connectAndRun = (actionData: any) => {
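    return new Promise<void>((resolve, reject) => {
      // Close any previous connection so listeners are not registered twice
      socketRef.current?.disconnect();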
      socketRef.current = io("http://localhost:8000", {
        path: "/agent/start",
        transports: ["websocket", "polling"],
        query: {
          actionData: JSON.stringify(actionData),
        },
      });

      socketRef.current.on("connect", () => {
        console.log("Connected:", socketRef.current?.id);
        resolve(void 0);
      });

      socketRef.current.on("connect_error", (error) => {
        console.error("Connection error:", error);
        reject(error);
      });

      // Listen for confirmation requests
      socketRef.current.on("confirmation_request", (data) => {
        setConfirmationRequest(data);
      });

      // Listen for messages
      socketRef.current.on("message", (data) => {
        console.log("Received message:", data);
        setMessages((prevMessages) => [...prevMessages, data.message]);
      });

      socketRef.current.on("disconnect", () => {
        console.log("Disconnected from server");
      });
    });
  };

  const sendConfirmationResponse = (confirmed: boolean) => {
    if (socketRef.current) {
      socketRef.current.emit("confirmation_response", { confirmed });
      setConfirmationRequest(null);
    }
  };

  const disconnectSocket = () => {
    if (socketRef.current) {
      socketRef.current.disconnect();
    }
  };

  const clearMessages = () => {
    setMessages([]);
  };

  return {
    confirmationRequest,
    sendConfirmationResponse,
    connectAndRun,
    disconnectSocket,
    messages,
    clearMessages,
  };
};

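The hook connects to the server and updates the confirmationRequest state whenever a confirmation request arrives. A UI component can check that state and show the user a confirmation dialog.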
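Because the hook stores confirmationRequest as an untyped value, it can help to check its shape before rendering the dialog. A sketch, assuming the payload the gateway sends ({ message, actionData }) and treating AgentName as a plain string; ConfirmationRequest and isConfirmationRequest are hypothetical names:

```typescript
// Assumed payload shape, mirroring what the gateway emits as "confirmation_request".
interface ConfirmationRequest {
  message: string;
  actionData: { agent: string }; // assumption: AgentName is a string type
}

// Narrows the hook's untyped confirmationRequest state before rendering a dialog.
function isConfirmationRequest(value: unknown): value is ConfirmationRequest {
  if (typeof value !== "object" || value === null) return false;
  const candidate = value as { message?: unknown; actionData?: unknown };
  if (typeof candidate.message !== "string") return false;
  const action = candidate.actionData as { agent?: unknown } | null | undefined;
  return typeof action === "object" && action !== null && typeof action.agent === "string";
}
```

In a component, you would render the dialog only when isConfirmationRequest(confirmationRequest) is true, show confirmationRequest.message, and wire the two buttons to sendConfirmationResponse(true) and sendConfirmationResponse(false).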