redis-py es el cliente oficial de Redis para Python. Redis es una base de datos en memoria que funciona como caché, broker de mensajes, almacén de sesiones y motor de estructuras de datos en tiempo real. Este tutorial cubre desde la conexión básica hasta patrones avanzados como Pub/Sub, sorted sets para rankings, pipelines y distributed locks, incluyendo el cliente asíncrono AsyncRedis.
Instalación y conexión
# pip install redis
import redis
# Conexión sencilla
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
r.ping() # True si Redis está disponible
# Usando URL
r = redis.from_url("redis://localhost:6379/0", decode_responses=True)
# Con pool de conexiones (recomendado en producción)
pool = redis.ConnectionPool(host='localhost', port=6379, db=0, decode_responses=True)
r = redis.Redis(connection_pool=pool)
Antipatrón frecuente: crear un objeto Redis() en cada petición HTTP. Crea el pool una vez al arrancar la aplicación y reutilízalo.
Operaciones básicas
import redis
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
# Strings
r.set('nombre', 'Python')
r.set('contador', 0)
r.incr('contador') # 1
r.incrby('contador', 10) # 11
print(r.get('nombre')) # Python
# TTL (expiración automática)
r.set('sesion:abc123', 'datos_usuario', ex=3600) # expira en 1 hora
r.ttl('sesion:abc123') # segundos restantes
# Hashes
r.hset('usuario:42', mapping={'nombre': 'Ana', 'email': '[email protected]', 'rol': 'admin'})
print(r.hget('usuario:42', 'email')) # [email protected]
print(r.hgetall('usuario:42')) # {'nombre': 'Ana', ...}
# Listas
r.rpush('cola_emails', 'email1', 'email2', 'email3')
print(r.llen('cola_emails')) # 3
print(r.lpop('cola_emails')) # email1 (FIFO)
Patrón cache-aside
import redis
import json
import time
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
def obtener_usuario(usuario_id: int) -> dict:
clave = f"usuario:{usuario_id}"
# 1. Buscar en caché
datos_cache = r.get(clave)
if datos_cache:
print(f"[CACHE HIT] usuario:{usuario_id}")
return json.loads(datos_cache)
# 2. No está en caché: buscar en BD
print(f"[CACHE MISS] usuario:{usuario_id} consultando BD")
time.sleep(0.1) # simula consulta SQL
usuario = {"id": usuario_id, "nombre": "Carlos", "email": "[email protected]"}
# 3. Guardar en caché con TTL de 5 minutos
r.setex(clave, 300, json.dumps(usuario))
return usuario
print(obtener_usuario(42)) # CACHE MISS
print(obtener_usuario(42)) # CACHE HIT
Pub/Sub: mensajes entre procesos
import redis
import threading
import time
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
def suscriptor(canal: str):
pubsub = r.pubsub()
pubsub.subscribe(canal)
print(f"Suscrito a '{canal}'")
for mensaje in pubsub.listen():
if mensaje['type'] == 'message':
print(f"Recibido en '{canal}': {mensaje['data']}")
# Suscriptor en hilo separado
hilo = threading.Thread(target=suscriptor, args=('noticias',), daemon=True)
hilo.start()
time.sleep(0.1)
# Publicar mensajes
r.publish('noticias', 'Primera noticia')
r.publish('noticias', 'Segunda noticia')
time.sleep(0.2)
Sorted sets: rankings en tiempo real
import redis
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
RANKING = 'ranking_juego'
# Añadir o actualizar puntuaciones
r.zadd(RANKING, {'jugador_ana': 9500, 'jugador_luis': 7200, 'jugador_maria': 11000})
r.zincrby(RANKING, 500, 'jugador_luis') # Luis suma 500 puntos
# Top 3 (mayor a menor)
top3 = r.zrevrange(RANKING, 0, 2, withscores=True)
print("Top 3:")
for posicion, (jugador, puntos) in enumerate(top3, 1):
print(f" {posicion}. {jugador}: {int(puntos)} pts")
# Posición de un jugador (0-indexed)
posicion = r.zrevrank(RANKING, 'jugador_luis')
print(f"Posición de Luis: {posicion + 1}") # 2
# Jugadores con más de 8000 puntos
buenos = r.zrangebyscore(RANKING, 8000, '+inf', withscores=True)
print(f"Jugadores > 8000 pts: {[j for j, _ in buenos]}")
Pipelines: reducir viajes de red
Sin pipeline, cada operación de Redis implica un viaje de red. Los pipelines agrupan varias operaciones y las envían en un único round-trip:
import redis
import time
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
# Sin pipeline: 1000 viajes de red
inicio = time.perf_counter()
for i in range(1000):
r.set(f'clave:{i}', i * 2)
print(f"Sin pipeline: {time.perf_counter() - inicio:.2f}s")
# Con pipeline: 1 viaje de red (aprox)
inicio = time.perf_counter()
pipe = r.pipeline()
for i in range(1000):
pipe.set(f'clave:{i}', i * 2)
pipe.execute()
print(f"Con pipeline: {time.perf_counter() - inicio:.2f}s") # ~10x más rápido
# Pipeline como context manager
with r.pipeline() as pipe:
pipe.hset('stats', 'visitas', 0)
pipe.hset('stats', 'errores', 0)
pipe.expire('stats', 86400)
pipe.execute()
Distributed lock con SET NX EX
import redis
import uuid
import time
from contextlib import contextmanager
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
@contextmanager
def lock_distribuido(nombre: str, timeout: int = 30):
"""Lock distribuido para evitar ejecución concurrente en múltiples instancias."""
clave = f"lock:{nombre}"
token = str(uuid.uuid4())
adquirido = False
try:
adquirido = r.set(clave, token, nx=True, ex=timeout)
if not adquirido:
raise RuntimeError(f"No se pudo adquirir el lock '{nombre}'")
yield
finally:
if adquirido:
# Solo borra el lock si somos nosotros quienes lo tenemos
if r.get(clave) == token:
r.delete(clave)
# Uso
with lock_distribuido('generar_informe', timeout=60):
print("Ejecutando tarea exclusiva...")
time.sleep(1)
print("Tarea completada")
AsyncRedis: cliente asíncrono
import asyncio
import redis.asyncio as aioredis
async def main():
r = await aioredis.from_url("redis://localhost:6379/0", decode_responses=True)
await r.set('clave_async', 'valor_async', ex=60)
valor = await r.get('clave_async')
print(valor) # valor_async
# Pipeline async
async with r.pipeline() as pipe:
await pipe.set('a', 1)
await pipe.set('b', 2)
await pipe.execute()
await r.aclose()
asyncio.run(main())
Redis es una de las herramientas más versátiles de la arquitectura de aplicaciones Python. Dominar sus estructuras de datos y patrones como cache-aside, rate limiting y distributed locks convierte las aplicaciones lentas o con condiciones de carrera en sistemas robustos y escalables.
