SQLx en Rust: queries SQL async tipadas en compilación, pool y migraciones

Cuando escribes una query SQL en un string, el compilador de Rust no sabe si es válida. Un error tipográfico en el nombre de una columna, un tipo incorrecto en los parámetros o una tabla que no existe: ninguno de esos errores se detecta hasta que la aplicación está en ejecución y la query falla.

SQLx resuelve esto verificando tus queries contra la base de datos real en tiempo de compilación. No es un ORM: las queries se escriben en SQL puro, pero el compilador confirma que son correctas y que los tipos Rust coinciden con los tipos de la base de datos.

Añadir SQLx al proyecto

[dependencies]
sqlx = { version = "0.7", features = ["runtime-tokio", "postgres", "macros"] }
tokio = { version = "1", features = ["full"] }

SQLx admite PostgreSQL, MySQL y SQLite. Para MySQL usa el feature "mysql" en lugar de "postgres". El feature "macros" activa las macros de compilación (query!, query_as!).

Conectar con PgPool

En aplicaciones async con múltiples peticiones concurrentes, no debes usar una sola conexión: usa un pool. PgPool gestiona un conjunto de conexiones que se reutilizan entre peticiones:

use sqlx::PgPool;

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let database_url = std::env::var("DATABASE_URL")
        .expect("DATABASE_URL debe estar definida");

    let pool = PgPool::connect(&database_url).await?;

    // Verificar que la conexión funciona
    let fila: (i64,) = sqlx::query_as("SELECT 1")
        .fetch_one(&pool)
        .await?;

    println!("Conexión OK: {}", fila.0);
    Ok(())
}

Para configurar el pool más en detalle, usa PgPoolOptions:

use sqlx::postgres::PgPoolOptions;

let pool = PgPoolOptions::new()
    .max_connections(10)
    .min_connections(2)
    .acquire_timeout(std::time::Duration::from_secs(5))
    .connect(&database_url)
    .await?;

query_as!: mapear filas a structs

La macro query_as! ejecuta una query y mapea cada fila a una struct. Lo hace verificando en compilación que los tipos Rust coinciden con los tipos de la columna en la base de datos:

use sqlx::FromRow;

#[derive(Debug, FromRow)]
struct Usuario {
    id: i64,
    nombre: String,
    email: String,
    creado_en: chrono::DateTime<chrono::Utc>,
}

async fn obtener_usuario(
    pool: &PgPool,
    id: i64,
) -> Result<Option<Usuario>, sqlx::Error> {
    let usuario = sqlx::query_as!(
        Usuario,
        "SELECT id, nombre, email, creado_en FROM usuarios WHERE id = $1",
        id
    )
    .fetch_optional(pool)
    .await?;

    Ok(usuario)
}

async fn listar_usuarios(pool: &PgPool) -> Result<Vec<Usuario>, sqlx::Error> {
    let usuarios = sqlx::query_as!(
        Usuario,
        "SELECT id, nombre, email, creado_en FROM usuarios ORDER BY creado_en DESC"
    )
    .fetch_all(pool)
    .await?;

    Ok(usuarios)
}

El sufijo del método de ejecución indica cuántas filas esperas: fetch_one devuelve una fila o error, fetch_optional devuelve Option, fetch_all devuelve un vector y fetch devuelve un stream.

Inserts, updates y deletes

Para operaciones que no devuelven filas, usa la macro query! en lugar de query_as!:

async fn crear_usuario(
    pool: &PgPool,
    nombre: &str,
    email: &str,
) -> Result<i64, sqlx::Error> {
    let resultado = sqlx::query!(
        "INSERT INTO usuarios (nombre, email) VALUES ($1, $2) RETURNING id",
        nombre,
        email
    )
    .fetch_one(pool)
    .await?;

    Ok(resultado.id)
}

async fn actualizar_email(
    pool: &PgPool,
    id: i64,
    nuevo_email: &str,
) -> Result<bool>, sqlx::Error> {
    let resultado = sqlx::query!(
        "UPDATE usuarios SET email = $1 WHERE id = $2",
        nuevo_email,
        id
    )
    .execute(pool)
    .await?;

    Ok(resultado.rows_affected() > 0)
}

