Registry 경로
agent=None 으로 두고 ExtensionConfig 만 전달.
Registry가 LLM/Prompt/Guardrail/MCP/A2A를 비동기로 로드 → ComponentContainer 조립 → LangGraphAgent 생성.
LLaMON Agent SDK는 포트와 어댑터(Hexagonal) 패턴을 따릅니다. 바깥에서 부품을 조립하고, 안쪽은 외부 시스템을 모른 채 실행합니다.
Driving Adapter → Application → Core → Driven Adapter (A2A · CLI · (use cases) (LangGraph (LLM · MCP · A2A · Studio · Kafka) runtime) Registry · Memory)create_server() 함수 하나가 두 경로를 모두 처리하며, 어느 쪽을 쓰더라도 외부에서 보이는 동작은 동일합니다.
Registry 경로
agent=None 으로 두고 ExtensionConfig 만 전달.
Registry가 LLM/Prompt/Guardrail/MCP/A2A를 비동기로 로드 → ComponentContainer 조립 → LangGraphAgent 생성.
코드 주입 경로
agent=my_graph 또는 agent=MyAdapter(...) 를 직접 전달.
query= 인터페이스가 없으면 GraphInvocationAdapter 가 자동으로 감쌉니다 (ACL). Registry guardrail이 있으면 GuardrailRuntimeAdapter 가 바깥을 한 겹 더 감쌉니다.
GraphInvocationAdapter 는 Anti-Corruption Layer 의 대표 예입니다. 외부에서 들어온 LangGraph 그래프는 query= 같은 도메인 용어를 모르지만, 어댑터가 그 사이를 변환하므로 CustomAgentExecutor 입장에서는 두 경로가 동일하게 보입니다.
아래는 서버가 시작될 때 Runtime 부품을 조립하는 개념도입니다. Registry 의존성이 있는 경우 MCP/A2A/LLM/Prompt/Guardrail/Memory 관련 참조를 먼저 확인하고, 그 결과로 ComponentContainer를 만든 뒤 Flow 또는 ReAct 그래프를 구성합니다.
위 도식에서 F (빌드시 캐시) 는 배포 단위로 거의 바뀌지 않는 값만 담습니다.
resolved.json 6 필드): llm_model, llm_base_url, llm_provider, llm_temperature, system_prompt, user_promptresolved.json 은 순수 최적화 캐시입니다. SDK 의 빌드시 CLI 모듈이 자동 생성하므로 손으로 작성하지 않으며, 파일이 없어도 런타임 기동 시 Registry 를 직접 호출하므로 동작에는 지장이 없습니다.
# (선택) Dockerfile RUN 단계에 추가 — Registry 왕복을 빌드 시점으로 앞당겨 런타임 기동을 단축RUN uv run python -m llamon_agent.inbound.server.resolve인자를 생략하면 런타임 main.py 와 동일한 설정 소스 — app/config.py 의 build_extension() — 를 그대로 사용합니다. 따라서 resolved.json 캐시가 실제 런타임 설정과 어긋날 일이 없습니다. 이 명령은 그 설정의 LLM·프롬프트를 Registry 에서 fetch → resolved.json 으로 직렬화 → 이미지에 함께 굽습니다. scaffold 가 만드는 기본 Dockerfile 에는 포함되지 않으니, Registry 왕복을 줄이고 싶을 때만 직접 추가하세요.
직접 만든 FastAPI/Flask 앱이 아니라 Starlette ASGI 앱 한 개입니다. A2A 표준 라이브러리(A2AStarletteApplication)가 만든 앱을 한 번 더 감싼 뒤, 관리/메모리/그래프 라우트를 덧붙인 형태입니다.
| 레이어 | 클래스 | 책임 |
|---|---|---|
| ASGI 진입 | Starlette(routes, lifespan) | uvicorn이 호출하는 최외곽 앱 |
| A2A 마운트 | A2AStarletteApplication | JSON-RPC 라우트 (message/send 등) |
| 요청 핸들러 | _StreamingAwareRequestHandler | send / stream 구분 플래그 주입 |
| 실행 어댑터 | CustomAgentExecutor | A2A RequestContext → 내부 호출로 변환 |
| 실행 본체 | LangGraphAgent / RuntimeAdapter / GraphInvocationAdapter | 그래프 실행 |
이 모든 조립은 inbound/server/factory.py :: create_server() 한 곳에서 이루어집니다.
# A2A JSON-RPC (mount)POST / method: "message/send" → 단일 응답 method: "message/stream" → SSE 청크 스트림
# 운영GET /healthz liveness/readinessGET /api/v1/status 컨테이너/MCP/A2A 상태GET /graph 그래프 시각화 JSON
# 메모리GET|DELETE /api/v1/memory/threadsGET|DELETE /api/v1/memory/threads/{id}GET /api/v1/memory/threads/{id}/messagesGET|PATCH /api/v1/memory/config
# Registry (LLAMON_INTERNAL_RUNTIME_CONTROL_ENABLED=1 일 때만)GET /api/v1/registry/metadata 런타임 refs 메타POST /api/v1/registry/reload 수동 반영
# AgentCardGET|PATCH /api/v1/card/urlmessage/send 요청 한 건이 들어왔을 때 거치는 단계입니다.
위 도식은 Registry 기반 LangGraphAgent의 일반 경로를 기준으로 한 개념도입니다. 코드 주입 RuntimeAdapter나 Flow 그래프는 내부 노드 구조가 다를 수 있지만, A2A executor → agent 호출 → 결과 직렬화라는 바깥 경계는 같습니다.
클라이언트 — {"method":"message/send", "params":{...}} 형태의 JSON-RPC 요청을 전송합니다.
uvicorn → Starlette — uvicorn은 HTTP 소켓을 받는 ASGI 서버입니다. 받은 HTTP 요청을 scope(메서드·경로·헤더 등이 담긴 dict)와 receive/send 콜러블로 변환해 ASGI 앱(외곽 Starlette)에 넘깁니다. HTTP를 파이썬 함수 호출로 변환하는 어댑터 역할을 합니다.
A2AStarletteApplication — JSON-RPC 페이로드를 파싱하고 method 별로 라우팅합니다.
_StreamingAwareRequestHandler.on_message_send — context.state["llamon_stream_request"] = False 로 표시해 executor가 단일 응답 경로임을 인식하게 합니다.
factory.py
CustomAgentExecutor.execute() — query 추출, 첨부 파트 분리, skill_id / user_id / thread_id 수집, RequestContext 정규화, Langfuse span 시작.
경로 우선순위: HITL 재개 → stream → dispatch → invoke_with_hitl → invoke → stream(폴백).
inbound/a2a/executor.py
self.agent.dispatch(...) — self.agent 의 실체는 셋 중 하나입니다: LangGraphAgent, 사용자 RuntimeAdapter, GraphInvocationAdapter. 필요에 따라 GuardrailRuntimeAdapter 가 바깥을 한 겹 더 감쌀 수 있습니다.
LangGraphAgent.dispatch() — 스킬 라우팅(토큰 스코어링 → 실패 시 LLM 폴백) → AgentState 빌드 → self._graph.ainvoke(state, config={"thread_id": ...}).
core/runtime/agent.py
LangGraph 실행 — router_node → react_agent → (tools loop) → END 순으로 흐릅니다. pre_model_hook 이 skill별 system_prompt 를 런타임에 동적으로 주입합니다.
Outbound 어댑터가 실제 I/O 수행 — outbound/providers/*(LLM), outbound/integrations/mcp/*(StreamableHTTP/stdio), outbound/integrations/a2a/*(원격 에이전트 호출), outbound/memory/*(체크포인터), 관측 emitter.
결과 직렬화 — normalize_runtime_output() 로 text/data/files 를 분리 → new_artifact("agent-response", parts=[...]) → event_queue.enqueue_event(completed_task(...)) → JSON-RPC 응답으로 직렬화되어 클라이언트에 반환됩니다.
4. on_message_send_stream() → llamon_stream_request = True5. CustomAgentExecutor.execute() → _execute_stream() 선택6. agent.stream(...) async iterator stream_mode=["custom", "messages"] custom : StreamWriter 가 push 한 청크 (exit node 만) messages : LLM 토큰 (exit node 만 필터)
청크마다 → TaskArtifactUpdateEvent(parts=[TextPart(chunk)])마지막 → TaskStatusUpdateEvent(state="completed")_llamon_exit_nodes 필터로 걸러집니다. 클라이언트가 받는 청크는 END 로 직접 연결된 노드에서 나온 토큰뿐입니다.ainvoke() 로 다시 실행해 단일 응답으로 만들어 반환합니다 (GraphInvocationAdapter.stream).소스 디렉토리와 헥사고날 역할의 대응표입니다. 어느 파일을 열어야 할지 결정할 때 참조하세요.
| 어댑터 | 진입 메서드 | 변환 대상 |
|---|---|---|
inbound/a2a/executor.py :: CustomAgentExecutor | execute(context, event_queue) | A2A RequestContext → dispatch/invoke/stream |
inbound/server/factory.py :: _StreamingAwareRequestHandler | on_message_send / on_message_send_stream | A2A method → state flag |
inbound/server/routes/internal_* | Starlette Route | HTTP → use case |
inbound/consumer/kafka_registry.py :: KafkaRegistryConsumer | start() 루프 | Kafka 이벤트 → ProcessRegistryEventUseCase |
cli/commands/* | argparse 진입점 | CLI 인자 → use case |
studio/routes/* | Starlette HTTP | Studio UI → 파일시스템 use case |
application/usecases/* — 모든 use case는 dataclass Request → Result 시그니처로 통일되어 있습니다.
| Use case | 역할 |
|---|---|
CreateServerUseCase | create_server() 의 오케스트레이션 본체 |
CreateRegistryNodeUseCase | 서버 없이 노드 단위 에이전트 생성 |
ApplyRegistryReloadUseCase | Registry 변경 → 컴포넌트 재로드 |
ProcessRegistryEventUseCase | Kafka 이벤트 self-filter + dispatch |
CollectCardResourcesUseCase | 그래프에서 LLM/MCP 자동 수집 |
ParseGraphUseCase · ValidateFlowGraphUseCase | Flow 그래프 파싱/검증 |
| 포트 (Protocol) | 어댑터 구현 |
|---|---|
RegistryRuntimePort | outbound/runtime/registry_runtime.py :: ContainerRegistryRuntime |
RuntimePort | outbound/providers/runtime_wrapper.py |
| LLM | outbound/providers/{openai,anthropic,ollama}.py |
| MCP 도구 | outbound/integrations/mcp/client.py (StreamableHTTP / stdio) |
| A2A 원격 호출 | outbound/integrations/a2a/* |
| Registry 조회 | outbound/registry/client.py :: LLaMONRegistryClient |
| 메모리 백엔드 | outbound/memory/{postgres,sqlite,inmemory}.py |
| 관측성 | observability/emitters/{log,broker}_emitter.py |
Registry 경로에서 모든 외부 의존성은 ComponentContainer 하나로 수렴합니다. LangGraphAgent.create(container, ...) 는 이 container만 가지고 그래프를 완성하며, Core는 Registry · MCP · Provider 를 직접 import하지 않습니다.
ExtensionConfig (선언) ▼create_component_container(config, registry, resolved, ...) ▼ComponentContainer ├─ llm LangChain BaseChatModel ├─ tools MCP tools + local_tools (extra_tools) ├─ system_prompt / skill_prompts (skill_id → prompt) ├─ input/output_guardrail GuardrailPort | None ├─ mcp_manager / a2a_manager / a2a_cards ├─ memory_manager 체크포인터 팩토리 포함 ├─ compiled_flow Flow 모드일 때만 └─ config 런타임 사본 (ExtensionConfig)비즈니스 노드 안이 아니라 어댑터 경계에서 적용됩니다. wrap 위치는 두 가지 경우로 갈립니다.
Case A) 사용자 RuntimeAdapter 가 LangGraphAgent 를 감싸는 경우 외부 GuardrailRuntimeAdapter (output 만) └─ 사용자 RuntimeAdapter.postprocess() └─ LangGraphAgent ├─ 내부 input_guardrail (LLM 호출 직전) ├─ LLM └─ 내부 output_guardrail (LLM 직후 raw)
Case B) 그 외 (Flow 그래프, 단순 RuntimeAdapter) 외부 GuardrailRuntimeAdapter (input + output) └─ wrapped agent이중 적용을 방지하기 위해 _inner_agent_has_container_guardrails() 가 내부에 같은 container의 가드레일이 이미 걸려 있는지 확인하고, 외부 wrap을 선택적으로 생략합니다.
Core는 trace를 발행한다는 사실만 알고, 어디로 전송되어 어떻게 저장되는지는 알지 못합니다.
LangGraphAgent └─ get_node_tracer() └─ emit_invoke_start / emit_routing / emit_guardrail / ... └─ TraceEmitterPort ← Protocol ├─ LogTraceEmitter 구조화 로그 └─ BrokerTraceEmitter Kafka 등
병행: start_langfuse_child_span() / end_… Langfuse Python SDK 직접 호출 (LANGFUSE_* env 있을 때만)입력 크기 제한 관련 trace (선택 기능): 입력 크기 제한을 켜면
(MAX_HISTORY_TOKENS 같은 환경변수) 관련 trace event 가 추가로 나옵니다 —
history.measured / history.truncated / input_cap.measured / input_cap.truncated.
이 환경변수를 설정하지 않으면(기본값) 이 event 는 하나도 발생하지 않습니다 —
기능을 켤 때만 보입니다.
invoke.end event 의 metadata 에는 측정값이 함께 실릴 수 있습니다 —
latency(단계별 소요 시간, STAGE_TIMING=on 일 때) 와 cache_stats(prompt cache 적중률).
이 값들은 trace event 에만 더해지고, 외부로 전송되는 기존 형식은 그대로 유지됩니다.
현재 설정값은 llamon config show 로 확인할 수 있습니다.
Studio는 LLM provider · MCP · A2A 어느 outbound 포트도 거치지 않습니다. nodes.py, graph.py, agent_card.py, config.py 를 읽고 쓰는 경로만 사용하므로 LangGraphAgent 가 호출되지 않습니다. Studio가 중단되어도 에이전트 서비스에는 영향이 없습니다.
React UI → Studio HTTP routes → parse/generate UseCase → 파일시스템LLM·MCP·A2A 같은 바깥 호출은 모두 core/http 의 전역 httpx 연결 풀 하나를 함께 씁니다. 이 풀이 동시에 열 수 있는 연결 수에는 상한이 있어서, 한꺼번에 너무 많은 요청이 몰리면 뒤따르는 호출이 자리를 기다리다 타임아웃으로 실패할 수 있습니다.
그래서 한 번에 여러 갈래로 퍼지는 지점 — 멀티 스킬 병렬 실행, 자식 에이전트 병렬 호출, 여러 prompt 를 동시에 검사하는 guardrail — 은 모두 동시 실행 개수에 기본 상한(DEFAULT_FANOUT_MAX_CONCURRENCY)을 두고 그 안에서만 돌립니다. 분기 수가 상한보다 많으면 초과분은 앞선 호출이 끝나는 대로 차례로 실행되므로, 결과는 빠짐없이 모두 모이되 풀이 고갈되지는 않습니다.
기본 상한으로 부족하거나 반대로 더 조이고 싶으면, 해당 호출부에서 max_concurrency 로 조정할 수 있습니다.
서버 기동
inbound/server/factory.py :: create_server
요청 실행
inbound/a2a/executor.py :: CustomAgentExecutor.execute
그래프 생성
core/runtime/agent.py :: LangGraphAgent.create
의존성 조립
inbound/server/bootstrap.py :: create_component_container