콘텐츠로 이동

플로우 — 병렬 (flow-parallel)

--runtime-source registry(기본): 노드가 create_registry_node() 기반으로 동작합니다. --runtime-source local: 노드가 Agent(model=..., provider=...) 기반 로컬 LLM으로 동작합니다.

이 페이지는 같은 입력을 여러 관점에서 동시에 처리한 뒤 하나로 합치고 싶을 때 보면 됩니다.

START → agent_a ─┐
├→ merge → business_logic → END
START → agent_b ─┘

agent_a와 agent_b가 동시에 실행되고, merge에서 결과를 합친 뒤 비즈니스 로직에서 후처리합니다.


순서파일역할
app/config.py에이전트 UUID 상수
app/nodes.py노드 함수 + merge + 비즈니스 로직
app/graph.py노드 연결 (GraphBuilder)
app/agent_card.py카드 정보·스킬
main.py진입점 (수정 불필요)

AGENT_A_ID = "<YOUR_FLOW_AGENT_ID>" # [수정]
AGENT_B_ID = "<YOUR_FLOW_AGENT_ID>" # [수정]

병렬 노드는 output에 쓰지 않고 messages에만 source 태그를 달아 기록합니다. merge_node가 source별로 결과를 모읍니다.

from llamon_agent import AIMessage
from llamon_agent.runtime import runtime_output_text
from llamon_agent.graph import build_agent_context
from llamon_agent.a2a import call_agent
async def agent_a_node(state, *, agent_a_url: str) -> dict:
output = await call_agent(agent_a_url, build_agent_context(state))
return {
"messages": [AIMessage(
content=runtime_output_text(output),
additional_kwargs={"source": "agent_a", "raw_output": output},
)]
}
async def agent_b_node(state, *, agent_b_url: str) -> dict:
output = await call_agent(agent_b_url, build_agent_context(state))
return {
"messages": [AIMessage(
content=runtime_output_text(output),
additional_kwargs={"source": "agent_b", "raw_output": output},
)]
}
async def merge_node(state) -> dict:
by_source_text = {}
by_source_output = {}
for message in state.get("messages", []):
if not isinstance(message, AIMessage):
continue
source = message.additional_kwargs.get("source")
if not source:
continue
by_source_text[source] = str(message.content)
raw_output = message.additional_kwargs.get("raw_output")
by_source_output[source] = raw_output if isinstance(raw_output, dict) else {"output_text": str(message.content)}
merged = "\n\n---\n\n".join(f"[{src}]\n{text}" for src, text in by_source_text.items())
output = {"output_text": merged, "output_data": {"results": by_source_output}}
return {"messages": [AIMessage(content=merged)], "output": output}
# 비즈니스 로직 — 여기에 통합 결과 후처리를 작성하세요
async def business_logic(state) -> dict:
merged_result = state.get("output", "")
return {"messages": [AIMessage(content=runtime_output_text(merged_result))], "output": merged_result}
  • output 키에 쓰지 마세요 — 동시 갱신 시 last-write-wins 충돌이 발생합니다
  • additional_kwargs={"source": "노드이름"}으로 태깅하면 merge에서 구분할 수 있습니다
  • 원본 결과는 raw_output으로 함께 싣고, merge_node는 output_data.results에 source별 결과를 보존합니다
  • merge_node는 output에 합산 결과를 기록하므로 이후 노드에서 state["output"] 또는 extract_latest_output_data()로 접근합니다

build_graph()에서 Registry UUID → A2A URL 변환 후 fan-out / fan-in으로 연결합니다.

from llamon_agent.config import Settings
from llamon_agent.graph import START, END, GraphBuilder
from llamon_agent.a2a import LLaMONRegistryClient
from app.config import AGENT_A_ID, AGENT_B_ID
from app.nodes import agent_a_node, agent_b_node, merge_node, business_logic
async def build_graph(settings: Settings):
registry = LLaMONRegistryClient(host=settings.LLAMON_REGISTRY_HOST)
url_a = await registry.resolve_url(AGENT_A_ID)
url_b = await registry.resolve_url(AGENT_B_ID)
async def _agent_a(state): return await agent_a_node(state, agent_a_url=url_a)
async def _agent_b(state): return await agent_b_node(state, agent_b_url=url_b)
return (
GraphBuilder()
.node("agent_a", _agent_a, node_kind="registry_node")
.node("agent_b", _agent_b, node_kind="registry_node")
.node("merge", merge_node, node_kind="merge")
.node("business_logic", business_logic, node_kind="business")
.edge(START, "agent_a").edge(START, "agent_b")
.edge("agent_a", "merge").edge("agent_b", "merge")
.edge("merge", "business_logic").edge("business_logic", END)
.build()
)

message/stream 요청 시 병렬 분기와 merge 노드는 sync로 실행되고, END에 연결된 exit 노드(business_logic)output이 최종 artifact가 됩니다. exit 노드가 Registry/LLM을 호출하는 구조라면 call_agent_auto / call_llm을 사용하세요.

동작 원리와 구현 코드는 플로우 공통 패턴 — 스트리밍을 참고하세요.


  1. config.pyAGENT_C_ID 추가
  2. nodes.pyagent_c_node 작성 (source 태그 "agent_c")
  3. graph.py에서 .node("agent_c", ...) + .edge(START, "agent_c") + .edge("agent_c", "merge") 추가
  4. merge_node에서 "agent_c" source 수집 로직 추가