Canales en Rust: comunicación entre hilos con mpsc

La filosofía Go dice "do not communicate by sharing memory; instead, share memory by communicating". Rust adopta el mismo principio con los canales mpsc (multiple producer, single consumer): un hilo envía mensajes, otro los recibe, sin estado compartido ni mutex. El compilador garantiza que el dato enviado no puede usarse simultáneamente en dos hilos.

Canal básico: mpsc::channel

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        tx.send(String::from("hola desde el hilo")).unwrap();
    });

    let mensaje = rx.recv().unwrap(); // bloquea hasta recibir
    println!("{}", mensaje);
}

channel() devuelve un par (Sender, Receiver). send() transfiere la propiedad del dato: después del envío, el hilo emisor no puede usarlo. recv() bloquea hasta que llega un mensaje o el canal se cierra.

Múltiples mensajes

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let mensajes = vec!["hola", "desde", "el", "hilo"];
        for msg in mensajes {
            tx.send(String::from(msg)).unwrap();
            thread::sleep(Duration::from_millis(200));
        }
    });

    // rx como iterador: termina cuando el canal se cierra
    for recibido in rx {
        println!("Recibido: {}", recibido);
    }
}

Múltiples productores

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    for i in 0..3 {
        let tx_clon = tx.clone(); // clona el Sender
        thread::spawn(move || {
            tx_clon.send(format!("Mensaje del hilo {}", i)).unwrap();
        });
    }

    drop(tx); // cierra el Sender original para que rx sepa cuándo termina

    for msg in rx {
        println!("{}", msg);
    }
}

Canal con buffer: sync_channel

use std::sync::mpsc;
use std::thread;

fn main() {
    // Buffer de 3 mensajes: el emisor no bloquea hasta que el buffer se llena
    let (tx, rx) = mpsc::sync_channel(3);

    thread::spawn(move || {
        for i in 0..5 {
            tx.send(i).unwrap(); // bloquea cuando el buffer está lleno
            println!("Enviado {}", i);
        }
    });

    for msg in rx {
        println!("Recibido {}", msg);
    }
}

try_recv: no bloqueante

use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();
    tx.send(42).unwrap();

    match rx.try_recv() {
        Ok(msg)                          => println!("Recibido: {}", msg),
        Err(mpsc::TryRecvError::Empty)   => println!("Canal vacío"),
        Err(mpsc::TryRecvError::Disconnected) => println!("Canal cerrado"),
    }
}

Patrón productor-consumidor

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel::<i32>();

    // Varios productores
    let productores: Vec<_> = (0..4).map(|i| {
        let tx2 = tx.clone();
        thread::spawn(move || {
            for j in 0..5 {
                tx2.send(i * 100 + j).unwrap();
            }
        })
    }).collect();

    drop(tx);

    // Un consumidor
    let consumidor = thread::spawn(move || {
        let mut total = 0;
        for msg in rx {
            total += msg;
        }
        total
    });

    for h in productores { h.join().unwrap(); }
    let suma = consumidor.join().unwrap();
    println!("Suma total: {}", suma);
}

Resumen

  • mpsc::channel(): canal asíncrono ilimitado.
  • mpsc::sync_channel(n): canal síncrono con buffer de n mensajes.
  • tx.send(valor): transfiere la propiedad. El emisor ya no puede usar el valor.
  • rx.recv(): bloquea. rx.try_recv(): no bloquea.
  • Múltiples productores: clona el Sender. El receptor es único.
  • for msg in rx: itera hasta que el canal se cierra.

El siguiente artículo cubre Mutex y RwLock: cómo proteger datos compartidos entre hilos cuando los canales no son suficientes.

COMPARTE ESTE ARTÍCULO

COMPARTIR EN FACEBOOK
COMPARTIR EN TWITTER
COMPARTIR EN LINKEDIN
COMPARTIR EN WHATSAPP