콘텐츠로 이동

플로우 — 직렬 (flow-seq)

--runtime-source registry(기본): 노드가 create_registry_node() 기반으로 동작합니다. --runtime-source local: 노드가 Agent(model=..., provider=...) 기반 로컬 LLM으로 동작합니다.

이 페이지는 결과를 순서대로 넘기는 가장 단순한 플로우를 만들고 싶을 때 보면 됩니다.

START → agent_a → business_logic → agent_b → END

agent_a의 결과를 비즈니스 로직에서 가공한 뒤 agent_b에 전달합니다.


순서파일역할
app/config.py에이전트 UUID 상수
app/nodes.py노드 함수 + 비즈니스 로직
app/graph.py노드 연결 (GraphBuilder)
app/agent_card.py카드 정보·스킬
main.py진입점 (수정 불필요)

AGENT_A_ID = "<YOUR_FLOW_AGENT_ID>" # [수정]
AGENT_B_ID = "<YOUR_FLOW_AGENT_ID>" # [수정]

각 노드는 async def로 작성하고, Registry 에이전트는 call_agent_auto(url, build_agent_context(state), state=state)로 A2A 호출합니다.

from llamon_agent import AIMessage, runtime_output_text
from llamon_agent.graph import (
build_agent_context,
call_agent_auto,
extract_latest_output_data,
extract_latest_text,
)
async def agent_a_node(state, *, agent_a_url: str) -> dict:
output = await call_agent_auto(agent_a_url, build_agent_context(state), state=state)
return {"messages": [AIMessage(content=runtime_output_text(output))], "output": output}
# 비즈니스 로직 — 여기에 가공 로직을 작성하세요
async def business_logic(state) -> dict:
text = extract_latest_text(state)
data = extract_latest_output_data(state)
output = {"output_text": text, "output_data": data[0] if len(data) == 1 else data} if data else text
return {"messages": [AIMessage(content=text)], "output": output}
async def agent_b_node(state, *, agent_b_url: str) -> dict:
"""exit 노드. call_agent_auto 가 state 를 보고 스트리밍/비스트리밍 자동 선택."""
query = build_agent_context(state)
output = await call_agent_auto(agent_b_url, query, state=state)
return {"messages": [AIMessage(content=runtime_output_text(output))], "output": output}
  • state["output"]에서 현재 state의 최신 결과를 읽습니다
  • 반환값에 "output" 키를 항상 포함해야 다음 노드가 결과를 받습니다
  • metadata를 전달하려면 "metadata": {**state.get("metadata", {}), "key": "value"}

build_graph()에서 Registry UUID → A2A URL 변환 후 GraphBuilder로 연결합니다.

from llamon_agent.config import Settings
from llamon_agent.graph import START, END, GraphBuilder
from llamon_agent.a2a import LLaMONRegistryClient
from app.config import AGENT_A_ID, AGENT_B_ID
from app.nodes import agent_a_node, agent_b_node, business_logic
async def build_graph(settings: Settings):
registry = LLaMONRegistryClient(host=settings.LLAMON_REGISTRY_HOST)
url_a = await registry.resolve_url(AGENT_A_ID)
url_b = await registry.resolve_url(AGENT_B_ID)
async def _agent_a(state): return await agent_a_node(state, agent_a_url=url_a)
async def _agent_b(state): return await agent_b_node(state, agent_b_url=url_b)
return (
GraphBuilder()
.node("agent_a", _agent_a, node_kind="registry_node")
.node("business_logic", business_logic, node_kind="business")
.node("agent_b", _agent_b, node_kind="registry_node")
.edge(START, "agent_a").edge("agent_a", "business_logic")
.edge("business_logic", "agent_b").edge("agent_b", END)
.build()
)

message/stream 요청 시 exit 노드(agent_b)만 토큰 스트리밍을 수행합니다. 중간 노드는 항상 sync 호출됩니다.

동작 원리, Registry/로컬 LLM별 구현 코드는 플로우 공통 패턴 — 스트리밍을 참고하세요.


Studio UI에서 노드를 추가하거나 app/nodes.py, app/graph.py를 직접 편집하세요.

선형 그래프면 엣지가 자동 재연결됩니다. → 직접 편집 기준은 Studio UI 또는 app/nodes.py, app/graph.py를 참고하세요.

Studio UI에서 추가하면 app/nodes.py에 호출별 asyncpg.connect() 기반 시작 코드가 생성됩니다. .env의 **POSTGRES_URL**만 사용합니다.

변수용도주체
POSTGRES_URL비즈니스 쿼리 (postgres/커스텀 노드)사용자 노드의 asyncpg
POSTGRES_MEMORY_DSN세션 메모리 checkpointerSDK 내부 (LangGraph)

두 변수는 서로 영향을 주지 않습니다. 같은 서버여도 DB는 분리 권장 (app_db vs <project>_memory).

scaffold된 .env.examplePOSTGRES_URL 블록은 주석 처리되어 있습니다 — 실제로 노드를 추가할 때 주석을 해제하세요.

생성된 노드 코드에 기본 포함되는 안전장치:

  • asyncpg.connect(dsn, timeout=5.0, command_timeout=10.0) — DSN 오타/네트워크 단절 시 즉시 실패
  • json.dumps(..., default=_json_default)datetime/Decimal/UUID/bytes 안전 직렬화
  • record_id 즉시 실패 가드
  • 함수 말미 [P] 블록에 pool 승급 패턴 내장 (QPS 증가 시 lifespan의 asyncpg.create_pool로 교체)