Guides#
pw_async2: Cooperative async tasks for embedded
Implementing tasks#
Task instances complete one or more asynchronous
operations. They are the top-level “thread” primitives of pw_async2
.
You can use one of the concrete subclasses of Task
that Pigweed provides:
CoroOrElseTask: Delegates to a provided coroutine and executes an
or_else
handler function on failure.PendFuncTask: Delegates to a provided function.
PendableAsTask: Delegates to a type with a
Pend
method.AllocateTask: Creates a concrete subclass of
Task
, just likePendableAsTask
, but the created task is dynamically allocated and frees the associated memory upon completion.
Or you can subclass Task
yourself. See Task
for more guidance on subclassing.
How a dispatcher manages tasks#
The purpose of a Dispatcher is to keep track of a set of Task objects and run them to completion. The dispatcher is essentially a scheduler for cooperatively scheduled (non-preemptive) threads (tasks).
While a dispatcher is running, it waits for one or more tasks to waken and then
advances each task by invoking its DoPend
method. The DoPend
method is typically implemented manually by users, though
it is automatically provided by coroutines.
If the task is able to complete, DoPend
will return Ready
, in which
case the dispatcher will deregister the task.
If the task is unable to complete, DoPend
must return Pending
and
arrange for the task to be woken up when it is able to make progress again.
Once the task is rewoken, the task is re-added to the Dispatcher
queue. The
dispatcher will then invoke DoPend
once more, continuing the cycle until
DoPend
returns Ready
and the task is completed.
The following sequence diagram summarizes the basic workflow:
Implementing invariants for pendable functions#
Any Pend
-like function or method similar to
DoPend that can pause when it’s not able
to make progress on its task is known as a pendable function. When
implementing a pendable function, make sure that you always uphold the
following invariants:
Note
Exactly which APIs are considered pendable?
If it has the signature (Context&, ...) -> Poll<T>
,
then it’s a pendable function.
Arranging future completion of incomplete tasks#
When your pendable function can’t yet complete:
Do one of the following to make sure the task rewakes when it’s ready to make more progress:
Delegate waking to a subtask. Arrange for that subtask’s pendable function to wake this task when appropriate.
Arrange an external wakeup. Use
PW_ASYNC_STORE_WAKER
to store the task’s waker somewhere, and then call Wake from an interrupt or another thread once the event that the task is waiting for has completed.Re-enqueue the task with ReEnqueue. This is a rare case. Usually, you should just create an immediately invoked
Waker
.
Make sure to return Pending to signal that the task is incomplete.
In other words, whenever your pendable function returns
Pending, you must guarantee that Wake()
is called once in the future.
For example, one implementation of a delayed task might arrange for a timer to
wake its Waker
once some time has passed. Another case might be a messaging
library which calls Wake()
on the receiving task once a sender has placed a
message in a queue.
Cleaning up complete tasks#
When your pendable function has completed, make sure to return pw::async2::Ready() to signal that the task is complete.
Passing data between tasks#
Astute readers will have noticed that the Wake
method takes zero arguments,
and DoPoll
does not provide the task it polls with any values!
Unlike callback-based interfaces, tasks (and the libraries they use) are responsible for storage of their inputs and outputs, and you are responsible for defining how that happens.
It is also important to ensure that the receiver puts itself to sleep correctly when the it is waiting for a value. The sender likewise must wait for available storage before it sends a value.
There are two patterns for doing this, depending on how much data you want to send.
Single values#
This pattern is for when you have a task that receives a single, one-time value. Once the value is received, that value is immutable. The task can either go on to waiting for something else, or can complete.
In this pattern, the task would typically hold storage for the value, and you
would implement some custom interface to set the value once one is available.
Setting a value would additionally set some internal flag indicating a value was
set, and also arrange to wake the task via its Waker
.
pw_async2
provides helpers for this pattern which add a useful layer of
abstraction and ensure you implement it correctly.
For the first pair of helpers, the receiver helper OnceReceiver owns the storage for a value and is linked on creation to a OnceSender which provides the interface for sending a value.
Construct a linked pair of these helpers by calling MakeOnceSenderAndReceiver<T>, using value type as the template argument.
You would then typically transfer ownership of the receiver to your Task using
std::move
. Note that as a side-effect, the move modifies the linked sender
to point at the new location of the receiver, maintaining the link.
1 auto [sender, receiver] = MakeOnceSenderAndReceiver<int>();
2 ReceiveAndLogValueTask task(std::move(receiver));
You can also similarly and safely move the sender to transfer its ownership.
When implementing the task, you should prefer to use PW_TRY_READY_ASSIGN to automate handling the Pending return value of OnceReceiver::Pend(), leaving you to decide how to handle the error case if no value being sent (which can happen if the sender is destroyed), and more typically what to do with the received value.
1 PW_TRY_READY_ASSIGN(Result<int> value, int_receiver_.Pend(cx));
2 if (!value.ok()) {
3 PW_LOG_ERROR(
4 "OnceSender was destroyed without sending a message! Outrageous :(");
5 }
6 PW_LOG_INFO("Received the integer value: %d", *value);
You can send a value to the task via the
emplace() member function. Note
that this allows you to std::move
the value, or even construct it in-place
if that makes sense to do.
1 // Send a value to the task
2 sender.emplace(5);
If your type is expensive to copy, pw_async2
also provides another pair of
helpers, OnceRefSender and
OnceRefReceiver, where an external
component owns the storage for the type T
.
For OnceRefReceiver, the receiving
task will still use Pend() to
get a Poll<Status>
value indicating if the sender has set the value, but it
will have to access the value itself through its own pointer or reference.
When the sender uses OnceRefSender to
set the value, do note that it does require making a copy into the external
value storage, though this can be a shallow copy through a std::move
if
supported.
You can find a complete example showing how to use these helpers in //pw_async2/examples/once_send_recv_test.cc, and you can try it for yourself with:
bazelisk run --config=cxx20 //pw_async2/examples:once_send_recv_test
Other primitives#
More primitives (such as MultiSender
and MultiReceiver
) are
in-progress. We encourage users who need further async primitives to contribute
them upstream to pw::async2
!
Multiple values#
If your tasks need to send or receive multiple values, then you can use the
awaitable interface of pw::InlineAsyncQueue or
pw::InlineAsyncDeque from pw_containers
.
If your needs are simple, this is a perfectly good way of handling a simple byte stream or message queue.
Both queue container types expose two key functions to allow interoperability
with pw_async2
:
|
Allows waiting until the queue has space for more values. |
|
Allows waiting until the queue has values. |
For sending, the producing task has to wait for there to be space before trying to add to the queue.
1 // Wait for there to be space in the queue
2 PW_TRY_READY(queue_.PendHasSpace(cx));
3 queue_.push(value);
Receiving values is similar. The receiving task has to wait for there to be values before trying to remove them from the queue.
1 // Wait for there to be values in the queue.
2 PW_TRY_READY(queue_.PendNotEmpty(cx));
3 const int value = queue_.front();
4 queue_.pop();
You can find a complete example for using InlineAsyncQueue this way in //pw_async2/examples/inline_async_queue_with_tasks_test.cc, and you can try it for yourself with:
bazelisk run //pw_async2/examples:inline_async_queue_with_tasks_test
Timing#
When using pw_async2
, you should inject timing functionality by accepting
a TimeProvider (most commonly
TimeProvider<SystemClock>
when using the system’s built-in time_point
and duration
types).
TimeProvider allows for easily waiting for a timeout or deadline using the WaitFor and WaitUntil methods. Additionally, you can test code that uses TimeProvider for timing with simulated time using SimulatedTimeProvider. Doing so helps avoid timing-dependent test flakes and helps ensure that tests are fast since they don’t need to wait for real-world time to elapse.
Interacting with async2 from non-async2 code using callbacks#
In a system gradually or partially adopting pw_async2
, there are often
cases where non-async2 code needs to run asynchronous operations built with
pw_async2
.
To facilitate this, pw_async2
provides callback tasks:
OneshotCallbackTask and
RecurringCallbackTask.
These tasks invoke a pendable function, forwarding its result to a provided callback on completion.
The two variants of callback tasks are:
OneshotCallbackTask<T>: Pends the pendable. When the pendable returns
Ready(value)
, the task invokes the callback once withvalue
. After the callback finishes, theOneshotCallbackTask
itself completes. This is useful for single, asynchronous requests.RecurringCallbackTask<T>: Similar to the oneshot version, but after the task invokes the callback, the
RecurringCallbackTask
continues polling the pendable function. This is suitable for operations that produce a stream of values over time, where you want to process each one.
Example#
#include "pw_async2/callback_task.h"
#include "pw_log/log.h"
#include "pw_result/result.h"
// Assume the async2 part of the system exposes a function to post tasks to
// its dispatcher.
void PostTaskToDispatcher(pw::async2::Task& task);
// The async2 function we'd like to call.
pw::async2::Poll<pw::Result<int>> ReadValue(pw::async2::Context&);
// Non-async2 code.
int ReadAndPrintAsyncValue() {
pw::async2::OneshotCallbackTaskFor<&ReadValue> task([](pw::Result<int> result) {
if (result.ok()) {
PW_LOG_INFO("Read value: %d", result.value());
} else {
PW_LOG_ERROR("Failed to read value: %s", result.status().str());
}
});
PostTaskToDispatcher(task);
// In this example, the code allocates the task on the stack, so we would
// need to wait for it to complete before it goes out of scope. In a real
// application, the task may be a member of a long-lived object, or you
// might choose to statically allocate it.
}
Interacting with hardware#
A common use case for pw_async2
is interacting with hardware that uses
interrupts. The following example demonstrates this by creating a fake UART
device with an asynchronous reading interface and a separate thread that
simulates hardware interrupts.
The example can be built and run in upstream Pigweed with the following command:
bazelisk run //pw_async2/examples:interrupt
FakeUart
simulates an interrupt-driven UART with an asynchronous interface
for reading bytes (ReadByte
). The HandleReceiveInterrupt
method would
be called from an ISR. (In the example, this is simulated via keyboard input.)
1// A fake UART device that provides an asynchronous byte reading interface.
2class FakeUart {
3 public:
4 // Asynchronously reads a single byte from the UART.
5 //
6 // If a byte is available in the receive queue, it returns `Ready(byte)`.
7 // If another task is already waiting for a byte, it returns
8 // `Ready(Status::Unavailable())`.
9 // Otherwise, it returns `Pending` and arranges for the task to be woken up
10 // when a byte arrives.
11 pw::async2::PollResult<char> ReadByte(pw::async2::Context& cx) {
12 // Blocking inside an async function is generally an anti-pattern because it
13 // prevents the single-threaded dispatcher from making progress on other
14 // tasks. However, using `pw::sync::InterruptSpinLock` here is acceptable
15 // due to the short-running nature of the ISR.
16 std::lock_guard lock(lock_);
17
18 // Check if the UART has been put into a failure state.
19 PW_TRY(status_);
20
21 // If a byte is already in the queue, return it immediately.
22 if (!rx_queue_.empty()) {
23 char byte = rx_queue_.front();
24 rx_queue_.pop();
25 return byte;
26 }
27
28 // If the queue is empty, the operation can't complete yet. Arrange for the
29 // task to be woken up later.
30 // `PW_ASYNC_TRY_STORE_WAKER` stores a waker from the current task's
31 // context. If another task's waker is already stored, it returns false.
32 if (PW_ASYNC_TRY_STORE_WAKER(cx, waker_, "Waiting for a byte from UART")) {
33 return pw::async2::Pending();
34 }
35
36 // Another task is already waiting for a byte.
37 return pw::Status::Unavailable();
38 }
39
40 // Simulates a hardware interrupt that receives a character.
41 // This method is safe to call from an interrupt handler.
42 void HandleReceiveInterrupt() {
43 std::lock_guard lock(lock_);
44 if (rx_queue_.full()) {
45 // Buffer is full, drop the character.
46 PW_LOG_WARN("UART RX buffer full, dropping character.");
47 return;
48 }
49
50 // Generate a random lowercase letter to simulate receiving data.
51 char c = 'a' + (std::rand() % 26);
52 rx_queue_.push(c);
53
54 // Wake any task that is waiting for the data. Waking an empty waker is a
55 // no-op, so this can be called unconditionally.
56 std::move(waker_).Wake();
57 }
58
59 // Puts the UART into a terminated state.
60 void set_status(pw::Status status) {
61 std::lock_guard lock(lock_);
62 status_ = status;
63 // Wake up any pending task so it can observe the status change and exit.
64 std::move(waker_).Wake();
65 }
66
67 private:
68 pw::sync::InterruptSpinLock lock_;
69 pw::InlineQueue<char, 16> rx_queue_ PW_GUARDED_BY(lock_);
70 pw::async2::Waker waker_;
71 pw::Status status_;
72};
A reader task polls the fake UART until it receives data.
1 pw::async2::Dispatcher dispatcher;
2
3 // Create a task that reads from the UART in a loop.
4 pw::async2::PendFuncTask reader_task(
5 [](pw::async2::Context& cx) -> pw::async2::Poll<> {
6 while (true) {
7 // Poll `ReadByte` until it returns a `Ready`, then assign it to
8 // `result`.
9 PW_TRY_READY_ASSIGN(pw::Result<char> result, uart.ReadByte(cx));
10
11 if (!result.ok()) {
12 PW_LOG_ERROR("UART read failed: %s", result.status().str());
13 break;
14 }
15
16 PW_LOG_INFO("Received: %c", *result);
17 }
18
19 return pw::async2::Ready();
20 });
21
22 // Post the task to the dispatcher to schedule it for execution.
23 dispatcher.Post(reader_task);
This example shows how to bridge the gap between low-level, interrupt-driven
hardware and the high-level, cooperative multitasking model of pw_async2
.
Unit testing#
Unit testing pw_async2
code is different from testing non-async code. You
must run async code from a Task on a
Dispatcher.
To test pw_async2
code:
Declare a dispatcher.
Create a task to run the async code under test. Either implement Task or use PendFuncTask to wrap a lambda.
Post the task to the dispatcher.
Call RunUntilStalled to execute the task.
The following example shows the basic structure of a pw_async2
unit test.
#include "pw_async2/context.h"
#include "pw_async2/dispatcher.h"
#include "pw_async2/pend_func_task.h"
#include "pw_unit_test/framework.h"
using ::pw::async2::Context;
using ::pw::async2::Ready;
namespace examples {
TEST(Async2UnitTest, MinimalExample) {
pw::async2::Dispatcher dispatcher;
// Create a test task to run the pw_async2 code under test.
pw::async2::PendFuncTask task([](Context&) { return Ready(); });
// Post and run the task on the dispatcher.
dispatcher.Post(task);
EXPECT_EQ(dispatcher.RunUntilStalled(), Ready());
}
} // namespace examples
It is usually necessary to run the test task multiple times to advance async code through its states. This improves coverage and ensures that wakers are stored and woken properly.
To run the test task multiple times:
Post the task to the dispatcher.
Call RunUntilStalled(), which returns Pending.
Perform actions to allow the task to advance.
Call RunUntilStalled() again.
Repeat until the task runs to completion and RunUntilStalled() returns Ready.
The example below runs a task multiple times to test waiting for a
FortuneTeller
class to produce a fortune.
#include <utility>
#include "pw_async2/context.h"
#include "pw_async2/dispatcher.h"
#include "pw_async2/pend_func_task.h"
#include "pw_async2/try.h"
#include "pw_unit_test/framework.h"
using ::pw::async2::Context;
using ::pw::async2::Pending;
using ::pw::async2::Poll;
using ::pw::async2::Ready;
namespace examples {
// The class being tested.
class FortuneTeller {
public:
// Gets a fortune from the fortune teller.
Poll<const char*> PendFortune(Context& context) {
if (next_fortune_ == nullptr) {
PW_ASYNC_STORE_WAKER(context, waker_, "divining the future");
return Pending();
}
return std::exchange(next_fortune_, nullptr);
}
// Sets the next fortune to use and wakes a task waiting for one, if any.
void SetFortune(const char* fortune) {
next_fortune_ = fortune;
// Wake any task waiting for a fortune. If no tasks are waiting, this is a
// no-op.
std::move(waker_).Wake();
}
private:
pw::async2::Waker waker_;
const char* next_fortune_ = nullptr;
};
TEST(Async2UnitTest, MultiStepExample) {
pw::async2::Dispatcher dispatcher;
FortuneTeller oracle;
const char* fortune = "";
// This task gets a fortune and checks that it matches the expected value.
// The task may need to execute multiple times if the fortune is not ready.
pw::async2::PendFuncTask task([&](Context& context) -> Poll<> {
PW_TRY_READY_ASSIGN(fortune, oracle.PendFortune(context));
return Ready();
});
dispatcher.Post(task);
// The fortune hasn't been set, so the task should be pending.
ASSERT_EQ(dispatcher.RunUntilStalled(), Pending());
// Set the fortune, which wakes the pending task.
oracle.SetFortune("you will bring balance to the force");
// The task runs, gets the fortune, then returns Ready.
ASSERT_EQ(dispatcher.RunUntilStalled(), Ready());
// Ensure the fortune was set as expected.
EXPECT_STREQ(fortune, "you will bring balance to the force");
}
} // namespace examples
Debugging#
You can inspect tasks registered to a dispatcher by calling :Dispatcher::LogRegisteredTasks(), which logs information for each task in the dispatcher’s pending and sleeping queues.
Sleeping tasks will log information about their assigned wakers, with the wait reason provided for each.
If space is a concern, you can set the module configuration option
PW_ASYNC2_DEBUG_WAIT_REASON to 0
to disable wait reason storage
and logging. Under this configuration, the dispatcher only logs the waker count
of a sleeping task.