콘텐츠로 이동

PostgreSQL 공유 풀 (asyncpg)

postgres 노드는 호출마다 새 연결을 엽니다 — 프로토타입에서는 충분하지만, 매번 TCP+TLS 핸드셰이크 비용이 들고 동시 요청이 늘면 DB 의 max_connections 한도에 금방 부딪힙니다.

공유 풀은 세 가지로 더 빠르고 안전합니다.

  • 연결 재사용 — 핸드셰이크 한 번이면 끝, 이후 latency 가 낮아짐
  • max_size 상한 — 트래픽 폭주에도 DB 측 연결 수가 예측 범위 안
  • 단일 생명주기 — SDK lifespan 종료 시 모든 풀이 자동으로 정리되어 누수 없음

같은 DB 를 두 번 이상 쓰거나 요청이 꾸준한 운영 환경이라면 기본 패턴으로 사용하세요.

app/graph.py
from llamon_agent.config import RuntimeEnv, Settings
from llamon_agent.core.postgres import PostgresPoolConfig, register_pg_pool
from llamon_agent.graph import END, START, GraphBuilder
from app.nodes import my_node
async def build_graph(settings: Settings):
env = RuntimeEnv(source_file=__file__)
graph = (
GraphBuilder()
.node("my_node", my_node, node_kind="business")
.edge(START, "my_node").edge("my_node", END)
.build()
)
await register_pg_pool(
name="lookup_db",
config=PostgresPoolConfig(dsn=env.get("POSTGRES_LOOKUP_DSN", "")),
required=False,
)
return graph

__llamon_shutdown__ hook 을 따로 작성하지 않아도 됩니다 — SDK lifespan 이 close_all_pg_pools() 를 자동으로 호출합니다.

app/nodes.py
from llamon_agent.core.postgres import get_pg_pool
async def my_node(state):
pool = get_pg_pool("lookup_db")
if pool is None:
# DSN 미설정 또는 init 실패 — graceful degrade
return {"messages": [], "output": {"output_data": []}}
async with pool.acquire() as conn:
rows = await conn.fetch(
"SELECT id, name FROM my_table WHERE code = ANY($1::text[])",
state["codes"],
)
return {"output": {"output_data": [dict(r) for r in rows]}}

같은 그래프에서 lookup DB / audit DB 처럼 서로 다른 풀이 필요하면, 이름만 다르게 해서 여러 번 등록하면 됩니다.

await register_pg_pool(
name="lookup_db",
config=PostgresPoolConfig(dsn=env.get("POSTGRES_LOOKUP_DSN", "")),
required=False, # 읽기 전용 — 실패 시 graceful degrade
)
await register_pg_pool(
name="audit_db",
config=PostgresPoolConfig(
dsn=env.get("POSTGRES_AUDIT_DSN", ""),
max_size=10, # 쓰기 부하 대비 더 큰 풀
command_timeout=10.0,
),
required=True, # 쓰기 — DB 없으면 부팅을 중단
)

graceful degrade vs strict — 옵션 한 줄로

섹션 제목: “graceful degrade vs strict — 옵션 한 줄로”
정책옵션동작
graceful degraderequired=False (기본)DSN 빈값 / init 실패 시 경고 로그만 남기고, get_pg_pool(name) 이 None 을 반환. 노드는 빈 결과로 응답을 이어 만듭니다.
strictrequired=TrueDSN 빈값 / init 실패 시 예외를 그대로 던져 부팅을 막습니다. DB 없이는 응답이 의미가 없을 때 선택하세요.

잘못 적용한 graceful degrade 는 silent fail 의 원인입니다 — 비즈니스 로직이 DB 없이 동작할 수 없다면 반드시 required=True 로 두세요.

필드기본의미
dsn(필수)postgresql://user:pass@host:port/db. 빈 문자열일 때는 required 옵션에 따라 처리됩니다.
min_size1풀 최소 connection 수. PgBouncer 환경에서는 idle 슬롯 점유를 피하기 위해 0 을 권장.
max_size5풀 최대 connection 수. 단일 에이전트의 동시 요청 5개 이상에 대응.
command_timeout3.0쿼리 타임아웃 (초). DB 가 정체될 때 즉시 실패.
connect_timeout15.0연결 수립 타임아웃 (초). asyncpg 기본값(60초)을 운영용으로 줄인 값.

단위 테스트에서 실제 DB 없이 가짜 풀을 주입하려면 다음 패턴을 따르면 됩니다.

from llamon_agent.core.postgres import (
PostgresPoolRegistry,
set_provider,
)
def test_my_node(monkeypatch):
# 1) asyncpg.create_pool 을 가짜로 교체
import asyncpg
fake_pool = ... # MagicMock + AsyncMock 조합
monkeypatch.setattr(asyncpg, "create_pool", AsyncMock(return_value=fake_pool))
# 2) 격리된 registry 주입
previous = set_provider(PostgresPoolRegistry())
try:
# ... 실제 테스트 ...
finally:
set_provider(previous) # 다른 테스트에 영향이 새지 않도록 복원

HttpClientPoolset_provider 와 같은 형태의 헥사고날 테스트 교체 지점입니다.

두 flow 가 helper 패턴으로 구현되어 있습니다 — app/graph.py 에서 register_pg_pool 을 한 번 호출하고, app/internals/*.py 의 쿼리 함수가 get_pg_pool 로 핸들을 받습니다. 새 프로젝트는 이 구조를 그대로 가져다 쓸 수 있습니다.

  • income-asset-comp-flow — 단일 풀, ANY($1::text[]) 배치 쿼리, trace 로깅 (missing_codes 를 별도로 노출)

    • app/graph.pyregister_pg_pool(name=LOOKUP_POOL, ...) 1회 호출
    • app/internals/db.pyfetch_submission_documents_batch 만 가짐 (풀 생명주기 코드 없음)
  • somyung-document-review-flow — composer flow 안에서 풀 사용, 두 쿼리를 분리

    • app/graph.pyregister_pg_pool(name=LOOKUP_POOL, ...) 1회 호출 (registry resolve 와 같은 함수 안)
    • app/internals/lookup.pyfetch_incpr_name + fetch_recommended_doc_names

두 flow 모두 internals/__init__.py 에서 LOOKUP_POOL 상수를 노출합니다 — graph.py 의 등록과 쿼리 함수의 get_pg_pool 이 같은 이름을 쓰도록 맞춘 패턴입니다.