Base de Datos¶
Capa de acceso a datos definida en database.py. Usa asyncpg con pool nativo async. Migrado de psycopg2 en VersionCimientos.
Pool de Conexiones¶
# database.py
pool = await asyncpg.create_pool(
dsn=DATABASE_URL,
min_size=ASYNCPG_MIN_SIZE, # default 2, configurable via ASYNCPG_MIN_SIZE
max_size=ASYNCPG_MAX_SIZE, # default 8, configurable via ASYNCPG_MAX_SIZE
command_timeout=ASYNCPG_COMMAND_TIMEOUT, # default 60s
init=_init_conn, # registra codecs JSON/JSONB/UUID
)
El pool se inicializa al arrancar la aplicacion en el lifespan de FastAPI (no en startup).
Multi-Tenant: schema_name¶
Regla Critica
Todas las funciones de BD reciben schema_name como parametro keyword-only (despues de *). Esto es obligatorio para evitar SQL injection y tenant leakage.
# CORRECTO
result = execute_query("SELECT ...", schema_name=schema_name)
# INCORRECTO - causa TypeError en runtime
result = execute_query("SELECT ...", schema_name)
Como funciona¶
- El
TenantMiddlewareextraeschema_namedel headerX-Tenant-Schema - El endpoint lo obtiene via
request.state.schema_nameoDepends(get_tenant_schema) - Se pasa a funciones de BD como
schema_name=schema_name get_conn()ejecutaSET LOCAL search_path TO "{schema}", publicdentro de una transaccion
Funciones Principales¶
get_conn¶
Context manager async que adquiere una conexion del pool con tenant context seguro.
@asynccontextmanager
async def get_conn(
*,
schema_name: str,
user_id: Optional[str] = None,
auth_source: Optional[str] = None,
):
"""
Adquiere conexion del pool con SET LOCAL search_path + GUC de auditoria.
Siempre abre transaccion: el search_path se revierte automaticamente al salir.
"""
Seguridad multi-tenant
Toda adquisicion abre SIEMPRE una transaccion y usa SET LOCAL search_path. Al COMMIT/ROLLBACK el search_path vuelve al default del pool. La conexion NUNCA regresa al pool contaminada con el tenant anterior.
with_tenant¶
Establece tenant context en una conexion que ya tiene una transaccion abierta. Usar cuando el caller pasa conn explicitamente.
async def with_tenant(
conn: asyncpg.Connection,
*,
schema_name: str,
user_id: Optional[str] = None,
auth_source: Optional[str] = None,
) -> None:
fetch_all¶
SELECT que devuelve multiples filas.
fetch_one¶
SELECT que devuelve una fila o None.
fetch_val¶
Escalar de la primera columna. Util para INSERT ... RETURNING id.
execute¶
INSERT/UPDATE/DELETE sin RETURNING. Devuelve el status (ej: 'UPDATE 1').
async def execute(
sql: str,
*params,
schema_name: str,
user_id: Optional[str] = None,
auth_source: Optional[str] = None,
) -> str:
transaction¶
Context manager async para transacciones atomicas multi-statement.
@asynccontextmanager
async def transaction(
*,
schema_name: str,
user_id: Optional[str] = None,
auth_source: Optional[str] = None,
):
Uso:
async with transaction(schema_name="200_muni", user_id=uid) as conn:
await conn.execute("INSERT INTO ...", a, b)
doc_id = await conn.fetchval("INSERT ... RETURNING id", c)
# COMMIT automatico al salir sin excepcion; ROLLBACK si excepcion
Validacion de Schema¶
Valida que el schema sea seguro para SQL:
- No vacio ni None
- Normaliza a minusculas y elimina espacios
- Solo letras minusculas, numeros y guion bajo (
^[a-z0-9_]+$) - Maximo 63 caracteres (limite PostgreSQL)
- Bloquea schemas reservados (
information_schema,pg_catalog,pg_toast) - Ejemplos validos:
"200_muni","public","municipio_abc"
Normalizacion
validate_schema_name llama a .strip().lower() antes de validar. El schema ingresado siempre se normaliza a minusculas en la capa de BD.
Codecs asyncpg¶
Al inicializar cada conexion del pool (_init_conn), se registran codecs automaticos:
json/jsonb— encode/decode conjson.dumps/json.loadsuuid— devuelve comostr(compatible con Pydanticstrfields, igual que psycopg2)
Conexion directa a PostgreSQL¶
El backend conecta directo a PostgreSQL (no PgBouncer). El pool asyncpg maneja sus propias conexiones. La deteccion de PgBouncer es por DB_PORT == 6432 pero no esta activada en produccion actual.
Funciones de Validacion¶
async def check_user_exists(user_id: str, *, schema_name: str) -> bool:
async def check_document_exists(document_id: str, *, schema_name: str) -> bool:
async def get_document_basic_info(document_id: str, *, schema_name: str) -> Optional[asyncpg.Record]:
Sin get_user_basic_info
get_user_basic_info() no existe en database.py. Los datos de usuario se obtienen via services/users/ o user_service.py.
Numeracion¶
La numeracion con advisory lock esta en shared/numbering.py, no en database.py.
- Documentos: advisory lock
888888 - Legajos (RLM): advisory lock
777777
Formato de numero de expediente: EE-{año}-{secuencia:06d}-{organizacion}-{departamento}
Contexto de Auditoria¶
Cuando se pasan user_id y auth_source a get_conn(), execute() o transaction(), se inyectan como GUC de PostgreSQL usando set_config(..., true) (scope transaccional):
SELECT set_config('app.user_id', 'uuid-del-usuario', true);
SELECT set_config('app.auth_source', 'jwt', true);
Esto permite que los triggers de auditoria en la BD registren quien hizo cada operacion. El GUC se revierte automaticamente al final de la transaccion (scope true = local a la transaccion).