Una vez dominados los fundamentos de async/await con Tokio, el paso siguiente es coordinar tareas más complejas: comunicación entre ellas con canales async, estado compartido mutable con Mutex async, grupos de tareas con JoinSet, y cancelación limpia con abort(). Estos patrones aparecen en cualquier servidor, scraper o sistema de colas.
tokio::sync::mpsc: canal async entre tareas
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel::(32);
// Productor
let tx_clon = tx.clone();
tokio::spawn(async move {
for i in 0..5 {
tx_clon.send(format!("mensaje {i}")).await.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
}
});
drop(tx); // cerrar el tx original
// Consumidor
while let Some(msg) = rx.recv().await {
println!("Recibido: {msg}");
}
println!("Canal cerrado");
}
tokio::sync::oneshot: respuesta única
use tokio::sync::oneshot;
async fn calcular() -> i32 {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
99
}
#[tokio::main]
async fn main() {
let (tx, rx) = oneshot::channel::<i32>();
tokio::spawn(async move {
let resultado = calcular().await;
tx.send(resultado).unwrap();
});
match rx.await {
Ok(v) => println!("Resultado: {v}"),
Err(_) => println!("Tarea cancelada"),
}
}
tokio::sync::Mutex: estado compartido async
use std::sync::Arc;
use tokio::sync::Mutex;
#[tokio::main]
async fn main() {
let estado = Arc::new(Mutex::new(vec![]));
let mut handles = vec![];
for i in 0..5 {
let e = Arc::clone(&estado);
handles.push(tokio::spawn(async move {
let mut guard = e.lock().await; // async, no bloquea el hilo
guard.push(format!("tarea {i}"));
}));
}
for h in handles { h.await.unwrap(); }
println!("{:?}", estado.lock().await);
}
JoinSet: grupo de tareas con control
use tokio::task::JoinSet;
use tokio::time::{sleep, Duration, timeout};
#[tokio::main]
async fn main() {
let mut set = JoinSet::new();
// Scraper paralelo con timeout global
let urls = vec!["url1", "url2", "url3", "url4", "url5"];
for url in urls {
set.spawn(async move {
sleep(Duration::from_millis(rand_delay())).await;
format!("contenido de {url}")
});
}
let resultado = timeout(Duration::from_secs(2), async {
let mut resultados = vec![];
while let Some(res) = set.join_next().await {
resultados.push(res.unwrap());
}
resultados
}).await;
match resultado {
Ok(r) => println!("Completado: {} páginas", r.len()),
Err(_) => println!("Timeout: cancelando tareas restantes"),
}
set.abort_all(); // cancelar las que no terminaron
}
fn rand_delay() -> u64 {
use std::time::{SystemTime, UNIX_EPOCH};
SystemTime::now().duration_since(UNIX_EPOCH).unwrap().subsec_millis() as u64 % 500
}
Servidor TCP concurrente
use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let listener = TcpListener::bind("127.0.0.1:8080").await?;
println!("Escuchando en 8080");
loop {
let (socket, addr) = listener.accept().await?;
println!("Conexión de {addr}");
// Cada conexión en su propia tarea
tokio::spawn(async move {
if let Err(e) = manejar_cliente(socket).await {
eprintln!("Error con {addr}: {e}");
}
});
}
}
async fn manejar_cliente(mut socket: TcpStream) -> anyhow::Result<()> {
let mut buf = [0u8; 1024];
let n = socket.read(&mut buf).await?;
socket.write_all(&buf[..n]).await?; // echo
Ok(())
}
abort(): cancelación limpia
#[tokio::main]
async fn main() {
let tarea = tokio::spawn(async {
loop {
println!("trabajando...");
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
}
});
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
tarea.abort();
match tarea.await {
Ok(_) => println!("completado"),
Err(e) if e.is_cancelled() => println!("cancelado"),
Err(e) => println!("error: {e}"),
}
}
Resumen
tokio::sync::mpscpara múltiples productores a un consumidor; cierra los tx para señalizar fin.oneshotpara enviar exactamente un valor de vuelta a quien lanzó la tarea.tokio::sync::Mutexes async (no bloquea el hilo);std::sync::Mutexbloquea el hilo executor.JoinSetagrupa tareas y permite recoger resultados según llegan o cancelar las pendientes.timeout()envuelve cualquier future con un límite de tiempo.abort()cancela una tarea; elJoinErrorindica si fue cancelación o pánico.
