콘텐츠로 이동

플로우 상태와 헬퍼

패턴별 연결 방식은 직렬/병렬/조건/HTTP 페이지를 보고, 노드 내부 코드는 이 규칙을 기준으로 작성하세요.

필드의미주로 쓰는 곳
state["query"]원본 사용자 입력 (문자열)첫 노드 / 사용자 텍스트가 필요한 모든 노드
state["messages"]Human/AI/System 메시지 누적 (add_messages reducer)병렬 분기 자동 누적, 디버깅, 메모리 요약 SystemMessage 도 여기에 prepend 됨
state["output"]직전 노드가 남긴 결과 (문자열 또는 {output_text, output_data, output_files})다음 노드 입력, 최종 응답 조립
state["metadata"]요청 / 노드 메타데이터 dicta2a_data, a2a_files, is_stream_request, skillId

A2A 사용자·세션·챗봇 정보가 필요하면 ensure_request_context(state) 가 돌려주는 RequestContext 를 사용하세요 (ctx.user_id / ctx.session_id 등).

핵심 모델:

  • 다음 노드가 읽을 값은 output에 둡니다.
  • 사용자에게 보일 텍스트는 output_text, 코드가 읽을 값은 output_data, 파일은 output_files에 둡니다.
  • messages는 누적됩니다. 병렬 분기처럼 output 충돌이 나는 곳에서는 messagessource를 붙이고 merge에서 합칩니다.

output은 누적 merge가 아니라 최신 노드 결과로 갱신되는 슬롯입니다. 여러 단계에서 값을 계속 보존해야 하면 직접 carry-forward 하세요.

노드 반환 기본형
return {
"messages": [AIMessage(content=summary)],
"output": {
"output_text": summary,
"output_data": {"order_id": order_id, "status": "created"},
},
}
이전 output_data를 이어받아 새 output_data 만들기
async def enrich_order(state) -> dict:
data = extract_latest_output_data(state)
previous = data[0] if data else {}
enriched = {
**previous,
"status": "done",
}
return {
"output": {
"output_text": "주문 처리가 완료되었습니다.",
"output_data": enriched,
}
}
필드갱신 방식
messages누적
output최신 노드 결과로 갱신
metadata유지할 값은 직접 복사해서 확장

A2A 입력이 metadata에 들어오는 방식

섹션 제목: “A2A 입력이 metadata에 들어오는 방식”

클라이언트가 A2A message.partsDataPartFilePart를 보내면 SDK가 Flow 실행 전에 state["metadata"]에 넣습니다.

요청 parts 예시
[
{ "kind": "text", "text": "이 문서를 검토해줘" },
{
"kind": "data",
"data": { "schema": "review.v1", "documentId": "doc-123" }
},
{
"kind": "file",
"file": {
"uri": "https://storage.example.com/report.pdf",
"name": "report.pdf",
"mimeType": "application/pdf"
}
}
]

Flow 노드에서는 대략 아래 모양으로 보입니다.

state['metadata']
{
"primary_text": "이 문서를 검토해줘",
"supplemental_texts": [],
"a2a_data": [
{"schema": "review.v1", "documentId": "doc-123"},
],
"a2a_files": [
{
"name": "report.pdf",
"mime_type": "application/pdf",
"uri": "https://storage.example.com/report.pdf",
"bytes_b64": None,
},
],
}

보통 직접 state["metadata"]["a2a_data"]를 읽기보다 helper를 씁니다.

data_parts = extract_a2a_data(state)
file_parts = extract_a2a_files(state)

Flow의 최종 A2A 응답은 **END에 연결된 exit 노드의 output**에서 만들어집니다.

output 모양응답
"완료했습니다"TextPart
{"output_text": "...", "output_data": {...}}TextPart + DataPart
{"output_text": "...", "output_files": [...]}TextPart + FilePart
exit 노드에서 구조화 응답 반환
async def final_node(state) -> dict:
result = {"count": 3, "items": ["A", "B", "C"]}
return {
"output": {
"output_text": "3건을 찾았습니다.",
"output_data": result,
}
}

위 출력은 A2A message/send 응답에서 대략 아래처럼 보입니다. 실제 응답에는 task id, context id, timestamp 같은 필드가 더 포함될 수 있습니다.

JSON-RPC 2.0 / A2A 0.3.0 응답 예시
{
"jsonrpc": "2.0",
"id": "1",
"result": {
"status": { "state": "completed" },
"artifacts": [
{
"name": "structured-response",
"parts": [
{ "kind": "text", "text": "3건을 찾았습니다." },
{
"kind": "data",
"data": { "count": 3, "items": ["A", "B", "C"] }
}
]
}
]
}
}

