Tokio avanzado: channels async, Mutex async, streams y manejo de errores en tareas

Una vez dominados los fundamentos de async/await con Tokio, el paso siguiente es coordinar tareas más complejas: comunicación entre ellas con canales async, estado compartido mutable con Mutex async, grupos de tareas con JoinSet, y cancelación limpia con abort(). Estos patrones aparecen en cualquier servidor, scraper o sistema de colas.

tokio::sync::mpsc: canal async entre tareas

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::(32);

    // Productor
    let tx_clon = tx.clone();
    tokio::spawn(async move {
        for i in 0..5 {
            tx_clon.send(format!("mensaje {i}")).await.unwrap();
            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
        }
    });

    drop(tx); // cerrar el tx original

    // Consumidor
    while let Some(msg) = rx.recv().await {
        println!("Recibido: {msg}");
    }
    println!("Canal cerrado");
}

tokio::sync::oneshot: respuesta única

use tokio::sync::oneshot;

async fn calcular() -> i32 {
    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
    99
}

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel::<i32>();

    tokio::spawn(async move {
        let resultado = calcular().await;
        tx.send(resultado).unwrap();
    });

    match rx.await {
        Ok(v)  => println!("Resultado: {v}"),
        Err(_) => println!("Tarea cancelada"),
    }
}

tokio::sync::Mutex: estado compartido async

use std::sync::Arc;
use tokio::sync::Mutex;

#[tokio::main]
async fn main() {
    let estado = Arc::new(Mutex::new(vec![]));
    let mut handles = vec![];

    for i in 0..5 {
        let e = Arc::clone(&estado);
        handles.push(tokio::spawn(async move {
            let mut guard = e.lock().await; // async, no bloquea el hilo
            guard.push(format!("tarea {i}"));
        }));
    }

    for h in handles { h.await.unwrap(); }
    println!("{:?}", estado.lock().await);
}

JoinSet: grupo de tareas con control

use tokio::task::JoinSet;
use tokio::time::{sleep, Duration, timeout};

#[tokio::main]
async fn main() {
    let mut set = JoinSet::new();

    // Scraper paralelo con timeout global
    let urls = vec!["url1", "url2", "url3", "url4", "url5"];

    for url in urls {
        set.spawn(async move {
            sleep(Duration::from_millis(rand_delay())).await;
            format!("contenido de {url}")
        });
    }

    let resultado = timeout(Duration::from_secs(2), async {
        let mut resultados = vec![];
        while let Some(res) = set.join_next().await {
            resultados.push(res.unwrap());
        }
        resultados
    }).await;

    match resultado {
        Ok(r)  => println!("Completado: {} páginas", r.len()),
        Err(_) => println!("Timeout: cancelando tareas restantes"),
    }

    set.abort_all(); // cancelar las que no terminaron
}

fn rand_delay() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    SystemTime::now().duration_since(UNIX_EPOCH).unwrap().subsec_millis() as u64 % 500
}

Servidor TCP concurrente

use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:8080").await?;
    println!("Escuchando en 8080");

    loop {
        let (socket, addr) = listener.accept().await?;
        println!("Conexión de {addr}");

        // Cada conexión en su propia tarea
        tokio::spawn(async move {
            if let Err(e) = manejar_cliente(socket).await {
                eprintln!("Error con {addr}: {e}");
            }
        });
    }
}

async fn manejar_cliente(mut socket: TcpStream) -> anyhow::Result<()> {
    let mut buf = [0u8; 1024];
    let n = socket.read(&mut buf).await?;
    socket.write_all(&buf[..n]).await?; // echo
    Ok(())
}

abort(): cancelación limpia

#[tokio::main]
async fn main() {
    let tarea = tokio::spawn(async {
        loop {
            println!("trabajando...");
            tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
        }
    });

    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
    tarea.abort();

    match tarea.await {
        Ok(_)                                   => println!("completado"),
        Err(e) if e.is_cancelled()              => println!("cancelado"),
        Err(e)                                  => println!("error: {e}"),
    }
}

Resumen

  • tokio::sync::mpsc para múltiples productores a un consumidor; cierra los tx para señalizar fin.
  • oneshot para enviar exactamente un valor de vuelta a quien lanzó la tarea.
  • tokio::sync::Mutex es async (no bloquea el hilo); std::sync::Mutex bloquea el hilo executor.
  • JoinSet agrupa tareas y permite recoger resultados según llegan o cancelar las pendientes.
  • timeout() envuelve cualquier future con un límite de tiempo.
  • abort() cancela una tarea; el JoinError indica si fue cancelación o pánico.

COMPARTE ESTE ARTÍCULO

COMPARTIR EN FACEBOOK
COMPARTIR EN TWITTER
COMPARTIR EN LINKEDIN
COMPARTIR EN WHATSAPP