Guides#
pw_async2: Cooperative async tasks for embedded
This guide covers cross-cutting usage topics such as Unit testing.
Core component guides#
All of the pw_async2 core components have their own dedicated guides.
Tasks#
See Tasks.
Channels#
See Channels.
Dispatchers#
See Dispatcher.
Futures#
See Futures.
Integrating with existing, callback-based code#
See Callbacks.
Interacting with hardware#
A common use case for pw_async2 is interacting with hardware that uses
interrupts. The following example demonstrates this by creating a fake UART
device with an asynchronous reading interface and a separate thread that
simulates hardware interrupts.
The example can be built and run in upstream Pigweed with the following command:
bazelisk run //pw_async2/examples:interrupt
FakeUart simulates an interrupt-driven UART with an asynchronous interface
for reading bytes. The ReadByte method returns a ValueFuture that
resolves when a byte is available. The HandleReceiveInterrupt method would
be called from an ISR to resolve pending futures or queue data. (In the example,
this is simulated via keyboard input.)
// A fake UART device that provides an asynchronous byte reading interface.
class FakeUart {
 public:
  // Asynchronously reads a single byte from the UART.
  //
  // If a byte is available in the receive queue, it returns a Future that is
  // already `Ready(byte)`.
  // If another task is already waiting for a byte, it returns a Future that is
  // `Ready(Status::Unavailable())`.
  // Otherwise, it returns a `Pending` Future and arranges for the task to be
  // woken up when a byte arrives.
  pw::async2::ValueFuture<pw::Result<char>> ReadByte() {
    // Blocking inside an async function is generally an anti-pattern because it
    // prevents the single-threaded dispatcher from making progress on other
    // tasks. However, using `pw::sync::InterruptSpinLock` here is acceptable
    // due to the short-running nature of the ISR.
    std::lock_guard lock(lock_);

    using ResultFuture = pw::async2::ValueFuture<pw::Result<char>>;

    // Check if the UART has been put into a failure state.
    if (!status_.ok()) {
      return ResultFuture::Resolved(status_);
    }

    // If a byte is already in the queue, return it immediately.
    if (!rx_queue_.empty()) {
      char byte = rx_queue_.front();
      rx_queue_.pop();
      return ResultFuture::Resolved(byte);
    }

    // If the queue is empty, the operation can't complete yet. Arrange for the
    // task to be woken up later.
    // `TryGet` returns a future if one is available, or `std::nullopt` if
    // another task is already waiting.
    std::optional<ResultFuture> future = provider_.TryGet();
    if (!future.has_value()) {
      return ResultFuture::Resolved(pw::Status::Unavailable());
    }
    return std::move(*future);
  }

  // Simulates a hardware interrupt that receives a character.
  // This method is safe to call from an interrupt handler.
  void HandleReceiveInterrupt() {
    std::lock_guard lock(lock_);
    if (rx_queue_.full()) {
      // Buffer is full, drop the character.
      PW_LOG_WARN("UART RX buffer full, dropping character.");
      return;
    }

    // Generate a random lowercase letter to simulate receiving data.
    char c = 'a' + (std::rand() % 26);

    // If a task is waiting for a byte, give it the byte immediately.
    if (provider_.has_future()) {
      provider_.Resolve(c);
    } else {
      // Otherwise, store the byte in the queue.
      rx_queue_.push(c);
    }
  }

  // Puts the UART into a terminated state.
  void set_status(pw::Status status) {
    std::lock_guard lock(lock_);
    status_ = status;
    // Wake up any pending task so it can observe the status change and exit.
    provider_.Resolve(status);
  }

 private:
  pw::sync::InterruptSpinLock lock_;
  pw::InlineQueue<char, 16> rx_queue_ PW_GUARDED_BY(lock_);
  pw::async2::ValueProvider<pw::Result<char>> provider_;
  pw::Status status_;
};
A reader task obtains a future from the UART and polls it until it receives data.
pw::async2::BasicDispatcher dispatcher;

// Create a task that reads from the UART in a loop.
class ReaderTask : public pw::async2::Task {
 public:
  ReaderTask(FakeUart& uart) : uart_(uart) {}

 private:
  pw::async2::Poll<> DoPend(pw::async2::Context& cx) override {
    while (true) {
      if (!future_.has_value()) {
        future_ = uart_.ReadByte();
      }

      PW_TRY_READY_ASSIGN(pw::Result<char> result, future_->Pend(cx));

      future_.reset();

      if (!result.ok()) {
        PW_LOG_ERROR("UART read failed: %s", result.status().str());
        break;
      }

      PW_LOG_INFO("Received: %c", result.value());
    }

    return pw::async2::Ready();
  }

  FakeUart& uart_;
  std::optional<pw::async2::ValueFuture<pw::Result<char>>> future_;
};

ReaderTask reader_task(fake_uart);

// Post the task to the dispatcher to schedule it for execution.
dispatcher.Post(reader_task);
This example shows how to bridge the gap between low-level, interrupt-driven
hardware and the high-level, cooperative multitasking model of pw_async2.
Full source code for this example: pw_async2/examples/interrupt.cc
Unit testing#
Unit testing pw_async2 code is different from testing non-async code. You
must run async code from a Task on a
Dispatcher.
To test pw_async2 code:
Add a dependency on //pw_async2:testing.
Declare a pw::async2::DispatcherForTest.
Create a task to run the async code under test. Either implement Task or use PendFuncTask to wrap a lambda.
Post the task to the dispatcher.
Call RunUntilStalled to execute the task until it can make no further progress, or RunToCompletion if all tasks should complete.
The following example shows the basic structure of a pw_async2 unit test.
#include "pw_async2/context.h"
#include "pw_async2/dispatcher_for_test.h"
#include "pw_async2/pend_func_task.h"
#include "pw_unit_test/framework.h"

using ::pw::async2::Context;
using ::pw::async2::Ready;

namespace examples {

TEST(Async2UnitTest, MinimalExample) {
  pw::async2::DispatcherForTest dispatcher;

  // Create a test task to run the pw_async2 code under test.
  pw::async2::PendFuncTask task([](Context&) { return Ready(); });

  // Post and run the task on the dispatcher.
  dispatcher.Post(task);
  dispatcher.RunToCompletion();
}

}  // namespace examples
It is usually necessary to run the test task multiple times to advance async code through its states. This improves coverage and ensures that wakers are stored and woken properly.
To run the test task multiple times:
Post the task to the dispatcher.
Call RunUntilStalled().
Perform actions to allow the task to advance.
Call RunUntilStalled() again.
Repeat until the task runs to completion, calling RunToCompletion() when the task should complete.
The example below runs a task multiple times to test waiting for a
FortuneTeller class to produce a fortune.
#include <optional>
#include <utility>

#include "pw_async2/context.h"
#include "pw_async2/dispatcher.h"
#include "pw_async2/dispatcher_for_test.h"
#include "pw_async2/pend_func_task.h"
#include "pw_async2/try.h"
#include "pw_async2/value_future.h"
#include "pw_unit_test/framework.h"

using ::pw::async2::Context;
using ::pw::async2::Poll;
using ::pw::async2::Ready;

namespace examples {

// The class being tested.
class FortuneTeller {
 public:
  // Gets a fortune from the fortune teller.
  pw::async2::ValueFuture<const char*> WaitForFortune() {
    if (next_fortune_ != nullptr) {
      return pw::async2::ValueFuture<const char*>::Resolved(
          std::exchange(next_fortune_, nullptr));
    }
    return provider_.Get();
  }

  // Sets the next fortune to use and wakes a task waiting for one, if any.
  void SetFortune(const char* fortune) {
    if (provider_.has_future()) {
      provider_.Resolve(fortune);
    } else {
      next_fortune_ = fortune;
    }
  }

 private:
  pw::async2::ValueProvider<const char*> provider_;
  const char* next_fortune_ = nullptr;
};

TEST(Async2UnitTest, MultiStepExample) {
  pw::async2::DispatcherForTest dispatcher;

  FortuneTeller oracle;
  const char* fortune = "";
  std::optional<pw::async2::ValueFuture<const char*>> future;

  // This task gets a fortune and checks that it matches the expected value.
  // The task may need to execute multiple times if the fortune is not ready.
  pw::async2::PendFuncTask task([&](Context& context) -> Poll<> {
    if (!future.has_value()) {
      future = oracle.WaitForFortune();
    }
    PW_TRY_READY_ASSIGN(fortune, future->Pend(context));
    return Ready();
  });

  dispatcher.Post(task);

  // The fortune hasn't been set, so the task should be pending.
  EXPECT_TRUE(dispatcher.RunUntilStalled());

  // Set the fortune, which wakes the pending task.
  oracle.SetFortune("you will bring balance to the force");

  // The task runs, gets the fortune, then returns Ready.
  dispatcher.RunToCompletion();

  // Ensure the fortune was set as expected.
  EXPECT_STREQ(fortune, "you will bring balance to the force");
}

}  // namespace examples
Full source code for this example: pw_async2/examples/unit_test.cc
Interacting with timers, delays, and timeouts#
Asynchronous systems often need to interact with time, for example to implement
timeouts, delays, or periodic tasks. pw_async2 provides a flexible and
testable mechanism for this through the TimeProvider interface. TimeProvider<SystemClock> is
commonly used when interacting with the system’s built-in time_point and
duration types.
TimeProvider allows for easily waiting for a timeout or deadline using the WaitFor and WaitUntil methods. Additionally, you can test code that uses TimeProvider for timing with simulated time using SimulatedTimeProvider. Doing so helps avoid timing-dependent test flakes and helps ensure that tests are fast since they don’t need to wait for real-world time to elapse.
TimeProvider, timer factory#
The TimeProvider is an abstract interface that acts as a factory for timers. Its key responsibilities are:
Providing the current time: The now() method returns the current time according to a specific clock.
Creating timers: The WaitUntil(timestamp) and WaitFor(delay) methods return a TimeFuture object.
This design is friendly to dependency injection. By providing different
implementations of TimeProvider, code that uses timers can be tested with a
simulated clock (like pw::chrono::SimulatedClock), allowing for fast and
deterministic tests without real-world delays. For production code, the
GetSystemTimeProvider()
function returns a global TimeProvider that uses the configured system
clock.
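The dependency-injection idea can be sketched without any Pigweed headers. The block below is a minimal, standard-library-only model: `ToyClock`, `SteadyToyClock`, `ManualToyClock`, and `Heartbeat` are hypothetical names invented for illustration and are not part of the pw_async2 API. It only shows why code written against an abstract time source can be driven by a test-controlled clock.

```cpp
#include <cassert>
#include <chrono>

// Hypothetical interface playing the role that TimeProvider plays in
// pw_async2. This is NOT the pw_async2 API.
class ToyClock {
 public:
  using duration = std::chrono::milliseconds;
  virtual ~ToyClock() = default;
  virtual duration now() const = 0;
};

// Production-style implementation backed by the real steady clock.
class SteadyToyClock final : public ToyClock {
 public:
  duration now() const override {
    return std::chrono::duration_cast<duration>(
        std::chrono::steady_clock::now().time_since_epoch());
  }
};

// Test implementation: time only moves when the test advances it.
class ManualToyClock final : public ToyClock {
 public:
  duration now() const override { return now_; }
  void AdvanceTime(duration delta) { now_ += delta; }

 private:
  duration now_{0};
};

// Code under test depends only on the interface, so either clock can be
// injected through the constructor.
class Heartbeat {
 public:
  Heartbeat(ToyClock& clock, ToyClock::duration period)
      : clock_(clock), period_(period), next_beat_(clock.now() + period) {
    assert(period > ToyClock::duration::zero());
  }

  // Returns true once for each period that has elapsed.
  bool ShouldBeat() {
    if (clock_.now() < next_beat_) {
      return false;
    }
    next_beat_ += period_;
    return true;
  }

 private:
  ToyClock& clock_;
  ToyClock::duration period_;
  ToyClock::duration next_beat_;
};
```

In a test, a `ManualToyClock` is passed to `Heartbeat` and advanced explicitly, so a 100 ms period can be exercised without sleeping. Production code would pass a `SteadyToyClock` instead.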
TimeFuture, time-bound pendable objects#
A TimeFuture is a pendable object that
completes at a specific time. A task can Pend on a TimeFuture to
suspend itself until the time designated by the future. When the time is
reached, the TimeProvider wakes the task, and its next poll of the
TimeFuture will return Ready(timestamp).
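The pending-then-ready behavior described above can be modeled with a toy type. This is only a sketch under stated assumptions: `ToyManualClock` and `ToyTimeFuture` are hypothetical, `std::optional` stands in for `Poll`, and the sketch reports the deadline as its timestamp. A real TimeFuture also registers a waker so the task is rescheduled automatically; here the caller simply polls again.

```cpp
#include <cassert>
#include <chrono>
#include <optional>

using ToyDuration = std::chrono::milliseconds;

// Hypothetical manually advanced clock used to drive the sketch.
struct ToyManualClock {
  ToyDuration now{0};
};

// Toy stand-in for a time-bound pendable object. Pend() returns
// std::nullopt ("pending") until the deadline is reached, then returns the
// deadline ("ready"). No waker is stored in this simplified model.
class ToyTimeFuture {
 public:
  ToyTimeFuture(const ToyManualClock& clock, ToyDuration deadline)
      : clock_(clock), deadline_(deadline) {
    assert(deadline >= ToyDuration::zero());
  }

  std::optional<ToyDuration> Pend() const {
    if (clock_.now < deadline_) {
      return std::nullopt;  // Still waiting for the deadline.
    }
    return deadline_;  // Deadline reached.
  }

 private:
  const ToyManualClock& clock_;
  ToyDuration deadline_;
};
```

Polling before the deadline yields no value, and polling at or after the deadline yields the timestamp, which mirrors the suspend-and-resume flow a task sees when it pends on a time-bound future.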
Example#
Here is an example of a task that logs a message, sleeps for one second, and then logs another message.
#include "pw_async2/dispatcher.h"
#include "pw_async2/system_time_provider.h"
#include "pw_async2/task.h"
#include "pw_chrono/system_clock.h"
#include "pw_log/log.h"

using namespace std::chrono_literals;

// The example uses these pw_async2 names unqualified.
using ::pw::async2::Context;
using ::pw::async2::GetSystemTimeProvider;
using ::pw::async2::Pending;
using ::pw::async2::Poll;
using ::pw::async2::Ready;

class LoggingTask : public pw::async2::Task {
 public:
  LoggingTask() : state_(State::kLogFirstMessage) {}

 private:
  enum class State {
    kLogFirstMessage,
    kSleeping,
    kLogSecondMessage,
    kDone,
  };

  Poll<> DoPend(Context& cx) override {
    while (true) {
      switch (state_) {
        case State::kLogFirstMessage:
          PW_LOG_INFO("Hello, async world!");
          future_ = GetSystemTimeProvider().WaitFor(1s);
          state_ = State::kSleeping;
          continue;

        case State::kSleeping:
          if (future_.Pend(cx).IsPending()) {
            return Pending();
          }
          state_ = State::kLogSecondMessage;
          continue;

        case State::kLogSecondMessage:
          PW_LOG_INFO("Goodbye, async world!");
          state_ = State::kDone;
          continue;

        case State::kDone:
          return Ready();
      }
    }
  }

  State state_;
  pw::async2::TimeFuture<pw::chrono::SystemClock> future_;
};
Timing out Futures#
See Timing-out Futures.
Composing async operations with combinators#
Combinators allow for the composition of multiple async operations.
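To make the idea of composition concrete, the following standard-library-only sketch models two common combinator shapes: a join-style combinator that completes when both operations complete, and a select-style combinator that completes when either does. `ToyPollFn`, `ToyJoin`, and `ToySelect` are hypothetical names for illustration; the actual pw_async2 combinator API is not shown in this guide and may differ.

```cpp
#include <cassert>
#include <functional>
#include <optional>
#include <utility>

// Toy pollable operation: returns a value when ready, std::nullopt while
// pending. Hypothetical; pw_async2 futures use Poll and a Context instead.
using ToyPollFn = std::function<std::optional<int>()>;

// Join-style combinator: ready only once BOTH operations have produced a
// value. Results are cached so a finished operation is not polled again.
class ToyJoin {
 public:
  ToyJoin(ToyPollFn first, ToyPollFn second)
      : first_(std::move(first)), second_(std::move(second)) {
    assert(first_ && second_);
  }

  std::optional<std::pair<int, int>> Poll() {
    if (!first_result_.has_value()) {
      first_result_ = first_();
    }
    if (!second_result_.has_value()) {
      second_result_ = second_();
    }
    if (first_result_.has_value() && second_result_.has_value()) {
      return std::make_pair(*first_result_, *second_result_);
    }
    return std::nullopt;
  }

 private:
  ToyPollFn first_;
  ToyPollFn second_;
  std::optional<int> first_result_;
  std::optional<int> second_result_;
};

// Select-style combinator: ready as soon as EITHER operation is ready.
// Returns {index of the operation that finished, its value}.
inline std::optional<std::pair<int, int>> ToySelect(const ToyPollFn& first,
                                                    const ToyPollFn& second) {
  if (std::optional<int> value = first()) {
    return std::make_pair(0, *value);
  }
  if (std::optional<int> value = second()) {
    return std::make_pair(1, *value);
  }
  return std::nullopt;
}
```

The join-style shape suits work that needs every result before continuing, while the select-style shape suits races such as "data or timeout, whichever comes first".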
Poll aliases#
pw_async2 provides the following aliases to simplify common return types:
| Alias | Definition |
|---|---|
| PollResult<T> | Poll<pw::Result<T>> |
| PollOptional<T> | Poll<std::optional<T>> |
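The general shape of such an alias can be sketched with a toy type. In the block below, `ToyPoll`, `ToyPollOptional`, and `NextLine` are hypothetical names and not the pw_async2 definitions. The sketch only shows that an alias template shortens a nested return type without introducing a new type.

```cpp
#include <cassert>
#include <optional>
#include <string>
#include <type_traits>
#include <utility>

// Toy poll type: default-constructed means pending, otherwise ready.
template <typename T>
class ToyPoll {
 public:
  ToyPoll() = default;  // Pending.
  explicit ToyPoll(T value) : value_(std::in_place, std::move(value)) {}

  bool IsReady() const { return value_.has_value(); }

  const T& value() const {
    assert(IsReady());
    return *value_;
  }

 private:
  std::optional<T> value_;
};

// Hypothetical alias: a poll whose ready value may itself be absent.
template <typename T>
using ToyPollOptional = ToyPoll<std::optional<T>>;

// An alias names the same type; it does not create a new one.
static_assert(
    std::is_same_v<ToyPollOptional<int>, ToyPoll<std::optional<int>>>);

// Example function using the shorter spelling as its return type.
inline ToyPollOptional<std::string> NextLine(bool data_arrived,
                                             bool stream_closed) {
  if (!data_arrived) {
    return {};  // Pending: nothing to report yet.
  }
  if (stream_closed) {
    // Ready, but with no value (end of stream).
    return ToyPollOptional<std::string>(std::optional<std::string>());
  }
  return ToyPollOptional<std::string>(std::optional<std::string>("hello"));
}
```

The three outcomes (pending, ready without a value, ready with a value) stay distinguishable, and the signature stays short.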