Artifact 이름/설명은 보통 flow 템플릿의 main.py에서 ExtensionConfig로 관리합니다. app/config.py에 상수를 두고 main.py의 주석을 해제하는 패턴입니다.

app/config.py
ARTIFACT_NAME = "search-result"
ARTIFACT_DESCRIPTION = "검색 결과 응답"
main.py
from app.config import ARTIFACT_DESCRIPTION, ARTIFACT_NAME
app = await create_server(
card=build_card(settings),
extension=ExtensionConfig(
max_retry=settings.REACT_MAX_ITERATIONS,
artifact_name=ARTIFACT_NAME,
artifact_description=ARTIFACT_DESCRIPTION,
),
settings=settings,
agent=graph,
)

요청별로 artifact 이름이 달라져야 하는 특수한 경우에만 exit 노드 outputartifact_name / artifact_description을 넣어 default를 덮어씁니다.

import
from llamon_agent.graph import (
build_agent_context,
call_agent_auto,
extract_a2a_data,
extract_a2a_files,
extract_latest_output_data,
extract_latest_text,
extract_output_files,
extract_query,
resolve_file_bytes,
)

state 추출 — 노드에서 무엇을 받을지

섹션 제목: “state 추출 — 노드에서 무엇을 받을지”

extract_query(state) → str

사용자가 보낸 원본 텍스트만 필요할 때 (a2a_data / 이전 output 무시).

  • 검색 순서: messages[] 역순의 마지막 HumanMessagestate["query"]
  • 빈 문자열 가능 (절대 None 아님)

extract_latest_text(state) → str

직전 노드의 텍스트 응답을 이어쓸 때 — 직렬 플로우의 표준.

  • 검색 순서: state["output"]output_textmessages[] 역순의 AIMessage / HumanMessagestate["query"]

extract_latest_output_data(state) → list[dict]

직전 노드가 만든 구조화 JSON 데이터를 이어쓸 때.

  • 검색 순서: state["output"]["output_data"]messages[] 역순 메시지 content 안의 output_data
  • 빈 list 가능

세 함수 모두 messages 까지 거슬러 올라가는 fallback 이 있어, 직전 노드가 output 을 채우지 않아도 안전하게 동작합니다.

파일 — 입력 첨부 vs 노드 산출물

섹션 제목: “파일 — 입력 첨부 vs 노드 산출물”

extract_a2a_files(state) → list[dict]

최초 A2A 요청에 첨부된 파일 (사용자 업로드) 을 읽을 때.

  • state["metadata"]["a2a_files"] 만 봄. fallback 없음.

extract_output_files(state) → list[dict]

직전 노드가 만든 파일을 다음 노드에서 처리할 때.

  • state["output"]["output_files"] 만 봄.
  • extract_a2a_files 와 헷갈리지 말 것 — 사용자 첨부 가 아니라 노드 산출물

resolve_file_bytes(file_dict) → bytes

위 두 함수가 돌려준 dict 에서 실제 bytes 가 필요할 때 (OCR / hash / 업로드).

  • 시도 순서: bytes_b64data: URI → file:// URI → 로컬 path
  • 실패 시 ValueError / FileNotFoundError raise (None 반환 아님)
  • https:// URI 는 미지원 — 별도 helper 필요 (아래 URI 로 받은 PDF 참고)

extract_a2a_data(state) → list[dict]

최초 A2A 요청의 DataPart (구조화 JSON 입력) — 예: orderId, documentId, userId.

  • state["metadata"]["a2a_data"] 만 봄
  • dict 아닌 항목은 자동 필터링

build_agent_context(state) → str

다음 Registry agent 에 사용자 질문 + 이전 노드 결과 (upstream summary / previous text / output_data) 를 LLM 친화적 1개 프롬프트로 합쳐서 넘길 때.

  • upstream / previous / data 가 모두 비어있으면 effective_query 만 그대로 반환 (오버헤드 0)

call_agent_auto(url, query, *, state, …)

Registry agent 호출 — 스트리밍 여부는 state["metadata"]["is_stream_request"] 로 자동 판정.

  • 비스트리밍 → call_agent, 스트리밍 → call_agent_stream_result
  • 반환 타입: str 또는 dict

call_agent_auto 의 4가지 opt-in (composer 패턴 필수)