async fn eliminar_usuario(pool: &PgPool, id: i64) -> Result<bool, sqlx::Error> {
    let resultado = sqlx::query!("DELETE FROM usuarios WHERE id = $1", id)
        .execute(pool)
        .await?;

    Ok(resultado.rows_affected() > 0)
}

Transacciones

Para operaciones que deben ser atómicas, SQLx tiene soporte nativo de transacciones:

async fn transferir_fondos(
    pool: &PgPool,
    de_cuenta: i64,
    a_cuenta: i64,
    cantidad: i64,
) -> Result<(), sqlx::Error> {
    let mut tx = pool.begin().await?;

    sqlx::query!(
        "UPDATE cuentas SET saldo = saldo - $1 WHERE id = $2",
        cantidad, de_cuenta
    )
    .execute(&mut *tx)
    .await?;

    sqlx::query!(
        "UPDATE cuentas SET saldo = saldo + $1 WHERE id = $2",
        cantidad, a_cuenta
    )
    .execute(&mut *tx)
    .await?;

    tx.commit().await?;
    Ok(())
}

Si cualquier operación falla, el ? propaga el error y al salir del scope la transacción hace rollback automáticamente si no se llamó a commit().

Migraciones con sqlx-cli

SQLx incluye una herramienta de línea de comandos para gestionar migraciones:

cargo install sqlx-cli --no-default-features --features postgres

Con la variable de entorno DATABASE_URL apuntando a tu base de datos:

# Crear la base de datos
sqlx database create

# Crear una nueva migración
sqlx migrate add crear_tabla_usuarios

# Esto crea migrations/20240101000000_crear_tabla_usuarios.sql
# Escribe tu SQL en ese archivo:

CREATE TABLE usuarios (
    id BIGSERIAL PRIMARY KEY,
    nombre VARCHAR(255) NOT NULL,
    email VARCHAR(255) UNIQUE NOT NULL,
    creado_en TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

# Aplicar las migraciones pendientes
sqlx migrate run

Para ejecutar las migraciones automáticamente al arrancar la aplicación:

sqlx::migrate!("./migrations").run(&pool).await?;

Verificación en tiempo de compilación sin base de datos

La verificación de query! y query_as! en compilación requiere acceso a la base de datos. En CI o en máquinas sin PostgreSQL, SQLx puede usar un cache de metadatos generado previamente:

# Generar el cache (con la DB disponible)
cargo sqlx prepare

# Esto crea .sqlx/query-*.json
# Commítealos junto con el código

# Ahora cargo build funciona sin conexión a la DB
SQLX_OFFLINE=true cargo build

El workflow habitual es: generar el cache localmente con cargo sqlx prepare, comitear los archivos .sqlx/ y en CI usar SQLX_OFFLINE=true.

Integración con Axum

La forma estándar de combinar SQLx con Axum es pasar el pool como estado:

use axum::{extract::State, Router, routing::get};
use sqlx::PgPool;

#[derive(Clone)]
struct AppState {
    db: PgPool,
}

async fn listar_usuarios(
    State(state): State<AppState>
) -> Result<Json<Vec<Usuario>>, StatusCode> {
    listar(&state.db)
        .await
        .map(Json)
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let pool = PgPool::connect(&std::env::var("DATABASE_URL")?).await?;
    sqlx::migrate!("./migrations").run(&pool).await?;

    let state = AppState { db: pool };
    let app = Router::new()
        .route("/usuarios", get(listar_usuarios))
        .with_state(state);

    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?;
    axum::serve(listener, app).await?;
    Ok(())
}

Esta integración es directa porque PgPool implementa Clone (clonar un pool comparte las conexiones, no las duplica) y Send + Sync, que es lo que Axum requiere del estado.

COMPARTE ESTE ARTÍCULO

COMPARTIR EN FACEBOOK
COMPARTIR EN TWITTER
COMPARTIR EN LINKEDIN
COMPARTIR EN WHATSAPP