플로우 — HTTP 파이프라인 (flow-http)
이 페이지는 외부 HTTP 응답을 받아 가공한 뒤 마지막 에이전트가 정리하는 파이프라인을 만들고 싶을 때 보면 됩니다.
토폴로지
섹션 제목: “토폴로지”START → http_fetch ─┬→ transform_summary ─┐ └→ transform_flags ───┤→ merge_results → final_agent → END외부 API를 호출하고, 결과를 병렬 변환 노드에서 각각 가공한 뒤, merge에서 합치고, Registry 에이전트가 최종 응답을 생성합니다.
파일 구조
섹션 제목: “파일 구조”| 순서 | 파일 | 역할 |
|---|---|---|
| ① | app/config.py | 최종 에이전트 UUID 상수 |
| ② | app/nodes.py | HTTP/Transform/Merge/Agent 노드 |
| ③ | app/graph.py | 노드 연결 (GraphBuilder) |
| ④ | app/agent_card.py | 카드 정보·스킬 |
| — | main.py | 진입점 (수정 불필요) |
① app/config.py
섹션 제목: “① app/config.py”AGENT_FINAL_ID = "<YOUR_AGENT_ID>" # [수정]② app/nodes.py
섹션 제목: “② app/nodes.py”4종류의 노드로 구성됩니다. 먼저 공통 import:
import jsonimport httpxfrom llamon_agent.runtime import AIMessage, runtime_output_textfrom llamon_agent.graph import build_agent_context, call_agent_auto, extract_latest_output_dataHTTP 호출 노드
섹션 제목: “HTTP 호출 노드”외부 API를 호출하여 원본 데이터를 가져옵니다.
async def http_fetch(state) -> dict: async with httpx.AsyncClient(timeout=20) as client: response = await client.get("https://jsonplaceholder.typicode.com/todos/1") response.raise_for_status() payload = response.json()
output = { "output_text": f"Fetched todo #{payload['id']}", "output_data": payload, } return {"messages": [AIMessage(content=json.dumps(payload, ensure_ascii=False))], "output": output}병렬 변환 노드
섹션 제목: “병렬 변환 노드”http_fetch의 output_data를 서로 다른 관점으로 가공합니다.
병렬 실행되므로 output에 직접 쓰지 않고 messages에 source 태그를 달아 기록합니다.
async def transform_summary(state) -> dict: payload = extract_latest_output_data(state)[0] result = {"view": "summary", "title": payload["title"], ...} return { "messages": [AIMessage( content=json.dumps(result, ensure_ascii=False), additional_kwargs={"source": "transform_summary"}, )], }
async def transform_flags(state) -> dict: payload = extract_latest_output_data(state)[0] result = {"view": "flags", "priority": "done" if payload["completed"] else "pending", ...} return { "messages": [AIMessage( content=json.dumps(result, ensure_ascii=False), additional_kwargs={"source": "transform_flags"}, )], }merge 노드
섹션 제목: “merge 노드”source 태그 기준으로 병렬 결과를 수집하여 하나의 dict로 합칩니다.
async def merge_results(state) -> dict: merged_payload = {} for msg in state.get("messages", []): if not isinstance(msg, AIMessage): continue source = msg.additional_kwargs.get("source") if source: merged_payload[source] = json.loads(str(msg.content))
output = {"output_text": ..., "output_data": merged_payload} return {"messages": [AIMessage(content=json.dumps(merged_payload))], "output": output}최종 에이전트 노드
섹션 제목: “최종 에이전트 노드”merge 결과를 Registry 에이전트에 A2A로 전달하여 최종 응답을 생성합니다.
async def final_agent_node(state, *, agent_final_url: str) -> dict: output = await call_agent_auto(agent_final_url, build_agent_context(state), state=state) return {"messages": [AIMessage(content=runtime_output_text(output))], "output": output}노드 작성 규칙
섹션 제목: “노드 작성 규칙”- 병렬 변환 노드는
output키에 쓰지 마세요 — last-write-wins 충돌 발생 additional_kwargs={"source": "노드이름"}으로 태깅하면 merge에서 구분 가능- HTTP 노드는
output_data에 구조화 데이터를 저장하고, 변환 노드는extract_latest_output_data()로 접근
③ app/graph.py
섹션 제목: “③ app/graph.py”async def build_graph(settings: Settings): registry = LLaMONRegistryClient(host=settings.LLAMON_REGISTRY_HOST) url_final = await registry.resolve_url(AGENT_FINAL_ID)
async def _final_agent(state): return await final_agent_node(state, agent_final_url=url_final)
return ( GraphBuilder() .node("http_fetch", http_fetch, node_kind="http") .node("transform_summary", transform_summary, node_kind="transform") .node("transform_flags", transform_flags, node_kind="transform") .node("merge_results", merge_results, node_kind="merge") .node("final_agent", _final_agent, node_kind="registry_node") .edge(START, "http_fetch") .edge("http_fetch", "transform_summary") .edge("http_fetch", "transform_flags") .edge("transform_summary", "merge_results") .edge("transform_flags", "merge_results") .edge("merge_results", "final_agent") .edge("final_agent", END) .build() )스트리밍 동작
섹션 제목: “스트리밍 동작”message/stream 요청 시 exit 노드(final_agent)만 토큰 스트리밍을 수행합니다. HTTP/변환/merge 노드는 항상 sync 호출됩니다.
동작 원리와 구현 코드는 플로우 공통 패턴 — 스트리밍을 참고하세요.
커스터마이징
섹션 제목: “커스터마이징”변환 분기 추가 (3-way)
섹션 제목: “변환 분기 추가 (3-way)”nodes.py에transform_detail노드 작성 (source 태그"transform_detail")graph.py에서.node("transform_detail", ...)+.edge("http_fetch", "transform_detail")+.edge("transform_detail", "merge_results")추가
HTTP 호출 변경
섹션 제목: “HTTP 호출 변경”http_fetch 노드의 URL, 헤더, 인증, 요청 방식을 실제 API에 맞게 수정하세요.
관련 문서
섹션 제목: “관련 문서”- 메모리 추가:
--memory postgres→ 멀티턴 메모리 에이전트 - 다른 패턴: 직렬 · 병렬 · 조건 분기
- 디버깅:
.env에서LOG_LEVEL=DEBUG→ 문제 해결