Concurrency
Fuyu uses structured concurrency to manage parallel processes. Structured concurrency guarantees that all tasks started within a scope have finished by the time that scope exits.
Async
The async() function uses implicit arguments to provide a context that scopes concurrent tasks. It accepts a function as an argument and calls it with an Async context provided implicitly. This context is used to manage asynchronous tasks. Functions such as std/task::start() require an implicit Async argument to create the task and add it to the task group.
import std/io
import std/task
async() {
  task::start() {
    io::println("Hello")
  }
  task::start() {
    io::println("World")
  }
}
The above prints either Hello followed by World, or World followed by Hello. There is no guarantee on the order. The async() function blocks execution until both tasks have completed.
Since async() provides an Async to spawn new tasks, anything that is reachable from the async() block can also spawn new tasks. The Async context is usually passed around implicitly by functions that have Async in their using list.
import std/io
import std/iter
import std/task
// The implicit argument gets the Async that is needed
// to create the tasks.
def start_tasks(n: Int)(using Async) {
  0..n | iter::each { |i|
    task::start() {
      io::println(i)
    }
  }
}
async() {
  // Start 1 thousand tasks that print the numbers 0 to 999.
  start_tasks(1_000)
}
Panics
If one task in an async() panics, then all other tasks in the same async() are cancelled, and the panic is propagated.
import std/duration
import std/io
import std/task
async() {
  task::start() {
    panic("Oops!")
  }
  task::start() {
    task::sleep(duration::from_seconds(10))
    io::println("This works.")
  }
}
In this example, the following happens:
- The first task is started. It panics and the task stops.
- While the first task is starting, running, panicking, or terminated, the second task starts.
- The second task takes a long time because of the sleep(), and it is likely that the async() reaches the end before the second task finishes.
- At the end of the async(), all tasks are waited on for completion. The first task was seen to panic, so all other waited upon tasks are cancelled. It is likely that the second task is cancelled during its sleep.
- The panic from the first task is propagated from the async().
In this example, it is likely that "This works." is never printed, since the second task will be cancelled before the print.
Channels
Channels are used to send messages between tasks. A channel is a concurrent queue that supports send and receive operations via its Sender and Receiver ends.
import std/channel
import std/iter
import std/task
let sum = async() {
  let (sender, receiver) = channel::new()
  // This task produces values to sum.
  task::start() {
    0..10 | iter::each(channel::send(sender, $))
  }
  // The current task receives the values and sums them.
  receiver | channel::recv_iter() | iter::sum()
}
assert_eq(sum, 45)