섹션 제목: “call_agent_auto 의 4가지 opt-in (composer 패턴 필수)”
output = await call_agent_auto(
agent_url, query, state=state,
forward_inbound_data=True, # 들어온 DataPart 를 sub-agent 에 그대로 전달
forward_inbound_files=True, # 들어온 FilePart 를 sub-agent 에 그대로 전달
forward_inbound_metadata=False, # 주의 — SDK 내부 키 (a2a_data 등) 까지 전달됨
raise_on_pending=True, # 미해결 sub-agent 면 sentinel 대신 즉시 에러
)
옵션기본켜야 하는 경우
forward_inbound_dataFalsecomposer 패턴 — 사용자가 보낸 JSON 입력을 sub-agent 가 그대로 받아야 할 때
forward_inbound_filesFalsecomposer 패턴 — 사용자가 보낸 파일을 sub-agent 가 그대로 받아야 할 때
forward_inbound_metadataFalse거의 안 켬. is_stream_request 등 SDK 내부 키도 함께 전달되어 sub-agent 가 잘못 해석할 위험
raise_on_pendingFalsesub-agent 가 registry 에 미등록이면 sentinel 대신 명시적 에러로 부팅·요청 차단

AgentState 의 필드별 갱신 방식이 다르므로 패턴에 맞춰 헬퍼를 선택합니다.

필드직렬 (A→B→C)병렬 (A,B→C)
messages자동 누적자동 누적 (add_messages reducer)
output다음 노드가 그대로 읽음충돌 — reducer 없음. merge 노드에서 합쳐서 한 번에 write 필요
metadata보통 변경 안 함merge 노드에서 의도적으로 합쳐야 함

직렬에서는 extract_latest_text / extract_latest_output_data 가 바로 앞 노드의 output 을 읽습니다. 병렬에서는 각 분기를 messagesadditional_kwargs={"source": ...} 로 구분해서 보내고, merge 노드에서 합쳐 output_data 로 통합합니다 (아래 병렬 merge 패턴 참고).

async def route_node(state) -> dict:
query = extract_query(state)
intent = "order" if "주문" in query else "general"
return {"output": {"output_text": query, "output_data": {"intent": intent}}}
async def business_logic(state) -> dict:
data = extract_latest_output_data(state)
order = data[0] if data else {}
summary = f"주문 상태는 {order.get('status', 'unknown')}입니다."
return {"output": {"output_text": summary, "output_data": order}}
async def use_request_data(state) -> dict:
inputs = extract_a2a_data(state)
payload = inputs[0] if inputs else {}
return {"output": {"output_text": "입력 데이터를 확인했습니다.", "output_data": payload}}
async def use_request_file(state) -> dict:
files = extract_a2a_files(state)
if not files:
return {"output": "첨부 파일이 없습니다."}
content = resolve_file_bytes(files[0])
return {"output": {"output_text": f"{len(content)} bytes 파일을 받았습니다."}}

resolve_file_bytes()bytes_b64, data: URI, file:// 또는 로컬 경로를 읽습니다. https://... URI까지 읽어야 하면 아래처럼 프로젝트 코드에 작은 helper를 추가하세요.

PDF 텍스트 추출이 필요하면 프로젝트에 pypdf를 추가합니다.

Terminal window
uv add pypdf
from io import BytesIO
import httpx
from pypdf import PdfReader
from llamon_agent import AIMessage, runtime_output_text
from llamon_agent.graph import (
call_agent_auto,
extract_a2a_files,
extract_latest_output_data,
extract_query,
resolve_file_bytes,
)
# 프로젝트에서 정의하는 helper입니다. SDK가 제공하는 함수는 `resolve_file_bytes()`입니다.
async def read_file_bytes(file: dict) -> bytes:
uri = file.get("uri")
if isinstance(uri, str) and uri.startswith(("http://", "https://")):
async with httpx.AsyncClient(timeout=20.0, follow_redirects=True) as client:
response = await client.get(uri)
response.raise_for_status()
return response.content
return resolve_file_bytes(file)
def extract_pdf_text(pdf_bytes: bytes) -> str:
reader = PdfReader(BytesIO(pdf_bytes))
return "\n\n".join((page.extract_text() or "").strip() for page in reader.pages).strip()
async def read_pdf_node(state) -> dict:
query = extract_query(state)
files = extract_a2a_files(state)
if not files:
return {"output": "PDF 파일을 함께 첨부해주세요."}
pdf_bytes = await read_file_bytes(files[0])
document_text = extract_pdf_text(pdf_bytes)
if not document_text:
return {"output": "PDF에서 읽을 수 있는 텍스트를 찾지 못했습니다."}
return {
"output": {
"output_text": "PDF 내용을 읽었습니다.",
"output_data": {
"query": query,
"document_text": document_text,
"file_name": files[0].get("name"),
},
}
}
async def answer_from_pdf_node(state, *, agent_url: str) -> dict:
data = extract_latest_output_data(state)
payload = data[0] if data else {}
prompt = f"""문서 내용만 근거로 질문에 답변하세요.
질문:
{payload.get("query", "")}
문서:
{payload.get("document_text", "")}
"""
output = await call_agent_auto(agent_url, prompt, state=state)
return {"messages": [AIMessage(content=runtime_output_text(output))], "output": output}

