플로우 — 병렬 (flow-parallel)
--runtime-source registry(기본): 노드가 create_registry_node() 기반으로 동작합니다.
--runtime-source local: 노드가 Agent(model=..., provider=...) 기반 로컬 LLM으로 동작합니다.
이 페이지는 같은 입력을 여러 관점에서 동시에 처리한 뒤 하나로 합치고 싶을 때 보면 됩니다.
토폴로지
섹션 제목: “토폴로지”START → agent_a ─┐ ├→ merge → business_logic → ENDSTART → agent_b ─┘agent_a와 agent_b가 동시에 실행되고, merge에서 결과를 합친 뒤 비즈니스 로직에서 후처리합니다.
파일 구조
섹션 제목: “파일 구조”| 순서 | 파일 | 역할 |
|---|---|---|
| ① | app/config.py | 에이전트 UUID 상수 |
| ② | app/nodes.py | 노드 함수 + merge + 비즈니스 로직 |
| ③ | app/graph.py | 노드 연결 (GraphBuilder) |
| ④ | app/agent_card.py | 카드 정보·스킬 |
| — | main.py | 진입점 (수정 불필요) |
① app/config.py
섹션 제목: “① app/config.py”AGENT_A_ID = "<YOUR_FLOW_AGENT_ID>" # [수정]AGENT_B_ID = "<YOUR_FLOW_AGENT_ID>" # [수정]② app/nodes.py
섹션 제목: “② app/nodes.py”병렬 노드는 output에 쓰지 않고 messages에만 source 태그를 달아 기록합니다.
merge_node가 source별로 결과를 모읍니다.
from llamon_agent import AIMessagefrom llamon_agent.runtime import runtime_output_textfrom llamon_agent.graph import build_agent_contextfrom llamon_agent.a2a import call_agent
async def agent_a_node(state, *, agent_a_url: str) -> dict: output = await call_agent(agent_a_url, build_agent_context(state)) return { "messages": [AIMessage( content=runtime_output_text(output), additional_kwargs={"source": "agent_a", "raw_output": output}, )] }
async def agent_b_node(state, *, agent_b_url: str) -> dict: output = await call_agent(agent_b_url, build_agent_context(state)) return { "messages": [AIMessage( content=runtime_output_text(output), additional_kwargs={"source": "agent_b", "raw_output": output}, )] }
async def merge_node(state) -> dict: by_source_text = {} by_source_output = {} for message in state.get("messages", []): if not isinstance(message, AIMessage): continue source = message.additional_kwargs.get("source") if not source: continue by_source_text[source] = str(message.content) raw_output = message.additional_kwargs.get("raw_output") by_source_output[source] = raw_output if isinstance(raw_output, dict) else {"output_text": str(message.content)}
merged = "\n\n---\n\n".join(f"[{src}]\n{text}" for src, text in by_source_text.items()) output = {"output_text": merged, "output_data": {"results": by_source_output}} return {"messages": [AIMessage(content=merged)], "output": output}
# 비즈니스 로직 — 여기에 통합 결과 후처리를 작성하세요async def business_logic(state) -> dict: merged_result = state.get("output", "") return {"messages": [AIMessage(content=runtime_output_text(merged_result))], "output": merged_result}병렬 노드 작성 규칙
섹션 제목: “병렬 노드 작성 규칙”output키에 쓰지 마세요 — 동시 갱신 시 last-write-wins 충돌이 발생합니다additional_kwargs={"source": "노드이름"}으로 태깅하면 merge에서 구분할 수 있습니다- 원본 결과는
raw_output으로 함께 싣고, merge_node는output_data.results에 source별 결과를 보존합니다 - merge_node는
output에 합산 결과를 기록하므로 이후 노드에서state["output"]또는extract_latest_output_data()로 접근합니다
③ app/graph.py
섹션 제목: “③ app/graph.py”build_graph()에서 Registry UUID → A2A URL 변환 후 fan-out / fan-in으로 연결합니다.
from llamon_agent.config import Settingsfrom llamon_agent.graph import START, END, GraphBuilderfrom llamon_agent.a2a import LLaMONRegistryClientfrom app.config import AGENT_A_ID, AGENT_B_IDfrom app.nodes import agent_a_node, agent_b_node, merge_node, business_logic
async def build_graph(settings: Settings): registry = LLaMONRegistryClient(host=settings.LLAMON_REGISTRY_HOST) url_a = await registry.resolve_url(AGENT_A_ID) url_b = await registry.resolve_url(AGENT_B_ID)
async def _agent_a(state): return await agent_a_node(state, agent_a_url=url_a) async def _agent_b(state): return await agent_b_node(state, agent_b_url=url_b)
return ( GraphBuilder() .node("agent_a", _agent_a, node_kind="registry_node") .node("agent_b", _agent_b, node_kind="registry_node") .node("merge", merge_node, node_kind="merge") .node("business_logic", business_logic, node_kind="business") .edge(START, "agent_a").edge(START, "agent_b") .edge("agent_a", "merge").edge("agent_b", "merge") .edge("merge", "business_logic").edge("business_logic", END) .build() )스트리밍 동작
섹션 제목: “스트리밍 동작”message/stream 요청 시 병렬 분기와 merge 노드는 sync로 실행되고, END에 연결된 exit 노드(business_logic) 의 output이 최종 artifact가 됩니다.
exit 노드가 Registry/LLM을 호출하는 구조라면 call_agent_auto / call_llm을 사용하세요.
동작 원리와 구현 코드는 플로우 공통 패턴 — 스트리밍을 참고하세요.
커스터마이징
섹션 제목: “커스터마이징”분기 추가 (3-way fan-out)
섹션 제목: “분기 추가 (3-way fan-out)”config.py에AGENT_C_ID추가nodes.py에agent_c_node작성 (source 태그"agent_c")graph.py에서.node("agent_c", ...)+.edge(START, "agent_c")+.edge("agent_c", "merge")추가merge_node에서"agent_c"source 수집 로직 추가
관련 문서
섹션 제목: “관련 문서”- 노드 추가/수정: Studio UI 또는
app/nodes.py,app/graph.py직접 편집 - 메모리 추가:
--memory postgres→ 멀티턴 메모리 에이전트 - 다른 패턴: 직렬 · 조건 분기
- 디버깅:
.env에서LOG_LEVEL=DEBUG→ 문제 해결