Tokio es el runtime async más usado en Rust, pero su uso avanzado va mucho más allá de tokio::main y spawn. Coordinar múltiples futures con select! y join!, delegar trabajo CPU-intensivo con spawn_blocking, compartir estado con las primitivas de tokio::sync y propagar cancelaciones con CancellationToken son las herramientas que distinguen un uso básico de uno profesional.
select!: ejecutar el primer future que complete
tokio::select! ejecuta varios futures concurrentemente y devuelve el resultado del primero en completar, cancelando los demás. Es el mecanismo principal para implementar timeouts, carreras entre operaciones y escape de bucles de espera.
use tokio::time::{sleep, Duration, timeout};
use tokio::sync::oneshot;
async fn operacion_lenta() -> String {
sleep(Duration::from_millis(200)).await;
"resultado de la operación".into()
}
async fn fallback() -> String {
sleep(Duration::from_millis(100)).await;
"fallback (más rápido)".into()
}
#[tokio::main]
async fn main() {
// select!: gana el primero
let ganador = tokio::select! {
r = operacion_lenta() => format!("operación: {r}"),
r = fallback() => format!("fallback: {r}"),
};
println!("{ganador}"); // fallback: fallback (más rápido)
// timeout como sugar sobre select!
match timeout(Duration::from_millis(50), operacion_lenta()).await {
Ok(r) => println!("completado: {r}"),
Err(_) => println!("timeout después de 50ms"),
}
// Cancelación con canal oneshot
let (tx, mut rx) = oneshot::channel::<()>();
let handle = tokio::spawn(async move {
loop {
tokio::select! {
_ = &mut rx => { println!("tarea cancelada"); break; }
_ = sleep(Duration::from_millis(20)) => {
println!("tick...");
}
}
}
});
sleep(Duration::from_millis(70)).await;
let _ = tx.send(());
handle.await.unwrap();
}
join! y try_join!: ejecutar futures en paralelo
tokio::join! ejecuta todos los futures concurrentemente y espera a que todos completen. try_join! hace lo mismo pero cortocircuita si alguno devuelve Err.
use tokio::time::{sleep, Duration};
async fn fetch_usuarios() -> Vec<String> {
sleep(Duration::from_millis(100)).await;
vec!["Alice".into(), "Bob".into()]
}
async fn fetch_pedidos() -> Vec<u32> {
sleep(Duration::from_millis(80)).await;
vec![1, 2, 3]
}
async fn fetch_config() -> String {
sleep(Duration::from_millis(50)).await;
"prod".into()
}
#[tokio::main]
async fn main() {
// join!: ejecuta los tres en paralelo, espera a todos
// Tiempo total ? 100ms (el más lento), no 230ms (suma secuencial)
let (usuarios, pedidos, config) = tokio::join!(
fetch_usuarios(),
fetch_pedidos(),
fetch_config(),
);
println!("usuarios: {:?}, pedidos: {:?}, env: {}", usuarios, pedidos, config);
// try_join!: para al primer error
let resultado: Result<_, String> = tokio::try_join!(
async { Ok::<_, String>(fetch_usuarios().await) },
async { Err("error de red".to_string()) },
async { Ok(fetch_config().await) },
);
println!("try_join falló: {}", resultado.is_err());
}
spawn_blocking: trabajo CPU-intensivo o bloqueante
Las funciones que bloquean el hilo (E/S síncrona, cálculo intensivo, FFI bloqueante) no deben llamarse directamente desde un future async: bloquearían el hilo del runtime e impedirían que otras tareas avancen. spawn_blocking delega el trabajo a un pool de hilos dedicado.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// E/S síncrona bloqueante desde código async
let contenido = tokio::task::spawn_blocking(|| {
std::fs::read_to_string("/etc/hostname") // bloqueante
})
.await??;
println!("hostname: {}", contenido.trim());
// Cálculo intensivo: no bloquea el runtime
let resultado = tokio::task::spawn_blocking(|| {
(1u64..=50_000_000).filter(|n| n % 7 == 0).count()
})
.await?;
println!("Múltiplos de 7 hasta 50M: {resultado}");
// Error típico: MutexGuard cruzando un .await
// let guard = mutex.lock().await;
// alguna_operacion_async().await; // DEADLOCK potencial
// drop(guard); // correcto: soltar antes del await
Ok(())
}
tokio::sync y CancellationToken
tokio::sync ofrece versiones async de las primitivas de sincronización: Mutex, RwLock, Notify, Semaphore, Barrier y canales (mpsc, broadcast, oneshot). CancellationToken (del crate tokio-util) propaga cancelaciones a grupos de tareas.
use tokio_util::sync::CancellationToken;
use tokio::time::{sleep, Duration};
async fn worker(id: u32, token: CancellationToken) {
loop {
tokio::select! {
_ = token.cancelled() => {
println!("worker {id} cancelado");
return;
}
_ = sleep(Duration::from_millis(50)) => {
println!("worker {id}: tick");
}
}
}
}
#[tokio::main]
async fn main() {
let token = CancellationToken::new();
let handles: Vec<_> = (0..3).map(|id| {
let t = token.clone();
tokio::spawn(async move { worker(id, t).await })
}).collect();
sleep(Duration::from_millis(130)).await;
token.cancel(); // cancela todos los workers a la vez
for h in handles {
h.await.unwrap();
}
println!("Todos los workers terminaron");
}
El error más frecuente con Tokio avanzado es sostener un tokio::sync::MutexGuard mientras se espera un .await: el runtime puede mover la tarea a otro hilo entre el lock y el await, causando un deadlock o un error de compilación. La solución es siempre soltar el guard antes del .await, ya sea con un bloque delimitado o con drop(guard) explícito.
