콘텐츠로 이동

플로우 공통 패턴

builder.node(name, func, node_kind="<kind>") 로 노드의 종류를 명시합니다. 트레이스 분류·라우팅 우선순위·에러 메시지 정확도가 달라지므로 새 코드에서는 아래 canonical kind만 사용하세요.

kind용도Studio UI
registry_nodeRegistry 에이전트를 A2A JSON-RPC 2.0으로 호출선택 가능
registry_llmRegistry의 LLM + 프롬프트 + 가드레일 설정으로 호출선택 가능
businessPython 코드로 자유롭게 데이터 가공·외부 연동선택 가능
merge병렬 분기 결과를 하나로 합침선택 가능
guardrail입출력 검증 (통과/차단 판정)선택 가능
http외부 API 호출 (httpx 비동기)선택 가능
postgresPostgreSQL 직접 연결선택 가능
transform구조화 데이터 추출·매핑·변환선택 가능
llm로컬 LLM Agent 인스턴스 호출코드 전용

전체 스펙: llamon_agent.application.node_kinds.NODE_KIND_REGISTRY.

builder.node("agent_a", _agent_a, node_kind="registry_node")
builder.node("business_logic", business_logic, node_kind="business")
builder.node("http_fetch", http_fetch, node_kind="http")
builder.node("merge_results", merge_results, node_kind="merge")

Registry 에이전트 노드는 URL을 미리 resolve한 뒤 클로저로 감싸서 등록합니다.

async def _agent_a(state):
return await agent_a_node(state, agent_a_url=url_a)
builder.node("agent_a", _agent_a, node_kind="registry_node")

직접 등록하면 keyword-only 인자 오류가 납니다.


  • message/stream이어도 중간 노드는 항상 sync
  • END에 연결된 exit 노드만 토큰 스트리밍
  • exit 노드 판정은 GraphBuilder.edge(src, END) 호출 시 자동 (_llamon_exit_nodes로 첨부)

exit 노드에서 call_agent_auto (A2A) 또는 call_llm (로컬 LLM) 을 쓰면 state 기반으로 스트리밍 자동 분기됩니다. 보일러플레이트 10줄 → 1줄.

from llamon_agent import AIMessage, runtime_output_text
from llamon_agent.graph import build_agent_context, call_agent_auto, call_llm
# A2A 외부 에이전트 exit 노드
async def agent_b_node(state, *, agent_b_url: str) -> dict:
output = await call_agent_auto(agent_b_url, build_agent_context(state), state=state)
return {"messages": [AIMessage(content=runtime_output_text(output))], "output": output}
# 로컬 LLM (registry_llm 노드 주입) exit 노드 — `prompt` 는 system_prompt + state context 로 직접 조립
async def summarize(state, *, summarize_agent) -> dict:
text = await call_llm(summarize_agent, prompt, state=state)
return {"messages": [AIMessage(content=text)], "output": {"output_text": text}}

헬퍼 동작 매트릭스:

조건경로
state.metadata.is_stream_request=True.stream() / call_agent_stream_result 경로 자동 선택
state 없음 / 플래그 False.invoke() / call_agent() (v0.1 동등)
.stream() 부재 (call_llm).invoke() 안전 폴백
LangGraph 컨텍스트 밖writer 없이 스트리밍 지속 (디버그 로깅)

토큰을 SSE로 누가 내보내는가 — 두 헬퍼의 자동 동작이 다른데, 토큰이 흐르는 경로가 다르기 때문입니다.

  • call_agent_auto (외부 A2A): 토큰이 프로세스 경계를 넘어오므로 저절로 전파되지 않습니다. 그래서 스트리밍 요청이면 get_stream_writer()자동으로 잡아 토큰을 직접 흘려보냅니다.
  • call_llm (로컬 LLM): 원시 LangChain LLM의 토큰은 이미 LangGraph messages 채널로 자연 전파됩니다. 여기에 writer까지 더하면 같은 토큰이 두 번 나가므로(이중 송출), 기본적으로 writer를 잡지 않습니다 — 단, 자체 스트림이 messages와 단절된 LangGraphAgent류 wrapper일 때만 자동으로 연결합니다.

토큰을 직접 받아 가공하거나 모아야 하면, 두 헬퍼 모두 on_text_chunk= 콜백을 넘기면 됩니다 (그러면 위 자동 규칙 대신 그 콜백이 쓰입니다).

forward_inbound_* / raise_on_pendingcall_agent_auto 의 호출 옵션은 플로우 상태와 헬퍼 → 자주 쓰는 헬퍼 참고.


공통:

  • 모든 노드는 async def
  • 다음 노드에 넘길 값이 있으면 output에 넣기
  • metadata를 넘길 때는 기존 값을 복사해서 확장
return {
"output": result,
"metadata": {**state.get("metadata", {}), "key": "value"},
}

병렬 노드:

  • 병렬 분기에서는 output 충돌 가능
  • messagesadditional_kwargs["source"]를 사용해 merge에서 합치기
  • merge 노드에는 node_kind="merge"를 명시

라우팅 함수:

  • 반환값은 conditional_edge의 키와 정확히 일치해야 함
  • 분기 함수 자체는 builder.node() 등록 대상이 아님

부팅 시점에 Registry 에 없는 ID 도 안전하게 선언할 수 있습니다. 서버는 정상 기동되고, 사용자 호출 시점에 SDK 가 1회 자동 재조회합니다.

위치동작
await registry.resolve_url(AGENT_ID)미등록이면 PendingAgentRef (str subclass) 반환
await call_agent_auto(url, ...) / call_agent(...)호출 시점에 lazy resolve. 등록되어 있으면 정상 호출, 아니면 빈 sentinel {"output_text": "", "output_data": {"_pending_agent": True, "_pending_agent_id": ...}} 반환
await mcp.bind_registry(settings, mcp_ids=[...])일부 MCP load 실패해도 raise 안 함 — mcp.failures() 에 기록
await mcp.call("tool_name", ...)tool 미존재 + pending 있을 때 자동 재조회, 새로 등록된 MCP 의 tool 즉시 사용 가능

재조회는 내부 TTL + asyncio.Lock 으로 직렬화 — Registry 폭증 차단. 명시적 제어가 필요하면 await mcp.try_recheck_pending() 또는 await pending_ref.resolve_now() 를 직접 호출.

call_agent_auto(..., raise_on_pending=True) 를 쓰면 빈 sentinel 대신 UPSTREAM_UNAVAILABLE 애플리케이션 에러로 즉시 실패시킬 수 있습니다. Composer처럼 inbound A2A data/files/metadata를 하위 에이전트로 넘겨야 하는 경우에는 forward_inbound_data=True, forward_inbound_files=True, forward_inbound_metadata=True 를 명시적으로 켭니다.


  • node_kind로 무엇을 줘야 할지 헷갈릴 때
  • graph.py에서 노드 등록 패턴이 헷갈릴 때
  • message/stream이 왜 마지막 노드에서만 보이는지 확인할 때
  • 병렬 노드에서 output 충돌을 피하고 싶을 때
  • 미등록 agent/MCP ID 의 사전 선언 가능 여부 확인할 때