Rust async using channels
Asynchronous programming in Rust has the reputation to be difficult to the async
-keyword which implements await mechanism in Rust.These have their use for async problems where fine control over the flow is necessary. For many problems one can however just use channelswhich are well known for Go programmers.
This compares channels in Rust and Go and concludes with a minimal full Rust program. One common theme will be that Rust uses its standard syntax while gouses a lot of specialized one- or few-purpose syntax for channels.
Single consumer
The standard library in Rust only provides mpsc
(multi producer single consumer) channel.These cover the typical fan-out use-case from Golang. If multi consumer are needed, there are e.g. the crossbeamand async-channel-crate providing these. There is also experimental stdlib support for mpmc channels.
Comparison with go
We will just send some Integers. Sending structs is straight-forward in both languages.
Creating a channel
// go c := make(chan int)
// rust let (p,s) = mpsc::channel();
In Go, we used some builtin keyword to construct a channel which is a specialized type in the go typesystem.In Rust, we called a function that returned a tuple which was destructured into the variables p
and s
.In Rust, it is typically not necessary to specify the type of the channel as this follows from type interference (usage of the producer later on).
In terms of how they function they differ a bit. The go channel is synchronous, so it will block on sending till the value has been read.The Rust channel is asynchronous. It will accept infinite objects into its buffer without blocking (limited by out of memory errors).Both languages also have buffered versions with a fixed size buffer. These behave identical.
Sending values
Let's send the numbers from 1 to 10 through the channel and close/disconnect it afterwards to signal that we are done.We will use in the Go code an error group to have like in Rust a mechanism to collect errors of the producer.
// go go func() { g := new(errgroup.Group) for i := range 10 { g.Go(func() error { c <- i + 1 return nil }) } if err := g.Wait(); err != nil { panic(err) } close(c) }()
// rust let joins = (1..=10) .map(|i| { let p = p.clone(); thread::spawn(move || { p.send(i).unwrap(); }) }) .collect::<Vec<_>>(); // Rust iterators are lazy, collection triggers threads to spawn drop(p); // omitted: receive result here (see below) joins.into_iter().try_for_each(JoinHandle::join)?;
The Rust variant allows the code in each async thread to error by throwing panics. These will be collected in the joins
(JoinHandler)which we will use when collecting the results. For golang error handling is provided here by the error group.
The Rust variant ends the sending with drop(p)
. Rust detects that a producer is disconnected by it going out of scope. In the internal loop we clone p
to producethe value, so we never send a value over the original producer. Hence, we need to explicitly drop it to make sure the consumer gets to know all producer are disconnected.While we have to make sure on the Rust side to let all producers go out of scope, we have to make sure in Go to never close a channel twice (would result in a panic).
What is a bit hidden is that the Rust variant is more expensive. It creates OS threads while Go uses coroutines which are run on a shared set of worker threads.If the cost of creating the threads becomes relevant it is worthwhile to use an async runtime in Rust which follows a similar strategy, e.g. tokio tasks.
Collecting the results
// go for result := range c { fmt.Println(result); }
// rust while let Ok(result) = r.recv() { println!("{result}"); }
Here both languages are straight-forward. Rust uses a while-let
which matches on the Ok
-enum of Result<i32,_>
from the receive-operation till it errors (when the producer have disconnected).
Complete programs
// go package main import ( "fmt" "golang.org/x/sync/errgroup" ) func main() { c := make(chan int) go func() { g := new(errgroup.Group) for i := range 10 { g.Go(func() error { c <- i + 1 return nil }) } if err := g.Wait(); err != nil { panic(err) } close(c) }() for result := range c { fmt.Println(result) } fmt.Println("Done") }
// rust use std::any::Any; use std::sync::mpsc; use std::thread; use std::thread::JoinHandle; fn main() -> Result<(), Box<dyn Any + Send>> { let (p, r) = mpsc::channel(); let joins = (1..=10) .map(|i| { let p = p.clone(); thread::spawn(move || { p.send(i).unwrap(); }) }) .collect::<Vec<_>>(); drop(p); while let Ok(result) = r.recv() { println!("{result}"); } joins.into_iter().try_for_each(JoinHandle::join)?; println!("Done"); Ok(()) }
Bonus: Rust variant using tokio
As mentioned above using OS threads is less performant. Here is a Rust variant using tokio tasks which are comparably cheap as Go coroutines:
//rust use std::io; use std::sync::mpsc; use tokio::task::JoinSet; #[tokio::main] async fn main() -> io::Result<()> { let (p, r) = mpsc::channel(); let mut set = JoinSet::new(); (1..=10).for_each(|i| { let p = p.clone(); set.spawn(async move { p.send(i) }); }); drop(p); while let Ok(result) = r.recv() { println!("{result}"); } set.join_all() .await .into_iter() .collect::<Result<Vec<_>, _>>() .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; println!("Done"); Ok(()) }