Tokio avanzado en Rust: select!, join!, spawn_blocking, tokio::sync y cancellation tokens

Tokio es el runtime async más usado en Rust, pero su uso avanzado va mucho más allá de tokio::main y spawn. Coordinar múltiples futures con select! y join!, delegar trabajo CPU-intensivo con spawn_blocking, compartir estado con las primitivas de tokio::sync y propagar cancelaciones con CancellationToken son las herramientas que distinguen un uso básico de uno profesional.

select!: ejecutar el primer future que complete

tokio::select! ejecuta varios futures concurrentemente y devuelve el resultado del primero en completar, cancelando los demás. Es el mecanismo principal para implementar timeouts, carreras entre operaciones y escape de bucles de espera.

use tokio::time::{sleep, Duration, timeout};
use tokio::sync::oneshot;

async fn operacion_lenta() -> String {
    sleep(Duration::from_millis(200)).await;
    "resultado de la operación".into()
}

async fn fallback() -> String {
    sleep(Duration::from_millis(100)).await;
    "fallback (más rápido)".into()
}

#[tokio::main]
async fn main() {
    // select!: gana el primero
    let ganador = tokio::select! {
        r = operacion_lenta() => format!("operación: {r}"),
        r = fallback()        => format!("fallback: {r}"),
    };
    println!("{ganador}"); // fallback: fallback (más rápido)

    // timeout como sugar sobre select!
    match timeout(Duration::from_millis(50), operacion_lenta()).await {
        Ok(r)  => println!("completado: {r}"),
        Err(_) => println!("timeout después de 50ms"),
    }

    // Cancelación con canal oneshot
    let (tx, mut rx) = oneshot::channel::<()>();
    let handle = tokio::spawn(async move {
        loop {
            tokio::select! {
                _ = &mut rx => { println!("tarea cancelada"); break; }
                _ = sleep(Duration::from_millis(20)) => {
                    println!("tick...");
                }
            }
        }
    });
    sleep(Duration::from_millis(70)).await;
    let _ = tx.send(());
    handle.await.unwrap();
}

join! y try_join!: ejecutar futures en paralelo

tokio::join! ejecuta todos los futures concurrentemente y espera a que todos completen. try_join! hace lo mismo pero cortocircuita si alguno devuelve Err.

use tokio::time::{sleep, Duration};

async fn fetch_usuarios() -> Vec<String> {
    sleep(Duration::from_millis(100)).await;
    vec!["Alice".into(), "Bob".into()]
}

async fn fetch_pedidos() -> Vec<u32> {
    sleep(Duration::from_millis(80)).await;
    vec![1, 2, 3]
}

async fn fetch_config() -> String {
    sleep(Duration::from_millis(50)).await;
    "prod".into()
}

#[tokio::main]
async fn main() {
    // join!: ejecuta los tres en paralelo, espera a todos
    // Tiempo total ? 100ms (el más lento), no 230ms (suma secuencial)
    let (usuarios, pedidos, config) = tokio::join!(
        fetch_usuarios(),
        fetch_pedidos(),
        fetch_config(),
    );
    println!("usuarios: {:?}, pedidos: {:?}, env: {}", usuarios, pedidos, config);

    // try_join!: para al primer error
    let resultado: Result<_, String> = tokio::try_join!(
        async { Ok::<_, String>(fetch_usuarios().await) },
        async { Err("error de red".to_string()) },
        async { Ok(fetch_config().await) },
    );
    println!("try_join falló: {}", resultado.is_err());
}

spawn_blocking: trabajo CPU-intensivo o bloqueante

Las funciones que bloquean el hilo (E/S síncrona, cálculo intensivo, FFI bloqueante) no deben llamarse directamente desde un future async: bloquearían el hilo del runtime e impedirían que otras tareas avancen. spawn_blocking delega el trabajo a un pool de hilos dedicado.

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // E/S síncrona bloqueante desde código async
    let contenido = tokio::task::spawn_blocking(|| {
        std::fs::read_to_string("/etc/hostname") // bloqueante
    })
    .await??;
    println!("hostname: {}", contenido.trim());

    // Cálculo intensivo: no bloquea el runtime
    let resultado = tokio::task::spawn_blocking(|| {
        (1u64..=50_000_000).filter(|n| n % 7 == 0).count()
    })
    .await?;
    println!("Múltiplos de 7 hasta 50M: {resultado}");

    // Error típico: MutexGuard cruzando un .await
    // let guard = mutex.lock().await;
    // alguna_operacion_async().await; // DEADLOCK potencial
    // drop(guard);                    // correcto: soltar antes del await
    Ok(())
}

tokio::sync y CancellationToken

tokio::sync ofrece versiones async de las primitivas de sincronización: Mutex, RwLock, Notify, Semaphore, Barrier y canales (mpsc, broadcast, oneshot). CancellationToken (del crate tokio-util) propaga cancelaciones a grupos de tareas.

use tokio_util::sync::CancellationToken;
use tokio::time::{sleep, Duration};

async fn worker(id: u32, token: CancellationToken) {
    loop {
        tokio::select! {
            _ = token.cancelled() => {
                println!("worker {id} cancelado");
                return;
            }
            _ = sleep(Duration::from_millis(50)) => {
                println!("worker {id}: tick");
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let token = CancellationToken::new();

    let handles: Vec<_> = (0..3).map(|id| {
        let t = token.clone();
        tokio::spawn(async move { worker(id, t).await })
    }).collect();

    sleep(Duration::from_millis(130)).await;
    token.cancel(); // cancela todos los workers a la vez

    for h in handles {
        h.await.unwrap();
    }
    println!("Todos los workers terminaron");
}

El error más frecuente con Tokio avanzado es sostener un tokio::sync::MutexGuard mientras se espera un .await: el runtime puede mover la tarea a otro hilo entre el lock y el await, causando un deadlock o un error de compilación. La solución es siempre soltar el guard antes del .await, ya sea con un bloque delimitado o con drop(guard) explícito.

COMPARTE ESTE ARTÍCULO

COMPARTIR EN FACEBOOK
COMPARTIR EN TWITTER
COMPARTIR EN LINKEDIN
COMPARTIR EN WHATSAPP