
Concurrency

Fuyu uses structured concurrency to manage concurrent tasks. Structured concurrency guarantees that every task started inside a scope has finished by the time that scope exits.

Async

The async() function uses implicit arguments to provide a context for scoping concurrent tasks. It accepts a function as an argument and calls it with an Async context supplied implicitly. This context manages the asynchronous tasks as a task group. Functions such as std/task::start() require an implicit Async argument, which they use to create the task and add it to the task group.

import std/io
import std/task

async() {
    task::start() {
        io::println("Hello")
    }
    task::start() {
        io::println("World")
    }
}

The above will print either Hello followed by World or World followed by Hello. There is no guarantee on the order. The async() function will block execution until both tasks are completed.
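For readers who know Python, the same blocking behaviour can be sketched with asyncio. This is only an analogy and not Fuyu's API: the names say and output are hypothetical, and asyncio.gather() stands in for the async() scope.

```python
import asyncio

output = []  # collects what each task "prints" (hypothetical, for illustration)

async def say(word):
    await asyncio.sleep(0)  # yield control so the two tasks can interleave
    output.append(word)

async def main():
    # gather() returns only after both coroutines have finished,
    # loosely mirroring how async() blocks until its tasks complete.
    await asyncio.gather(say("Hello"), say("World"))

asyncio.run(main())
# Both words are present once main() returns; a program should not
# rely on the order in which they were appended.
print(sorted(output))
```

The point of the sketch is that the code after the scope can rely on both tasks being done, but not on the order in which they ran.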

Since async() provides an Async to spawn new tasks, anything that is reachable from the async() block can also spawn new tasks. The Async context is usually passed around implicitly by functions that have Async in their using list.

import std/io
import std/iter
import std/task

// The implicit argument gets the Async that is needed
// to create the tasks.
def start_tasks(n: Int)(using Async) {
    0..n | iter::each { |i|
        task::start() {
            io::println(i)
        }
    }
}

async() {
    // Start 1 thousand tasks that print the numbers 0 to 999.
    start_tasks(1_000)
}

Panics

If one task in an async() panics, then all other tasks in the same async() are cancelled, and the panic is propagated.

import std/duration
import std/io
import std/task

async() {
    task::start() {
        panic("Oops!")
    }
    task::start() {
        task::sleep(duration::from_seconds(10))
        io::println("This works.")
    }
}

In this example, the following happens:

  1. The first task is started. It panics and the task stops.
  2. While the first task is starting, running, or panicking, or after it has terminated, the second task starts.
  3. The second task takes a long time because of the sleep(), and it is likely that the async() reaches the end before the second task finishes.
  4. At the end of the async(), all tasks are waited on for completion. The first task is seen to have panicked, so all other tasks being waited on are cancelled. It is likely that the second task is cancelled during its sleep.
  5. The panic from the first task is propagated from the async().

In this example, "This works." is unlikely to be printed, since the second task will most likely be cancelled before it reaches the print.
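The cancel-on-failure behaviour described in the steps above can be approximated in Python with asyncio. This is a rough sketch, not Fuyu's implementation: the helper scope and the events list are hypothetical, and a raised RuntimeError stands in for a panic.

```python
import asyncio

events = []  # records what happened to the slow task (hypothetical)

async def failing():
    raise RuntimeError("Oops!")  # stands in for panic("Oops!")

async def slow():
    try:
        await asyncio.sleep(10)
        events.append("This works.")
    except asyncio.CancelledError:
        events.append("cancelled")
        raise

async def scope(*coros):
    """Hypothetical helper: run tasks, cancel the rest on first failure, re-raise."""
    tasks = [asyncio.ensure_future(c) for c in coros]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    for task in pending:
        task.cancel()
    # Wait for the cancelled tasks to finish unwinding before leaving the scope.
    await asyncio.gather(*pending, return_exceptions=True)
    for task in done:
        if task.exception() is not None:
            raise task.exception()

def run():
    try:
        asyncio.run(scope(failing(), slow()))
    except RuntimeError as err:
        return str(err)  # the failure propagates out of the scope
    return None

result = run()
print(result, events)
```

In this sketch the slow task is cancelled during its sleep, so it never records "This works.", and the error from the failing task surfaces to the caller of the scope.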

Channels

Channels are used to send messages between tasks. A channel is a concurrent queue that supports send and receive operations via its Sender and Receiver ends.

import std/channel
import std/iter
import std/task

let sum = async() {
    let (sender, receiver) = channel::new()
    // This task produces values to sum.
    task::start() {
        0..10 | iter::each(channel::send(sender, $))
    }
    // The current task receives the values and sums them.
    receiver | channel::recv_iter() | iter::sum()
}
// 0 + 1 + ... + 9, since ranges exclude the upper bound (as with 0..n above).
assert_eq(sum, 45)
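The producer and consumer pattern can be sketched in Python with asyncio.Queue as a loose analogy; none of this is Fuyu's API. The sketch assumes the range excludes its upper bound, as the "0 to 999" comment earlier suggests, and it uses a hypothetical _DONE sentinel to signal the end of the stream, because asyncio.Queue has no separate Sender and Receiver ends.

```python
import asyncio

_DONE = object()  # hypothetical end-of-stream marker for this sketch

async def producer(queue):
    # Send the values 0 through 9, then signal that no more will follow.
    for i in range(10):
        await queue.put(i)
    await queue.put(_DONE)

async def consume_sum(queue):
    # Receive values until the sentinel arrives, summing as we go.
    total = 0
    while True:
        item = await queue.get()
        if item is _DONE:
            return total
        total += item

async def main():
    queue = asyncio.Queue()
    task = asyncio.ensure_future(producer(queue))  # producer runs as its own task
    total = await consume_sum(queue)               # current task does the receiving
    await task
    return total

total = asyncio.run(main())
print(total)  # 45
```

The sentinel makes the end of the stream explicit. How a Fuyu receiver learns that the sender is finished is not stated in the text above.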