플로우 상태와 헬퍼
패턴별 연결 방식은 직렬/병렬/조건/HTTP 페이지를 보고, 노드 내부 코드는 이 규칙을 기준으로 작성하세요.
State 한눈에 보기
섹션 제목: “State 한눈에 보기”| 필드 | 의미 | 주로 쓰는 곳 |
|---|---|---|
state["query"] | 원본 사용자 입력 (문자열) | 첫 노드 / 사용자 텍스트가 필요한 모든 노드 |
state["messages"] | Human/AI/System 메시지 누적 (add_messages reducer) | 병렬 분기 자동 누적, 디버깅, 메모리 요약 SystemMessage 도 여기에 prepend 됨 |
state["output"] | 직전 노드가 남긴 결과 (문자열 또는 {output_text, output_data, output_files}) | 다음 노드 입력, 최종 응답 조립 |
state["metadata"] | 요청 / 노드 메타데이터 dict | a2a_data, a2a_files, is_stream_request, skillId 등 |
A2A 사용자·세션·챗봇 정보가 필요하면 ensure_request_context(state) 가 돌려주는 RequestContext 를 사용하세요 (ctx.user_id / ctx.session_id 등).
핵심 모델:
- 다음 노드가 읽을 값은
output에 둡니다. - 사용자에게 보일 텍스트는
output_text, 코드가 읽을 값은output_data, 파일은output_files에 둡니다. messages는 누적됩니다. 병렬 분기처럼output충돌이 나는 곳에서는messages에source를 붙이고 merge에서 합칩니다.
output은 누적 merge가 아니라 최신 노드 결과로 갱신되는 슬롯입니다. 여러 단계에서 값을 계속 보존해야 하면 직접 carry-forward 하세요.
return { "messages": [AIMessage(content=summary)], "output": { "output_text": summary, "output_data": {"order_id": order_id, "status": "created"}, },}async def enrich_order(state) -> dict: data = extract_latest_output_data(state) previous = data[0] if data else {}
enriched = { **previous, "status": "done", }
return { "output": { "output_text": "주문 처리가 완료되었습니다.", "output_data": enriched, } }| 필드 | 갱신 방식 |
|---|---|
messages | 누적 |
output | 최신 노드 결과로 갱신 |
metadata | 유지할 값은 직접 복사해서 확장 |
A2A 입력이 metadata에 들어오는 방식
섹션 제목: “A2A 입력이 metadata에 들어오는 방식”클라이언트가 A2A message.parts에 DataPart나 FilePart를 보내면 SDK가 Flow 실행 전에 state["metadata"]에 넣습니다.
[ { "kind": "text", "text": "이 문서를 검토해줘" }, { "kind": "data", "data": { "schema": "review.v1", "documentId": "doc-123" } }, { "kind": "file", "file": { "uri": "https://storage.example.com/report.pdf", "name": "report.pdf", "mimeType": "application/pdf" } }]Flow 노드에서는 대략 아래 모양으로 보입니다.
{ "primary_text": "이 문서를 검토해줘", "supplemental_texts": [], "a2a_data": [ {"schema": "review.v1", "documentId": "doc-123"}, ], "a2a_files": [ { "name": "report.pdf", "mime_type": "application/pdf", "uri": "https://storage.example.com/report.pdf", "bytes_b64": None, }, ],}보통 직접 state["metadata"]["a2a_data"]를 읽기보다 helper를 씁니다.
data_parts = extract_a2a_data(state)file_parts = extract_a2a_files(state)최종 응답 규칙
섹션 제목: “최종 응답 규칙”Flow의 최종 A2A 응답은 **END에 연결된 exit 노드의 output**에서 만들어집니다.
output 모양 | 응답 |
|---|---|
"완료했습니다" | TextPart |
{"output_text": "...", "output_data": {...}} | TextPart + DataPart |
{"output_text": "...", "output_files": [...]} | TextPart + FilePart |
async def final_node(state) -> dict: result = {"count": 3, "items": ["A", "B", "C"]} return { "output": { "output_text": "3건을 찾았습니다.", "output_data": result, } }위 출력은 A2A message/send 응답에서 대략 아래처럼 보입니다. 실제 응답에는 task id, context id, timestamp 같은 필드가 더 포함될 수 있습니다.
{ "jsonrpc": "2.0", "id": "1", "result": { "status": { "state": "completed" }, "artifacts": [ { "name": "structured-response", "parts": [ { "kind": "text", "text": "3건을 찾았습니다." }, { "kind": "data", "data": { "count": 3, "items": ["A", "B", "C"] } } ] } ] }}Artifact 이름/설명은 보통 flow 템플릿의 main.py에서 ExtensionConfig로 관리합니다.
app/config.py에 상수를 두고 main.py의 주석을 해제하는 패턴입니다.
ARTIFACT_NAME = "search-result"ARTIFACT_DESCRIPTION = "검색 결과 응답"from app.config import ARTIFACT_DESCRIPTION, ARTIFACT_NAME
app = await create_server( card=build_card(settings), extension=ExtensionConfig( max_retry=settings.REACT_MAX_ITERATIONS, artifact_name=ARTIFACT_NAME, artifact_description=ARTIFACT_DESCRIPTION, ), settings=settings, agent=graph,)요청별로 artifact 이름이 달라져야 하는 특수한 경우에만 exit 노드 output에 artifact_name / artifact_description을 넣어 default를 덮어씁니다.
자주 쓰는 헬퍼
섹션 제목: “자주 쓰는 헬퍼”from llamon_agent.graph import ( build_agent_context, call_agent_auto, extract_a2a_data, extract_a2a_files, extract_latest_output_data, extract_latest_text, extract_output_files, extract_query, resolve_file_bytes,)state 추출 — 노드에서 무엇을 받을지
섹션 제목: “state 추출 — 노드에서 무엇을 받을지”extract_query(state) → str
사용자가 보낸 원본 텍스트만 필요할 때 (a2a_data / 이전 output 무시).
- 검색 순서:
messages[]역순의 마지막HumanMessage→state["query"] - 빈 문자열 가능 (절대
None아님)
extract_latest_text(state) → str
직전 노드의 텍스트 응답을 이어쓸 때 — 직렬 플로우의 표준.
- 검색 순서:
state["output"]의output_text→messages[]역순의AIMessage/HumanMessage→state["query"]
extract_latest_output_data(state) → list[dict]
직전 노드가 만든 구조화 JSON 데이터를 이어쓸 때.
- 검색 순서:
state["output"]["output_data"]→messages[]역순 메시지 content 안의output_data - 빈 list 가능
세 함수 모두 messages 까지 거슬러 올라가는 fallback 이 있어, 직전 노드가 output 을 채우지 않아도 안전하게 동작합니다.
파일 — 입력 첨부 vs 노드 산출물
섹션 제목: “파일 — 입력 첨부 vs 노드 산출물”extract_a2a_files(state) → list[dict]
최초 A2A 요청에 첨부된 파일 (사용자 업로드) 을 읽을 때.
state["metadata"]["a2a_files"]만 봄. fallback 없음.
extract_output_files(state) → list[dict]
직전 노드가 만든 파일을 다음 노드에서 처리할 때.
state["output"]["output_files"]만 봄.extract_a2a_files와 헷갈리지 말 것 — 사용자 첨부 가 아니라 노드 산출물
resolve_file_bytes(file_dict) → bytes
위 두 함수가 돌려준 dict 에서 실제 bytes 가 필요할 때 (OCR / hash / 업로드).
- 시도 순서:
bytes_b64→data:URI →file://URI → 로컬 path - 실패 시
ValueError/FileNotFoundErrorraise (None 반환 아님) https://URI 는 미지원 — 별도 helper 필요 (아래 URI 로 받은 PDF 참고)
A2A DataPart — 구조화 JSON 입력
섹션 제목: “A2A DataPart — 구조화 JSON 입력”extract_a2a_data(state) → list[dict]
최초 A2A 요청의 DataPart (구조화 JSON 입력) — 예: orderId, documentId, userId.
state["metadata"]["a2a_data"]만 봄- dict 아닌 항목은 자동 필터링
Registry agent 호출
섹션 제목: “Registry agent 호출”build_agent_context(state) → str
다음 Registry agent 에 사용자 질문 + 이전 노드 결과 (upstream summary / previous text / output_data) 를 LLM 친화적 1개 프롬프트로 합쳐서 넘길 때.
- upstream / previous / data 가 모두 비어있으면
effective_query만 그대로 반환 (오버헤드 0)
call_agent_auto(url, query, *, state, …)
Registry agent 호출 — 스트리밍 여부는 state["metadata"]["is_stream_request"] 로 자동 판정.
- 비스트리밍 →
call_agent, 스트리밍 →call_agent_stream_result - 반환 타입:
str또는dict
call_agent_auto 의 4가지 opt-in (composer 패턴 필수)
섹션 제목: “call_agent_auto 의 4가지 opt-in (composer 패턴 필수)”output = await call_agent_auto( agent_url, query, state=state, forward_inbound_data=True, # 들어온 DataPart 를 sub-agent 에 그대로 전달 forward_inbound_files=True, # 들어온 FilePart 를 sub-agent 에 그대로 전달 forward_inbound_metadata=False, # 주의 — SDK 내부 키 (a2a_data 등) 까지 전달됨 raise_on_pending=True, # 미해결 sub-agent 면 sentinel 대신 즉시 에러)| 옵션 | 기본 | 켜야 하는 경우 |
|---|---|---|
forward_inbound_data | False | composer 패턴 — 사용자가 보낸 JSON 입력을 sub-agent 가 그대로 받아야 할 때 |
forward_inbound_files | False | composer 패턴 — 사용자가 보낸 파일을 sub-agent 가 그대로 받아야 할 때 |
forward_inbound_metadata | False | 거의 안 켬. is_stream_request 등 SDK 내부 키도 함께 전달되어 sub-agent 가 잘못 해석할 위험 |
raise_on_pending | False | sub-agent 가 registry 에 미등록이면 sentinel 대신 명시적 에러로 부팅·요청 차단 |
직렬 vs 병렬 — 어떻게 받는가
섹션 제목: “직렬 vs 병렬 — 어떻게 받는가”AgentState 의 필드별 갱신 방식이 다르므로 패턴에 맞춰 헬퍼를 선택합니다.
| 필드 | 직렬 (A→B→C) | 병렬 (A,B→C) |
|---|---|---|
messages | 자동 누적 | 자동 누적 (add_messages reducer) |
output | 다음 노드가 그대로 읽음 | 충돌 — reducer 없음. merge 노드에서 합쳐서 한 번에 write 필요 |
metadata | 보통 변경 안 함 | merge 노드에서 의도적으로 합쳐야 함 |
직렬에서는 extract_latest_text / extract_latest_output_data 가 바로 앞 노드의 output 을 읽습니다. 병렬에서는 각 분기를 messages 의 additional_kwargs={"source": ...} 로 구분해서 보내고, merge 노드에서 합쳐 output_data 로 통합합니다 (아래 병렬 merge 패턴 참고).
사용자 텍스트 읽기
섹션 제목: “사용자 텍스트 읽기”async def route_node(state) -> dict: query = extract_query(state) intent = "order" if "주문" in query else "general" return {"output": {"output_text": query, "output_data": {"intent": intent}}}이전 노드 데이터 읽기
섹션 제목: “이전 노드 데이터 읽기”async def business_logic(state) -> dict: data = extract_latest_output_data(state) order = data[0] if data else {} summary = f"주문 상태는 {order.get('status', 'unknown')}입니다." return {"output": {"output_text": summary, "output_data": order}}요청 DataPart 읽기
섹션 제목: “요청 DataPart 읽기”async def use_request_data(state) -> dict: inputs = extract_a2a_data(state) payload = inputs[0] if inputs else {} return {"output": {"output_text": "입력 데이터를 확인했습니다.", "output_data": payload}}요청 FilePart 읽기
섹션 제목: “요청 FilePart 읽기”async def use_request_file(state) -> dict: files = extract_a2a_files(state) if not files: return {"output": "첨부 파일이 없습니다."}
content = resolve_file_bytes(files[0]) return {"output": {"output_text": f"{len(content)} bytes 파일을 받았습니다."}}resolve_file_bytes()는 bytes_b64, data: URI, file:// 또는 로컬 경로를 읽습니다.
https://... URI까지 읽어야 하면 아래처럼 프로젝트 코드에 작은 helper를 추가하세요.
URI로 받은 PDF에 답변하기
섹션 제목: “URI로 받은 PDF에 답변하기”PDF 텍스트 추출이 필요하면 프로젝트에 pypdf를 추가합니다.
uv add pypdffrom io import BytesIO
import httpxfrom pypdf import PdfReader
from llamon_agent import AIMessage, runtime_output_textfrom llamon_agent.graph import ( call_agent_auto, extract_a2a_files, extract_latest_output_data, extract_query, resolve_file_bytes,)
# 프로젝트에서 정의하는 helper입니다. SDK가 제공하는 함수는 `resolve_file_bytes()`입니다.async def read_file_bytes(file: dict) -> bytes: uri = file.get("uri") if isinstance(uri, str) and uri.startswith(("http://", "https://")): async with httpx.AsyncClient(timeout=20.0, follow_redirects=True) as client: response = await client.get(uri) response.raise_for_status() return response.content return resolve_file_bytes(file)
def extract_pdf_text(pdf_bytes: bytes) -> str: reader = PdfReader(BytesIO(pdf_bytes)) return "\n\n".join((page.extract_text() or "").strip() for page in reader.pages).strip()
async def read_pdf_node(state) -> dict: query = extract_query(state) files = extract_a2a_files(state) if not files: return {"output": "PDF 파일을 함께 첨부해주세요."}
pdf_bytes = await read_file_bytes(files[0]) document_text = extract_pdf_text(pdf_bytes) if not document_text: return {"output": "PDF에서 읽을 수 있는 텍스트를 찾지 못했습니다."}
return { "output": { "output_text": "PDF 내용을 읽었습니다.", "output_data": { "query": query, "document_text": document_text, "file_name": files[0].get("name"), }, } }
async def answer_from_pdf_node(state, *, agent_url: str) -> dict: data = extract_latest_output_data(state) payload = data[0] if data else {} prompt = f"""문서 내용만 근거로 질문에 답변하세요.
질문:{payload.get("query", "")}
문서:{payload.get("document_text", "")}""" output = await call_agent_auto(agent_url, prompt, state=state) return {"messages": [AIMessage(content=runtime_output_text(output))], "output": output}여러 입력 중 하나 고르기
섹션 제목: “여러 입력 중 하나 고르기”DataPart/FilePart가 여러 개면 InputContracts로 shape를 선언하고 결정적으로 고르세요.
자세한 scoring 규칙은 입력 선별을 참고하세요.
아래는 DataPart 예시입니다. FilePart는 FileContract와 select_input_file(state, contracts, name)을 사용합니다.
from llamon_agent import DataContract, DataFieldContract, InputContractsfrom llamon_agent.graph import select_input_data
INPUT_CONTRACTS = InputContracts(data=[ DataContract( name="order", fields=[DataFieldContract(key="orderId", value_type="string")], )])
async def business_logic(state) -> dict: order = select_input_data(state, INPUT_CONTRACTS, "order") if order is None: return {"output": "order 입력을 찾지 못했습니다."} return {"output": {"output_text": "주문 입력을 선택했습니다.", "output_data": order}}Registry agent 호출
섹션 제목: “Registry agent 호출”다음 에이전트에 넘길 프롬프트는 build_agent_context(state)를 기본으로 쓰면 됩니다.
이전 노드의 텍스트와 output_data가 함께 정리됩니다.
from llamon_agent import AIMessage, runtime_output_textfrom llamon_agent.graph import build_agent_context, call_agent_auto
async def final_agent_node(state, *, agent_url: str) -> dict: output = await call_agent_auto(agent_url, build_agent_context(state), state=state) return { "messages": [AIMessage(content=runtime_output_text(output))], "output": output, }병렬 merge 패턴
섹션 제목: “병렬 merge 패턴”병렬 분기에서는 각 노드가 output을 동시에 쓰지 않게 하고, source 태그로 구분합니다.
from llamon_agent import AIMessage
async def branch_a(state) -> dict: return {"messages": [AIMessage(content="A 결과", additional_kwargs={"source": "a"})]}
async def merge(state) -> dict: results = {} for msg in state.get("messages", []): source = getattr(msg, "additional_kwargs", {}).get("source") if source: results[source] = str(msg.content) return {"output": {"output_text": "병렬 결과를 합쳤습니다.", "output_data": results}}State 안전 추출 — require_state_str / require_state_list
섹션 제목: “State 안전 추출 — require_state_str / require_state_list”state["metadata"]["orderId"] 같은 키를 노드가 반드시 읽어야 하는 경우,
isinstance 가드 + raise boilerplate 가 반복됩니다. require_state_str /
require_state_list 는 type-safe 추출 + 명확한 LlamonApplicationError 를
한 줄로 처리하고 pyright/mypy strict 의 자동 narrowing 까지 보장합니다.
from llamon_agent.graph import require_state_str, require_state_list
async def my_node(state) -> dict: metadata = state.get("metadata", {}) or {} order_id = require_state_str(metadata, "orderId") # type: str (narrowed) items = require_state_list(metadata, "items") # type: list (또는 []) ...자동 errorReason / 보조 필드
섹션 제목: “자동 errorReason / 보조 필드”caller 가 reason 을 명시하지 않으면 snake_case ASCII 컨벤션을 따르는
자동 reason 사용 — field / expectedType / receivedType 는 trace event
보조 필드로 자동 노출:
| 상황 | errorReason | extra 보조 필드 |
|---|---|---|
| 부재 / 빈 문자열 | state_field_required | field, expectedType="str", receivedType="None" |
| 타입 mismatch | state_field_invalid_type | field, expectedType, receivedType |
도메인 의미가 필요하면 caller 가 직접 명시:
order_id = require_state_str( metadata, "orderId", reason="order_id_required", # snake_case ASCII 권장 message="주문 ID가 누락되었습니다",)TypedDict 와 함께 (옵션)
섹션 제목: “TypedDict 와 함께 (옵션)”AgentState 를 확장한 도메인-specific TypedDict 로 type 안전성 강화:
from typing import NotRequiredfrom llamon_agent.graph import AgentState, require_state_str
class AppState(AgentState): """프로젝트별 추가 필드. NotRequired 권장.""" orderId: NotRequired[str] customerId: NotRequired[str]
async def ingest_node(state: AppState) -> dict: metadata = state.get("metadata", {}) or {} order_id = require_state_str(metadata, "orderId") customer_id = require_state_str(metadata, "customerId") ...scaffold 한 신규 프로젝트의 nodes.py 상단 주석에 같은 안내가 포함됩니다 —
필요 시 주석 해제 후 활용.
관련 페이지
섹션 제목: “관련 페이지”- 패턴별 연결: 직렬 · 병렬 · 조건 분기 · HTTP 파이프라인
- 공통 규칙: 플로우 공통 패턴
- 입력 선별: InputContracts —
select_input_data_as의 TypedDict 마이그레이션 가이드도 같이 참조