Redis con Python (redis-py): caché, Pub/Sub, sorted sets, pipelines y patrones comunes

redis-py es el cliente oficial de Redis para Python. Redis es una base de datos en memoria que funciona como caché, broker de mensajes, almacén de sesiones y motor de estructuras de datos en tiempo real. Este tutorial cubre desde la conexión básica hasta patrones avanzados como Pub/Sub, sorted sets para rankings, pipelines y distributed locks, incluyendo el cliente asíncrono AsyncRedis.

Instalación y conexión

# pip install redis
import redis

# Conexión sencilla
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
r.ping()   # True si Redis está disponible

# Usando URL
r = redis.from_url("redis://localhost:6379/0", decode_responses=True)

# Con pool de conexiones (recomendado en producción)
pool = redis.ConnectionPool(host='localhost', port=6379, db=0, decode_responses=True)
r = redis.Redis(connection_pool=pool)

Antipatrón frecuente: crear un objeto Redis() en cada petición HTTP. Crea el pool una vez al arrancar la aplicación y reutilízalo.

Operaciones básicas

import redis

r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

# Strings
r.set('nombre', 'Python')
r.set('contador', 0)
r.incr('contador')          # 1
r.incrby('contador', 10)    # 11
print(r.get('nombre'))      # Python

# TTL (expiración automática)
r.set('sesion:abc123', 'datos_usuario', ex=3600)  # expira en 1 hora
r.ttl('sesion:abc123')   # segundos restantes

# Hashes
r.hset('usuario:42', mapping={'nombre': 'Ana', 'email': '[email protected]', 'rol': 'admin'})
print(r.hget('usuario:42', 'email'))    # [email protected]
print(r.hgetall('usuario:42'))          # {'nombre': 'Ana', ...}

# Listas
r.rpush('cola_emails', 'email1', 'email2', 'email3')
print(r.llen('cola_emails'))   # 3
print(r.lpop('cola_emails'))   # email1 (FIFO)

Patrón cache-aside

import redis
import json
import time

r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

def obtener_usuario(usuario_id: int) -> dict:
    clave = f"usuario:{usuario_id}"

    # 1. Buscar en caché
    datos_cache = r.get(clave)
    if datos_cache:
        print(f"[CACHE HIT] usuario:{usuario_id}")
        return json.loads(datos_cache)

    # 2. No está en caché: buscar en BD
    print(f"[CACHE MISS] usuario:{usuario_id} — consultando BD")
    time.sleep(0.1)   # simula consulta SQL
    usuario = {"id": usuario_id, "nombre": "Carlos", "email": "[email protected]"}

    # 3. Guardar en caché con TTL de 5 minutos
    r.setex(clave, 300, json.dumps(usuario))
    return usuario


print(obtener_usuario(42))   # CACHE MISS
print(obtener_usuario(42))   # CACHE HIT

Pub/Sub: mensajes entre procesos

import redis
import threading
import time

r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

def suscriptor(canal: str):
    pubsub = r.pubsub()
    pubsub.subscribe(canal)
    print(f"Suscrito a '{canal}'")
    for mensaje in pubsub.listen():
        if mensaje['type'] == 'message':
            print(f"Recibido en '{canal}': {mensaje['data']}")


# Suscriptor en hilo separado
hilo = threading.Thread(target=suscriptor, args=('noticias',), daemon=True)
hilo.start()

time.sleep(0.1)

# Publicar mensajes
r.publish('noticias', 'Primera noticia')
r.publish('noticias', 'Segunda noticia')
time.sleep(0.2)

Sorted sets: rankings en tiempo real

import redis

r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

RANKING = 'ranking_juego'

# Añadir o actualizar puntuaciones
r.zadd(RANKING, {'jugador_ana': 9500, 'jugador_luis': 7200, 'jugador_maria': 11000})
r.zincrby(RANKING, 500, 'jugador_luis')   # Luis suma 500 puntos

# Top 3 (mayor a menor)
top3 = r.zrevrange(RANKING, 0, 2, withscores=True)
print("Top 3:")
for posicion, (jugador, puntos) in enumerate(top3, 1):
    print(f"  {posicion}. {jugador}: {int(puntos)} pts")

# Posición de un jugador (0-indexed)
posicion = r.zrevrank(RANKING, 'jugador_luis')
print(f"Posición de Luis: {posicion + 1}")   # 2

# Jugadores con más de 8000 puntos
buenos = r.zrangebyscore(RANKING, 8000, '+inf', withscores=True)
print(f"Jugadores > 8000 pts: {[j for j, _ in buenos]}")

Pipelines: reducir viajes de red

Sin pipeline, cada operación de Redis implica un viaje de red. Los pipelines agrupan varias operaciones y las envían en un único round-trip:

import redis
import time

r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

# Sin pipeline: 1000 viajes de red
inicio = time.perf_counter()
for i in range(1000):
    r.set(f'clave:{i}', i * 2)
print(f"Sin pipeline: {time.perf_counter() - inicio:.2f}s")

# Con pipeline: 1 viaje de red (aprox)
inicio = time.perf_counter()
pipe = r.pipeline()
for i in range(1000):
    pipe.set(f'clave:{i}', i * 2)
pipe.execute()
print(f"Con pipeline: {time.perf_counter() - inicio:.2f}s")   # ~10x más rápido

# Pipeline como context manager
with r.pipeline() as pipe:
    pipe.hset('stats', 'visitas', 0)
    pipe.hset('stats', 'errores', 0)
    pipe.expire('stats', 86400)
    pipe.execute()

Distributed lock con SET NX EX

import redis
import uuid
import time
from contextlib import contextmanager

r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

@contextmanager
def lock_distribuido(nombre: str, timeout: int = 30):
    """Lock distribuido para evitar ejecución concurrente en múltiples instancias."""
    clave = f"lock:{nombre}"
    token = str(uuid.uuid4())
    adquirido = False
    try:
        adquirido = r.set(clave, token, nx=True, ex=timeout)
        if not adquirido:
            raise RuntimeError(f"No se pudo adquirir el lock '{nombre}'")
        yield
    finally:
        if adquirido:
            # Solo borra el lock si somos nosotros quienes lo tenemos
            if r.get(clave) == token:
                r.delete(clave)


# Uso
with lock_distribuido('generar_informe', timeout=60):
    print("Ejecutando tarea exclusiva...")
    time.sleep(1)
    print("Tarea completada")

AsyncRedis: cliente asíncrono

import asyncio
import redis.asyncio as aioredis

async def main():
    r = await aioredis.from_url("redis://localhost:6379/0", decode_responses=True)

    await r.set('clave_async', 'valor_async', ex=60)
    valor = await r.get('clave_async')
    print(valor)   # valor_async

    # Pipeline async
    async with r.pipeline() as pipe:
        await pipe.set('a', 1)
        await pipe.set('b', 2)
        await pipe.execute()

    await r.aclose()

asyncio.run(main())

Redis es una de las herramientas más versátiles de la arquitectura de aplicaciones Python. Dominar sus estructuras de datos y patrones como cache-aside, rate limiting y distributed locks convierte las aplicaciones lentas o con condiciones de carrera en sistemas robustos y escalables.

COMPARTE ESTE ARTÍCULO

COMPARTIR EN FACEBOOK
COMPARTIR EN TWITTER
COMPARTIR EN LINKEDIN
COMPARTIR EN WHATSAPP