콘텐츠로 이동

플로우 — HTTP 파이프라인 (flow-http)

이 페이지는 외부 HTTP 응답을 받아 가공한 뒤 마지막 에이전트가 정리하는 파이프라인을 만들고 싶을 때 보면 됩니다.

START → http_fetch ─┬→ transform_summary ─┐
└→ transform_flags ───┤→ merge_results → final_agent → END

외부 API를 호출하고, 결과를 병렬 변환 노드에서 각각 가공한 뒤, merge에서 합치고, Registry 에이전트가 최종 응답을 생성합니다.


순서파일역할
app/config.py최종 에이전트 UUID 상수
app/nodes.pyHTTP/Transform/Merge/Agent 노드
app/graph.py노드 연결 (GraphBuilder)
app/agent_card.py카드 정보·스킬
main.py진입점 (수정 불필요)

AGENT_FINAL_ID = "<YOUR_AGENT_ID>" # [수정]

4종류의 노드로 구성됩니다. 먼저 공통 import:

import json
import httpx
from llamon_agent.runtime import AIMessage, runtime_output_text
from llamon_agent.graph import build_agent_context, call_agent_auto, extract_latest_output_data

외부 API를 호출하여 원본 데이터를 가져옵니다.

async def http_fetch(state) -> dict:
async with httpx.AsyncClient(timeout=20) as client:
response = await client.get("https://jsonplaceholder.typicode.com/todos/1")
response.raise_for_status()
payload = response.json()
output = {
"output_text": f"Fetched todo #{payload['id']}",
"output_data": payload,
}
return {"messages": [AIMessage(content=json.dumps(payload, ensure_ascii=False))], "output": output}

http_fetchoutput_data를 서로 다른 관점으로 가공합니다. 병렬 실행되므로 output에 직접 쓰지 않고 messagessource 태그를 달아 기록합니다.

async def transform_summary(state) -> dict:
payload = extract_latest_output_data(state)[0]
result = {"view": "summary", "title": payload["title"], ...}
return {
"messages": [AIMessage(
content=json.dumps(result, ensure_ascii=False),
additional_kwargs={"source": "transform_summary"},
)],
}
async def transform_flags(state) -> dict:
payload = extract_latest_output_data(state)[0]
result = {"view": "flags", "priority": "done" if payload["completed"] else "pending", ...}
return {
"messages": [AIMessage(
content=json.dumps(result, ensure_ascii=False),
additional_kwargs={"source": "transform_flags"},
)],
}

source 태그 기준으로 병렬 결과를 수집하여 하나의 dict로 합칩니다.

async def merge_results(state) -> dict:
merged_payload = {}
for msg in state.get("messages", []):
if not isinstance(msg, AIMessage):
continue
source = msg.additional_kwargs.get("source")
if source:
merged_payload[source] = json.loads(str(msg.content))
output = {"output_text": ..., "output_data": merged_payload}
return {"messages": [AIMessage(content=json.dumps(merged_payload))], "output": output}

merge 결과를 Registry 에이전트에 A2A로 전달하여 최종 응답을 생성합니다.

async def final_agent_node(state, *, agent_final_url: str) -> dict:
output = await call_agent_auto(agent_final_url, build_agent_context(state), state=state)
return {"messages": [AIMessage(content=runtime_output_text(output))], "output": output}
  • 병렬 변환 노드는 output 키에 쓰지 마세요 — last-write-wins 충돌 발생
  • additional_kwargs={"source": "노드이름"}으로 태깅하면 merge에서 구분 가능
  • HTTP 노드는 output_data에 구조화 데이터를 저장하고, 변환 노드는 extract_latest_output_data()로 접근

async def build_graph(settings: Settings):
registry = LLaMONRegistryClient(host=settings.LLAMON_REGISTRY_HOST)
url_final = await registry.resolve_url(AGENT_FINAL_ID)
async def _final_agent(state): return await final_agent_node(state, agent_final_url=url_final)
return (
GraphBuilder()
.node("http_fetch", http_fetch, node_kind="http")
.node("transform_summary", transform_summary, node_kind="transform")
.node("transform_flags", transform_flags, node_kind="transform")
.node("merge_results", merge_results, node_kind="merge")
.node("final_agent", _final_agent, node_kind="registry_node")
.edge(START, "http_fetch")
.edge("http_fetch", "transform_summary")
.edge("http_fetch", "transform_flags")
.edge("transform_summary", "merge_results")
.edge("transform_flags", "merge_results")
.edge("merge_results", "final_agent")
.edge("final_agent", END)
.build()
)

message/stream 요청 시 exit 노드(final_agent)만 토큰 스트리밍을 수행합니다. HTTP/변환/merge 노드는 항상 sync 호출됩니다.

동작 원리와 구현 코드는 플로우 공통 패턴 — 스트리밍을 참고하세요.


  1. nodes.pytransform_detail 노드 작성 (source 태그 "transform_detail")
  2. graph.py에서 .node("transform_detail", ...) + .edge("http_fetch", "transform_detail") + .edge("transform_detail", "merge_results") 추가

http_fetch 노드의 URL, 헤더, 인증, 요청 방식을 실제 API에 맞게 수정하세요.