PostgreSQL 공유 풀 (asyncpg)
postgres 노드는 호출마다 새 연결을 엽니다 — 프로토타입에서는 충분하지만, 매번 TCP+TLS 핸드셰이크 비용이 들고 동시 요청이 늘면 DB 의 max_connections 한도에 금방 부딪힙니다.
공유 풀은 세 가지로 더 빠르고 안전합니다.
- 연결 재사용 — 핸드셰이크 한 번이면 끝, 이후 latency 가 낮아짐
max_size상한 — 트래픽 폭주에도 DB 측 연결 수가 예측 범위 안- 단일 생명주기 — SDK lifespan 종료 시 모든 풀이 자동으로 정리되어 누수 없음
같은 DB 를 두 번 이상 쓰거나 요청이 꾸준한 운영 환경이라면 기본 패턴으로 사용하세요.
사용 — 2단계
섹션 제목: “사용 — 2단계”1) build_graph() 에서 등록
섹션 제목: “1) build_graph() 에서 등록”from llamon_agent.config import RuntimeEnv, Settingsfrom llamon_agent.core.postgres import PostgresPoolConfig, register_pg_poolfrom llamon_agent.graph import END, START, GraphBuilder
from app.nodes import my_node
async def build_graph(settings: Settings): env = RuntimeEnv(source_file=__file__)
graph = ( GraphBuilder() .node("my_node", my_node, node_kind="business") .edge(START, "my_node").edge("my_node", END) .build() )
await register_pg_pool( name="lookup_db", config=PostgresPoolConfig(dsn=env.get("POSTGRES_LOOKUP_DSN", "")), required=False, ) return graph__llamon_shutdown__ hook 을 따로 작성하지 않아도 됩니다 — SDK lifespan 이 close_all_pg_pools() 를 자동으로 호출합니다.
2) 노드 / 쿼리 함수에서 사용
섹션 제목: “2) 노드 / 쿼리 함수에서 사용”from llamon_agent.core.postgres import get_pg_pool
async def my_node(state): pool = get_pg_pool("lookup_db") if pool is None: # DSN 미설정 또는 init 실패 — graceful degrade return {"messages": [], "output": {"output_data": []}}
async with pool.acquire() as conn: rows = await conn.fetch( "SELECT id, name FROM my_table WHERE code = ANY($1::text[])", state["codes"], ) return {"output": {"output_data": [dict(r) for r in rows]}}여러 DB 풀
섹션 제목: “여러 DB 풀”같은 그래프에서 lookup DB / audit DB 처럼 서로 다른 풀이 필요하면, 이름만 다르게 해서 여러 번 등록하면 됩니다.
await register_pg_pool( name="lookup_db", config=PostgresPoolConfig(dsn=env.get("POSTGRES_LOOKUP_DSN", "")), required=False, # 읽기 전용 — 실패 시 graceful degrade)await register_pg_pool( name="audit_db", config=PostgresPoolConfig( dsn=env.get("POSTGRES_AUDIT_DSN", ""), max_size=10, # 쓰기 부하 대비 더 큰 풀 command_timeout=10.0, ), required=True, # 쓰기 — DB 없으면 부팅을 중단)graceful degrade vs strict — 옵션 한 줄로
섹션 제목: “graceful degrade vs strict — 옵션 한 줄로”| 정책 | 옵션 | 동작 |
|---|---|---|
| graceful degrade | required=False (기본) | DSN 빈값 / init 실패 시 경고 로그만 남기고, get_pg_pool(name) 이 None 을 반환. 노드는 빈 결과로 응답을 이어 만듭니다. |
| strict | required=True | DSN 빈값 / init 실패 시 예외를 그대로 던져 부팅을 막습니다. DB 없이는 응답이 의미가 없을 때 선택하세요. |
잘못 적용한 graceful degrade 는 silent fail 의 원인입니다 — 비즈니스 로직이 DB 없이 동작할 수 없다면 반드시 required=True 로 두세요.
PostgresPoolConfig 옵션
섹션 제목: “PostgresPoolConfig 옵션”| 필드 | 기본 | 의미 |
|---|---|---|
dsn | (필수) | postgresql://user:pass@host:port/db. 빈 문자열일 때는 required 옵션에 따라 처리됩니다. |
min_size | 1 | 풀 최소 connection 수. PgBouncer 환경에서는 idle 슬롯 점유를 피하기 위해 0 을 권장. |
max_size | 5 | 풀 최대 connection 수. 단일 에이전트의 동시 요청 5개 이상에 대응. |
command_timeout | 3.0 | 쿼리 타임아웃 (초). DB 가 정체될 때 즉시 실패. |
connect_timeout | 15.0 | 연결 수립 타임아웃 (초). asyncpg 기본값(60초)을 운영용으로 줄인 값. |
테스트 — set_provider 로 교체
섹션 제목: “테스트 — set_provider 로 교체”단위 테스트에서 실제 DB 없이 가짜 풀을 주입하려면 다음 패턴을 따르면 됩니다.
from llamon_agent.core.postgres import ( PostgresPoolRegistry, set_provider,)
def test_my_node(monkeypatch): # 1) asyncpg.create_pool 을 가짜로 교체 import asyncpg fake_pool = ... # MagicMock + AsyncMock 조합 monkeypatch.setattr(asyncpg, "create_pool", AsyncMock(return_value=fake_pool))
# 2) 격리된 registry 주입 previous = set_provider(PostgresPoolRegistry()) try: # ... 실제 테스트 ... finally: set_provider(previous) # 다른 테스트에 영향이 새지 않도록 복원HttpClientPool 의 set_provider 와 같은 형태의 헥사고날 테스트 교체 지점입니다.
참고 구현
섹션 제목: “참고 구현”두 flow 가 helper 패턴으로 구현되어 있습니다 — app/graph.py 에서 register_pg_pool 을 한 번 호출하고, app/internals/*.py 의 쿼리 함수가 get_pg_pool 로 핸들을 받습니다. 새 프로젝트는 이 구조를 그대로 가져다 쓸 수 있습니다.
-
income-asset-comp-flow— 단일 풀,ANY($1::text[])배치 쿼리, trace 로깅 (missing_codes를 별도로 노출)app/graph.py—register_pg_pool(name=LOOKUP_POOL, ...)1회 호출app/internals/db.py—fetch_submission_documents_batch만 가짐 (풀 생명주기 코드 없음)
-
somyung-document-review-flow— composer flow 안에서 풀 사용, 두 쿼리를 분리app/graph.py—register_pg_pool(name=LOOKUP_POOL, ...)1회 호출 (registry resolve 와 같은 함수 안)app/internals/lookup.py—fetch_incpr_name+fetch_recommended_doc_names
두 flow 모두 internals/__init__.py 에서 LOOKUP_POOL 상수를 노출합니다 — graph.py 의 등록과 쿼리 함수의 get_pg_pool 이 같은 이름을 쓰도록 맞춘 패턴입니다.