El patrón Unit of Work se resume en una idea: agrupar varias operaciones de base de datos en una única transacción lógica. Si todas van bien, se confirman juntas. Si algo falla en medio, se deshace todo lo que se había hecho hasta ese momento.
La gracia está en que tu código de negocio no tiene que preocuparse por gestionar confirmaciones y rollbacks línea a línea. Delegas esa responsabilidad en un objeto que lleva la cuenta de los cambios pendientes y los aplica (o los cancela) de golpe cuando toca.
Este patrón aparece en sitios que probablemente ya conoces. La Session de SQLAlchemy es su implementación más conocida en Python. El ORM de Django también lo aplica de forma implícita cuando trabajas dentro de un bloque atómico. Y si construyes repositorios propios sin ORM, puedes implementarlo tú mismo con unas pocas líneas.
El problema que resuelve
Imagina que tienes que crear un pedido. La operación implica insertar una fila en la tabla pedidos, varias filas en lineas_pedido y actualizar el stock en productos. Tres tablas, tres operaciones distintas.
¿Qué pasa si la inserción en lineas_pedido va bien pero la actualización de stock falla por un error de red, un valor nulo inesperado o una restricción que no habías contemplado? Sin una transacción que lo envuelva todo, te quedas con un pedido creado, líneas guardadas y el stock sin tocar. Datos inconsistentes, pedido a medias.
Con Unit of Work ese escenario no existe. O confirmas las tres operaciones juntas, o no confirmas ninguna. El estado de la base de datos siempre queda coherente.
Implementación manual con psycopg2
Si trabajas directamente con psycopg2 sin ORM, el control transaccional es explícito. Por defecto, psycopg2 abre una transacción en la primera consulta. Lo que debes hacer es desactivar el autocommit, ejecutar tus operaciones y confirmar o deshacer según el resultado.
import psycopg2
def crear_pedido(datos_pedido, lineas):
conn = psycopg2.connect(dsn="postgresql://usuario:password@localhost/tienda")
conn.autocommit = False # ya es False por defecto, pero lo dejamos explícito
try:
cur = conn.cursor()
cur.execute(
"INSERT INTO pedidos (cliente_id, total) VALUES (%s, %s) RETURNING id",
(datos_pedido['cliente_id'], datos_pedido['total'])
)
pedido_id = cur.fetchone()[0]
for linea in lineas:
cur.execute(
"INSERT INTO lineas_pedido (pedido_id, producto_id, cantidad, precio) VALUES (%s, %s, %s, %s)",
(pedido_id, linea['producto_id'], linea['cantidad'], linea['precio'])
)
cur.execute(
"UPDATE productos SET stock = stock - %s WHERE id = %s",
(linea['cantidad'], linea['producto_id'])
)
conn.commit()
print(f"Pedido {pedido_id} creado correctamente.")
return pedido_id
except Exception as e:
conn.rollback()
print(f"Error al crear el pedido: {e}")
raise
finally:
cur.close()
conn.close()
El bloque try/except/finally es el corazón del asunto. Si cualquier consulta lanza una excepción, el except llama a rollback() y la base de datos vuelve al estado anterior. El finally cierra la conexión pase lo que pase.
Unit of Work con SQLAlchemy
SQLAlchemy lleva el patrón integrado en su objeto Session. La sesión actúa como un contenedor de cambios pendientes: vas añadiendo objetos con session.add(), los cambios se acumulan en memoria y no se envían a la base de datos hasta que llamas a session.flush() o session.commit().
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
engine = create_engine("postgresql+psycopg2://usuario:password@localhost/tienda")
def crear_pedido(datos_pedido, lineas):
with Session(engine) as session:
try:
pedido = Pedido(
cliente_id=datos_pedido['cliente_id'],
total=datos_pedido['total']
)
session.add(pedido)
session.flush() # genera el id sin confirmar aún
for linea in lineas:
linea_obj = LineaPedido(
pedido_id=pedido.id,
producto_id=linea['producto_id'],
cantidad=linea['cantidad'],
precio=linea['precio']
)
session.add(linea_obj)
producto = session.get(Producto, linea['producto_id'])
producto.stock -= linea['cantidad']
session.commit()
return pedido.id
except Exception as e:
session.rollback()
raise
Un detalle que vale la pena entender: SQLAlchemy mantiene un identity map dentro de la sesión. Eso significa que si cargas el mismo Producto dos veces durante la misma sesión, la segunda llamada no va a la base de datos: devuelve la instancia que ya tiene en memoria. Esto evita lecturas duplicadas y mantiene la coherencia del objeto durante toda la operación.
También puedes usar el context manager con Session.begin() para que el commit y el rollback sean automáticos:
with Session(engine).begin() as session:
# si no hay excepción, hace commit al salir
# si hay excepción, hace rollback automáticamente
session.add(pedido)
Implementación propia desde cero
Si no usas SQLAlchemy y quieres aplicar el patrón con psycopg2 de forma más estructurada, puedes construir tu propia clase UnitOfWork. La idea es que los repositorios compartan la misma conexión y que el objeto UnitOfWork controle cuándo se confirman o descartan los cambios.
import psycopg2
class UnitOfWork:
def __init__(self, dsn):
self.dsn = dsn
self.conn = None
def __enter__(self):
self.conn = psycopg2.connect(self.dsn)
self.conn.autocommit = False
self.pedidos = PedidoRepository(self.conn)
self.lineas = LineaRepository(self.conn)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is None:
self.conn.commit()
else:
self.conn.rollback()
self.conn.close()
class PedidoRepository:
def __init__(self, conn):
self.conn = conn
def crear(self, cliente_id, total):
cur = self.conn.cursor()
cur.execute(
"INSERT INTO pedidos (cliente_id, total) VALUES (%s, %s) RETURNING id",
(cliente_id, total)
)
return cur.fetchone()[0]
class LineaRepository:
def __init__(self, conn):
self.conn = conn
def crear(self, pedido_id, producto_id, cantidad, precio):
cur = self.conn.cursor()
cur.execute(
"INSERT INTO lineas_pedido (pedido_id, producto_id, cantidad, precio) VALUES (%s, %s, %s, %s)",
(pedido_id, producto_id, cantidad, precio)
)
DSN = "postgresql://usuario:password@localhost/tienda"
def crear_pedido(datos_pedido, lineas):
with UnitOfWork(DSN) as uow:
pedido_id = uow.pedidos.crear(datos_pedido['cliente_id'], datos_pedido['total'])
for linea in lineas:
uow.lineas.crear(pedido_id, linea['producto_id'], linea['cantidad'], linea['precio'])
return pedido_id
La función crear_pedido queda limpia: trabaja con repositorios sin tocar SQL directamente ni pensar en transacciones. El __exit__ del UnitOfWork se encarga de confirmar si todo fue bien o de hacer rollback si saltó alguna excepción.
Cuándo merece la pena y cuándo no
El patrón tiene sentido cuando varias operaciones deben ejecutarse juntas o no ejecutarse. Si estás modificando más de una tabla en una misma operación de negocio y la coherencia entre ellas es crítica, Unit of Work es lo que necesitas.
- Crear un pedido con sus líneas y actualizar stock.
- Registrar un pago y actualizar el estado de la factura.
- Crear un usuario y asignarle roles iniciales en tablas distintas.
En cambio, para operaciones simples sobre una sola tabla no vale la pena añadir esta capa. Un script que inserta registros de un CSV en una tabla, o una consulta de lectura, no necesitan Unit of Work. Añadirlo ahí solo complica el código sin aportar nada.
Cómo testear el Unit of Work
Una ventaja que a veces se pasa por alto: el rollback es perfecto como mecanismo de limpieza en tests. En lugar de borrar registros al final de cada test, abres una transacción, ejecutas el test dentro y luego haces rollback. La base de datos queda exactamente como estaba.
import pytest
import psycopg2
@pytest.fixture
def conn():
connection = psycopg2.connect(dsn="postgresql://usuario:password@localhost/tienda_test")
connection.autocommit = False
yield connection
connection.rollback() # limpieza automática al salir del test
connection.close()
def test_crear_pedido(conn):
cur = conn.cursor()
cur.execute("INSERT INTO pedidos (cliente_id, total) VALUES (1, 99.99) RETURNING id")
pedido_id = cur.fetchone()[0]
assert pedido_id is not None
# al salir del test, el fixture hace rollback: no queda rastro en la BBDD
Si prefieres no depender de una base de datos real en los tests, SQLite en memoria es una buena alternativa para tests unitarios rápidos. SQLAlchemy la soporta sin configuración adicional:
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine) # crea las tablas en memoria
def test_crear_pedido_sqlalchemy():
with Session(engine) as session:
pedido = Pedido(cliente_id=1, total=99.99)
session.add(pedido)
session.commit()
assert pedido.id is not None
Los tests sobre SQLite en memoria son rápidos porque no hay disco de por medio. La contrapartida es que SQLite tiene algunas diferencias con PostgreSQL (tipos de datos, restricciones), así que para tests de integración conviene usar la base de datos real.
Si quieres profundizar en otros usos de Python más allá de la gestión de datos, puedes ver cómo se aplica en automatización con Python o en entornos de Python científico.
Imagen: Pexels / Seraphfim Gallery