DataPart/FilePart가 여러 개면 InputContracts로 shape를 선언하고 결정적으로 고르세요. 자세한 scoring 규칙은 입력 선별을 참고하세요. 아래는 DataPart 예시입니다. FilePart는 FileContractselect_input_file(state, contracts, name)을 사용합니다.

from llamon_agent import DataContract, DataFieldContract, InputContracts
from llamon_agent.graph import select_input_data
INPUT_CONTRACTS = InputContracts(data=[
DataContract(
name="order",
fields=[DataFieldContract(key="orderId", value_type="string")],
)
])
async def business_logic(state) -> dict:
order = select_input_data(state, INPUT_CONTRACTS, "order")
if order is None:
return {"output": "order 입력을 찾지 못했습니다."}
return {"output": {"output_text": "주문 입력을 선택했습니다.", "output_data": order}}

다음 에이전트에 넘길 프롬프트는 build_agent_context(state)를 기본으로 쓰면 됩니다. 이전 노드의 텍스트와 output_data가 함께 정리됩니다.

from llamon_agent import AIMessage, runtime_output_text
from llamon_agent.graph import build_agent_context, call_agent_auto
async def final_agent_node(state, *, agent_url: str) -> dict:
output = await call_agent_auto(agent_url, build_agent_context(state), state=state)
return {
"messages": [AIMessage(content=runtime_output_text(output))],
"output": output,
}

병렬 분기에서는 각 노드가 output을 동시에 쓰지 않게 하고, source 태그로 구분합니다.

from llamon_agent import AIMessage
async def branch_a(state) -> dict:
return {"messages": [AIMessage(content="A 결과", additional_kwargs={"source": "a"})]}
async def merge(state) -> dict:
results = {}
for msg in state.get("messages", []):
source = getattr(msg, "additional_kwargs", {}).get("source")
if source:
results[source] = str(msg.content)
return {"output": {"output_text": "병렬 결과를 합쳤습니다.", "output_data": results}}

State 안전 추출 — require_state_str / require_state_list

섹션 제목: “State 안전 추출 — require_state_str / require_state_list”

state["metadata"]["orderId"] 같은 키를 노드가 반드시 읽어야 하는 경우, isinstance 가드 + raise boilerplate 가 반복됩니다. require_state_str / require_state_list 는 type-safe 추출 + 명확한 LlamonApplicationError 를 한 줄로 처리하고 pyright/mypy strict 의 자동 narrowing 까지 보장합니다.

from llamon_agent.graph import require_state_str, require_state_list
async def my_node(state) -> dict:
metadata = state.get("metadata", {}) or {}
order_id = require_state_str(metadata, "orderId") # type: str (narrowed)
items = require_state_list(metadata, "items") # type: list (또는 [])
...

caller 가 reason 을 명시하지 않으면 snake_case ASCII 컨벤션을 따르는 자동 reason 사용 — field / expectedType / receivedType 는 trace event 보조 필드로 자동 노출:

상황errorReasonextra 보조 필드
부재 / 빈 문자열state_field_requiredfield, expectedType="str", receivedType="None"
타입 mismatchstate_field_invalid_typefield, expectedType, receivedType

도메인 의미가 필요하면 caller 가 직접 명시:

order_id = require_state_str(
metadata, "orderId",
reason="order_id_required", # snake_case ASCII 권장
message="주문 ID가 누락되었습니다",
)

AgentState 를 확장한 도메인-specific TypedDict 로 type 안전성 강화:

from typing import NotRequired
from llamon_agent.graph import AgentState, require_state_str
class AppState(AgentState):
"""프로젝트별 추가 필드. NotRequired 권장."""
orderId: NotRequired[str]
customerId: NotRequired[str]
async def ingest_node(state: AppState) -> dict:
metadata = state.get("metadata", {}) or {}
order_id = require_state_str(metadata, "orderId")
customer_id = require_state_str(metadata, "customerId")
...

scaffold 한 신규 프로젝트의 nodes.py 상단 주석에 같은 안내가 포함됩니다 — 필요 시 주석 해제 후 활용.