플로우 공통 패턴
1) node_kind 선택
섹션 제목: “1) node_kind 선택”builder.node(name, func, node_kind="<kind>") 로 노드의 종류를 명시합니다. 트레이스 분류·라우팅 우선순위·에러 메시지 정확도가 달라지므로 새 코드에서는 아래 canonical kind만 사용하세요.
| kind | 용도 | Studio UI |
|---|---|---|
registry_node | Registry 에이전트를 A2A JSON-RPC 2.0으로 호출 | 선택 가능 |
registry_llm | Registry의 LLM + 프롬프트 + 가드레일 설정으로 호출 | 선택 가능 |
business | Python 코드로 자유롭게 데이터 가공·외부 연동 | 선택 가능 |
merge | 병렬 분기 결과를 하나로 합침 | 선택 가능 |
guardrail | 입출력 검증 (통과/차단 판정) | 선택 가능 |
http | 외부 API 호출 (httpx 비동기) | 선택 가능 |
postgres | PostgreSQL 직접 연결 | 선택 가능 |
transform | 구조화 데이터 추출·매핑·변환 | 선택 가능 |
llm | 로컬 LLM Agent 인스턴스 호출 | 코드 전용 |
전체 스펙: llamon_agent.application.node_kinds.NODE_KIND_REGISTRY.
builder.node("agent_a", _agent_a, node_kind="registry_node")builder.node("business_logic", business_logic, node_kind="business")builder.node("http_fetch", http_fetch, node_kind="http")builder.node("merge_results", merge_results, node_kind="merge")2) 클로저 패턴
섹션 제목: “2) 클로저 패턴”Registry 에이전트 노드는 URL을 미리 resolve한 뒤 클로저로 감싸서 등록합니다.
async def _agent_a(state): return await agent_a_node(state, agent_a_url=url_a)
builder.node("agent_a", _agent_a, node_kind="registry_node")직접 등록하면 keyword-only 인자 오류가 납니다.
3) 스트리밍 규칙
섹션 제목: “3) 스트리밍 규칙”message/stream이어도 중간 노드는 항상 sync- END에 연결된 exit 노드만 토큰 스트리밍
- exit 노드 판정은
GraphBuilder.edge(src, END)호출 시 자동 (_llamon_exit_nodes로 첨부)
권장: DX 헬퍼 사용 (v0.2.0+)
섹션 제목: “권장: DX 헬퍼 사용 (v0.2.0+)”exit 노드에서 call_agent_auto (A2A) 또는 call_llm (로컬 LLM) 을 쓰면 state 기반으로 스트리밍 자동 분기됩니다. 보일러플레이트 10줄 → 1줄.
from llamon_agent import AIMessage, runtime_output_textfrom llamon_agent.graph import build_agent_context, call_agent_auto, call_llm
# A2A 외부 에이전트 exit 노드async def agent_b_node(state, *, agent_b_url: str) -> dict: output = await call_agent_auto(agent_b_url, build_agent_context(state), state=state) return {"messages": [AIMessage(content=runtime_output_text(output))], "output": output}
# 로컬 LLM (registry_llm 노드 주입) exit 노드 — `prompt` 는 system_prompt + state context 로 직접 조립async def summarize(state, *, summarize_agent) -> dict: text = await call_llm(summarize_agent, prompt, state=state) return {"messages": [AIMessage(content=text)], "output": {"output_text": text}}헬퍼 동작 매트릭스:
| 조건 | 경로 |
|---|---|
state.metadata.is_stream_request=True | .stream() / call_agent_stream_result 경로 자동 선택 |
state 없음 / 플래그 False | .invoke() / call_agent() (v0.1 동등) |
.stream() 부재 (call_llm) | .invoke() 안전 폴백 |
| LangGraph 컨텍스트 밖 | writer 없이 스트리밍 지속 (디버그 로깅) |
토큰을 SSE로 누가 내보내는가 — 두 헬퍼의 자동 동작이 다른데, 토큰이 흐르는 경로가 다르기 때문입니다.
call_agent_auto(외부 A2A): 토큰이 프로세스 경계를 넘어오므로 저절로 전파되지 않습니다. 그래서 스트리밍 요청이면get_stream_writer()를 자동으로 잡아 토큰을 직접 흘려보냅니다.call_llm(로컬 LLM): 원시 LangChain LLM의 토큰은 이미 LangGraphmessages채널로 자연 전파됩니다. 여기에 writer까지 더하면 같은 토큰이 두 번 나가므로(이중 송출), 기본적으로 writer를 잡지 않습니다 — 단, 자체 스트림이messages와 단절된LangGraphAgent류 wrapper일 때만 자동으로 연결합니다.
토큰을 직접 받아 가공하거나 모아야 하면, 두 헬퍼 모두 on_text_chunk= 콜백을 넘기면 됩니다 (그러면 위 자동 규칙 대신 그 콜백이 쓰입니다).
forward_inbound_* / raise_on_pending 등 call_agent_auto 의 호출 옵션은 플로우 상태와 헬퍼 → 자주 쓰는 헬퍼 참고.
4) 노드 작성 규칙
섹션 제목: “4) 노드 작성 규칙”공통:
- 모든 노드는
async def - 다음 노드에 넘길 값이 있으면
output에 넣기 - metadata를 넘길 때는 기존 값을 복사해서 확장
return { "output": result, "metadata": {**state.get("metadata", {}), "key": "value"},}병렬 노드:
- 병렬 분기에서는
output충돌 가능 messages와additional_kwargs["source"]를 사용해 merge에서 합치기- merge 노드에는
node_kind="merge"를 명시
라우팅 함수:
- 반환값은
conditional_edge의 키와 정확히 일치해야 함 - 분기 함수 자체는
builder.node()등록 대상이 아님
5) 미등록 agent/MCP ID 사전 선언
섹션 제목: “5) 미등록 agent/MCP ID 사전 선언”부팅 시점에 Registry 에 없는 ID 도 안전하게 선언할 수 있습니다. 서버는 정상 기동되고, 사용자 호출 시점에 SDK 가 1회 자동 재조회합니다.
| 위치 | 동작 |
|---|---|
await registry.resolve_url(AGENT_ID) | 미등록이면 PendingAgentRef (str subclass) 반환 |
await call_agent_auto(url, ...) / call_agent(...) | 호출 시점에 lazy resolve. 등록되어 있으면 정상 호출, 아니면 빈 sentinel {"output_text": "", "output_data": {"_pending_agent": True, "_pending_agent_id": ...}} 반환 |
await mcp.bind_registry(settings, mcp_ids=[...]) | 일부 MCP load 실패해도 raise 안 함 — mcp.failures() 에 기록 |
await mcp.call("tool_name", ...) | tool 미존재 + pending 있을 때 자동 재조회, 새로 등록된 MCP 의 tool 즉시 사용 가능 |
재조회는 내부 TTL + asyncio.Lock 으로 직렬화 — Registry 폭증 차단.
명시적 제어가 필요하면 await mcp.try_recheck_pending() 또는
await pending_ref.resolve_now() 를 직접 호출.
call_agent_auto(..., raise_on_pending=True) 를 쓰면 빈 sentinel 대신 UPSTREAM_UNAVAILABLE 애플리케이션 에러로 즉시 실패시킬 수 있습니다. Composer처럼 inbound A2A data/files/metadata를 하위 에이전트로 넘겨야 하는 경우에는 forward_inbound_data=True, forward_inbound_files=True, forward_inbound_metadata=True 를 명시적으로 켭니다.
6) 언제 이 페이지를 보나
섹션 제목: “6) 언제 이 페이지를 보나”node_kind로 무엇을 줘야 할지 헷갈릴 때graph.py에서 노드 등록 패턴이 헷갈릴 때message/stream이 왜 마지막 노드에서만 보이는지 확인할 때- 병렬 노드에서
output충돌을 피하고 싶을 때 - 미등록 agent/MCP ID 의 사전 선언 가능 여부 확인할 때