Why pw_async2?
pw_async2: Cooperative async tasks for embedded
Embedded systems frequently need to manage multiple concurrent operations, such as handling network traffic, processing sensor data, and controlling actuators. Traditional asynchronous approaches like ad-hoc callbacks and multi-threading suffice for simple systems but introduce risks around deadlocks, memory usage, and complexity as the software grows.
Pigweed requires a concurrency model that scales to complex libraries like a
Bluetooth stack without sacrificing safety, ergonomics, or composition.
We adopted the informed poll architecture to meet these
needs. This document explains why we built pw_async2 and helps you
evaluate whether it is the right fit for your project.
Pigweed’s async journey
Pigweed’s support for asynchronous execution evolved over several years, driven by the increasing complexity of embedded applications.
Our first async-adjacent module was pw_work_queue, a simple utility for deferring work from interrupts to a dedicated thread. It was explicitly designed not to be a general-purpose async solution, but its availability led teams to use it for various types of async work. This made it clear that we needed to provide something more structured for complex, multi-step operations.
To address this, we developed pw_async (v1). It introduced a
formal Dispatcher and Task model on top of a structured event loop, with
these interfaces being virtualized to allow for system-specific optimizations.
While this solved execution context issues and helped serialize work, each task
was still fundamentally a callback. As we built complex protocols like
Bluetooth, we found that chaining callbacks led to fragmented logic and complex
manual state tracking.
This experience led us to build pw_async2, centered on the
Informed poll model. Inspired primarily by Rust’s async
design, pw_async2 provides a cooperatively scheduled event loop for running
persistent tasks which suspend execution while blocked to allow other tasks to
run.
An explicit goal of the Rust-aligned design is to allow for interoperability
between C++ and Rust tasks as Rust becomes more widely used in embedded systems.
Compared to the other async models discussed below, the self-contained and
composable nature of pw_async2 tasks provides a convenient interface for
cross-language asynchronous operations. We already have early demos of Rust
futures running on the C++ dispatcher.
The pitfalls of traditional concurrency models
Let’s consider a typical embedded transaction in a consumer electronics device. We read multiple sensors, perform some logic on the result, then forward it to a consumer. Specifically, the code needs to:
Sample two sensors in parallel, timing out the attempts after 10ms.
Send a packet containing the combined sensor data, with backpressure if the consumer cannot keep up.
Repeat this at a constant rate.
At any point, the device may signal that it is going to sleep. When this happens, all pending operations must be cancelled.
Note that this is more complex than a typical introductory example because it
involves interactions between different subsystems. The complexity is
deliberate: this is what real-world products have to do. If this feels
more advanced than your use cases, you may not need pw_async2.
Ad-hoc callbacks
In an ad-hoc callback model, asynchronous operations are triggered by hardware interrupts, timers, or background threads, invoking a provided callback function upon completion.
While this fire-and-forget style appears simple initially, it becomes difficult to manage as complexity grows. The following example demonstrates the implementation of the multi-sensor transaction described above using callbacks:
struct SensorData {
int value;
};
class CallbackSensor {
public:
void ReadAsync(pw::Function<void(pw::Result<SensorData>)>&& callback);
};
class CallbackTransmitter {
public:
void SendAsync(const SensorData& data,
pw::Function<void(pw::Status)>&& callback);
};
class CallbackTimer {
public:
void Schedule(pw::chrono::SystemClock::duration delay,
pw::Function<void()>&& callback);
void Cancel();
};
class CallbackSensorProcessor {
public:
CallbackSensorProcessor(CallbackSensor& s1,
CallbackSensor& s2,
CallbackTransmitter& tx,
CallbackTimer& timer)
: s1_(s1), s2_(s2), tx_(tx), timer_(timer) {}
~CallbackSensorProcessor() {
// Since there is no way to cancel an ongoing sensor read call, if the
// processor is destroyed while reads are pending, their callbacks can
// corrupt unused memory. We detect this by crashing, requiring users to
// remember to call Cancel() before destroying the processor.
//
// In practice, an object like this would be statically allocated for the
// lifetime of the program.
PW_CHECK(!cycle_active_);
Cancel();
}
// Begins the sensor read cycle.
void Start() { ScheduleNextCycle(); }
// Stops the sensor read cycle.
//
// WARNING: This does not cancel pending reads. Users must ensure that the
// processor is not destroyed until `cycle_active()` returns false.
//
// Returns true if there were no pending reads and the object is safe to
// destroy, or false if reads are still pending.
bool Cancel() {
std::lock_guard guard(lock_);
cancelled_ = true;
timer_.Cancel();
return !cycle_active_;
}
bool cycle_active() const {
std::lock_guard guard(lock_);
return cycle_active_;
}
// Called by the system to signal it is entering a low power mode.
void OnSleep() {
PW_LOG_INFO("Sensor processor entering sleep mode");
Cancel();
}
private:
enum class Sensor { kSensor1, kSensor2 };
void ScheduleNextCycle() {
bool proceed;
{
std::lock_guard guard(lock_);
cycle_active_ = true;
proceed = !cancelled_;
}
if (proceed) {
timer_.Schedule(std::chrono::milliseconds(100), [this]() { RunCycle(); });
}
{
std::lock_guard guard(lock_);
cycle_active_ = false;
}
}
void RunCycle() {
{
std::lock_guard guard(lock_);
if (cancelled_ || cycle_active_) {
return;
}
cycle_active_ = true;
state_.results_received = 0;
state_.timed_out = false;
}
timer_.Schedule(std::chrono::milliseconds(10), [this]() {
bool failed_to_receive = false;
{
// There is a race condition where the timeout and sensor callbacks may
// execute concurrently on separate threads or interrupt contexts, so
// we have to lock and track which operation finished first so that the
// later callback becomes a no-op.
std::lock_guard guard(lock_);
state_.timed_out = true;
if (state_.results_received < 2) {
cycle_active_ = false;
failed_to_receive = true;
}
}
if (failed_to_receive) {
PW_LOG_WARN("Timed out reading sensors");
}
});
// Depending on the behavior of the sensor driver, some additional
// considerations have to be made with these read calls.
//
// - If the driver starts a new hardware read on each `ReadAsync` call,
// we get duplicate interrupts/completions if operations overlap (e.g.,
// following a timeout), which can corrupt state on later cycles.
//
// - If the driver ignores a new read while one is pending, it may return
// data sampled in the past, causing timing uncertainty.
//
// These can be addressed by capturing a monotonic "read ID" in each
// callback, incremented on each cycle, and comparing the captured ID to
// the processor's current ID on response. However, `pw::Function` only
// has space to capture one pointer by default, so this would require
// globally increasing the storage size of every function in the system.
s1_.ReadAsync([this](pw::Result<SensorData> res) {
HandleSensorResult(res, Sensor::kSensor1);
});
s2_.ReadAsync([this](pw::Result<SensorData> res) {
HandleSensorResult(res, Sensor::kSensor2);
});
}
void HandleSensorResult(pw::Result<SensorData> res, Sensor sensor) {
bool should_proceed = false;
{
std::lock_guard guard(lock_);
if (state_.timed_out) {
// Stale callback: A late interrupt from a previous cycle may access
// repurposed state.
return;
}
if (sensor == Sensor::kSensor1) {
state_.r1 = res;
} else {
state_.r2 = res;
}
state_.results_received++;
if (state_.results_received == 2) {
should_proceed = true;
}
}
if (!res.ok()) {
PW_LOG_WARN("Failed to read sensor %d",
sensor == Sensor::kSensor1 ? 1 : 2);
}
if (should_proceed) {
ProceedToSend();
}
}
void ProceedToSend() {
// Lock inversion: We must not hold locks across external API calls
// (e.g. Cancel or SendAsync) to prevent deadlocks.
timer_.Cancel();
bool success = false;
SensorData combined;
{
std::lock_guard guard(lock_);
if (state_.r1.ok() && state_.r2.ok()) {
combined.value = state_.r1->value + state_.r2->value;
success = true;
}
}
if (!success) {
ScheduleNextCycle();
return;
}
tx_.SendAsync(combined, [this](pw::Status status) {
ScheduleNextCycle();
if (!status.ok()) {
// Handle send errors...
}
});
}
CallbackSensor& s1_;
CallbackSensor& s2_;
CallbackTransmitter& tx_;
CallbackTimer& timer_;
struct State {
int results_received = 0;
pw::Result<SensorData> r1;
pw::Result<SensorData> r2;
bool timed_out = false;
};
State state_ PW_GUARDED_BY(lock_);
bool cycle_active_ PW_GUARDED_BY(lock_) = false;
bool cancelled_ PW_GUARDED_BY(lock_) = false;
mutable pw::sync::InterruptSpinLock lock_;
};
This approach introduces several critical risks and complexities:
Manual state synchronization and race conditions. Because callbacks can execute in arbitrary contexts (e.g., hardware interrupts or timer threads), all shared state must be protected by explicit synchronization primitives. Additionally, the varied execution contexts make it hard to reason about what operations are safe to perform in callbacks. In the example, the timeout timer callback and the sensor completion callbacks can execute concurrently, necessitating careful locking to avoid corrupting the shared State structure.
Lock inversion and deadlocks. To prevent deadlocks, locks must be tightly scoped. Invoking external callbacks or cancelling timers while holding synchronization locks risks recursive deadlocks or lock inversion. The developer must manually ensure that locks are released before invoking operations like tx_.SendAsync or timer_.Cancel(). Additionally, this complex locking has a cost: spinlocks keep interrupts disabled while held, mutexes block the calling thread, and mutexes on some common RTOSes are quite heavy in terms of CPU and memory.
Implicit coupling. Callback-based APIs often force tight dependencies on the behavior of their backend (e.g. a driver). This ranges from the context in which the callback is run (ISR or thread?) to how multiple operations are queued, whether callbacks are permitted to call into the same APIs (risking recursive locking), whether cancellation is possible, and so on. These assumptions are not always spelled out in the API contract, but code often relies on them and can break if run against a different implementation.
Lifetime and cancellation fragility. There is no standard way to cancel an ongoing asynchronous operation in this model. If the CallbackSensorProcessor is destroyed while reads are pending, their subsequent callbacks will access invalid memory. The destructor must assert that no cycles are active, placing the burden of lifetime management entirely on the user.
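The comments in the callback example mention tagging each read with a monotonic "read ID" so that late completions from an earlier cycle can be ignored. The following is a minimal sketch of that idea, not code from Pigweed: FakeSensor, CycleTracker, and RunStaleReadDemo are hypothetical names, std::function stands in for pw::Function, and everything runs on one thread so no locking is shown.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical stand-in for a sensor driver. It stores completion callbacks
// so the demo can fire them later, simulating late interrupts.
class FakeSensor {
 public:
  void ReadAsync(std::function<void(int)> callback) {
    pending_.push_back(std::move(callback));
  }

  // Completes the oldest pending read with `value`.
  void CompleteOldest(int value) {
    assert(!pending_.empty());
    auto callback = std::move(pending_.front());
    pending_.erase(pending_.begin());
    callback(value);
  }

 private:
  std::vector<std::function<void(int)>> pending_;
};

// Tags each read with the cycle that issued it and drops completions that
// belong to an earlier cycle.
class CycleTracker {
 public:
  explicit CycleTracker(FakeSensor& sensor) : sensor_(sensor) {}

  void StartCycle() {
    const std::uint32_t id = ++current_id_;
    // The lambda captures two values (`this` and `id`). This is the extra
    // capture storage that the example's comments are concerned about.
    sensor_.ReadAsync([this, id](int value) { OnResult(id, value); });
  }

  int accepted() const { return accepted_; }
  int dropped() const { return dropped_; }
  int last_value() const { return last_value_; }

 private:
  void OnResult(std::uint32_t id, int value) {
    if (id != current_id_) {
      ++dropped_;  // Stale: a newer cycle started after this read was issued.
      return;
    }
    ++accepted_;
    last_value_ = value;
  }

  FakeSensor& sensor_;
  std::uint32_t current_id_ = 0;
  int accepted_ = 0;
  int dropped_ = 0;
  int last_value_ = 0;
};

struct StaleReadResult {
  int accepted;
  int dropped;
  int last_value;
};

// Starts two cycles (as if the first had timed out), then delivers the
// results in the order they were requested.
StaleReadResult RunStaleReadDemo() {
  FakeSensor sensor;
  CycleTracker tracker(sensor);
  tracker.StartCycle();       // Cycle 1.
  tracker.StartCycle();       // Cycle 2 begins before cycle 1 completes.
  sensor.CompleteOldest(10);  // Late result for cycle 1 is dropped.
  sensor.CompleteOldest(20);  // Result for cycle 2 is accepted.
  return {tracker.accepted(), tracker.dropped(), tracker.last_value()};
}
```

The guard itself is only a few lines, but every callback now has to carry the ID alongside the object pointer, and in a real system the comparison would also have to happen under the same lock as the rest of the state.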
Structured work queues
A common improvement over the ad-hoc callback approach is the use of a work queue which serializes callback execution on a single thread. These are widely and successfully used in many real products. Pigweed even ships its own implementation in pw_work_queue.
By guaranteeing that callbacks execute in a known context serially, the complex internal locking required for ad-hoc callbacks is eliminated. Compare the work queue-based solution to the previous example:
class WorkQueueSensorProcessor;
class WorkQueueSensor {
public:
enum class Id { kSensor1, kSensor2 };
WorkQueueSensor(Id id) : id_(id) {}
void ReadAsync(pw::work_queue::WorkQueue& wq);
private:
Id id_;
};
class WorkQueueTransmitter {
public:
void SendAsync(pw::work_queue::WorkQueue& wq, const SensorData&);
};
class WorkQueueTimer {
public:
enum class Type { kCycle, kTimeout };
void Schedule(pw::work_queue::WorkQueue& wq,
pw::chrono::SystemClock::duration,
Type type);
void Cancel() {}
};
// The WorkQueueSensorProcessor demonstrates callbacks being run from a work
// queue. This improves on ad-hoc callbacks by serializing execution on a
// centralized thread, removing the need for complex locking around internal
// state.
class WorkQueueSensorProcessor {
public:
WorkQueueSensorProcessor() = default;
void Init(WorkQueueSensor& s1,
WorkQueueSensor& s2,
WorkQueueTransmitter& tx,
WorkQueueTimer& timer,
pw::work_queue::WorkQueue& work_queue) {
s1_ = &s1;
s2_ = &s2;
tx_ = &tx;
timer_ = &timer;
work_queue_ = &work_queue;
}
~WorkQueueSensorProcessor() { Cancel(); }
void Start();
bool Cancel() {
// External signals like cancellation still arrive on arbitrary threads, and
// require explicit synchronization to coordinate with the work queue.
std::lock_guard guard(lock_);
cancelled_ = true;
if (timer_ != nullptr) {
timer_->Cancel();
}
return true;
}
void OnSleep() {
PW_LOG_INFO("Sensor processor entering sleep mode");
Cancel();
}
void RunCycle() {
{
std::lock_guard guard(lock_);
if (cancelled_) {
return;
}
}
// `RunCycle` and all of the callbacks below are invoked from the same work
// queue thread, so we can access `state_` without a lock.
state_.results_received = 0;
state_.timed_out = false;
timer_->Schedule(*work_queue_,
std::chrono::milliseconds(10),
WorkQueueTimer::Type::kTimeout);
s1_->ReadAsync(*work_queue_);
s2_->ReadAsync(*work_queue_);
}
void HandleSensorResult(pw::Result<SensorData> res,
WorkQueueSensor::Id sensor) {
if (state_.timed_out) {
return;
}
if (sensor == WorkQueueSensor::Id::kSensor1) {
state_.r1 = res;
} else {
state_.r2 = res;
}
state_.results_received++;
if (state_.results_received == 2) {
ProceedToSend();
}
}
void HandleTimeout() {
state_.timed_out = true;
if (state_.results_received < 2) {
PW_LOG_WARN("Timed out reading sensors");
ScheduleNext();
}
}
void ProceedToSend() {
timer_->Cancel();
if (!state_.r1.ok() || !state_.r2.ok()) {
ScheduleNext();
return;
}
SensorData combined;
combined.value = state_.r1->value + state_.r2->value;
tx_->SendAsync(*work_queue_, combined);
}
void HandleTransmitComplete(pw::Status status) {
if (!status.ok()) {
// Handle send errors...
}
ScheduleNext();
}
void ScheduleNext() {
timer_->Schedule(*work_queue_,
std::chrono::milliseconds(100),
WorkQueueTimer::Type::kCycle);
}
private:
WorkQueueSensor* s1_ = nullptr;
WorkQueueSensor* s2_ = nullptr;
WorkQueueTransmitter* tx_ = nullptr;
WorkQueueTimer* timer_ = nullptr;
pw::work_queue::WorkQueue* work_queue_ = nullptr;
struct State {
int results_received = 0;
pw::Result<SensorData> r1;
pw::Result<SensorData> r2;
bool timed_out = false;
};
State state_;
mutable pw::sync::Mutex lock_;
bool cancelled_ PW_GUARDED_BY(lock_) = false;
};
namespace {
WorkQueueSensorProcessor work_queue_processor;
} // namespace
void WorkQueueSensorProcessor::Start() {
work_queue_->CheckPushWork([]() { work_queue_processor.RunCycle(); });
}
void WorkQueueSensor::ReadAsync(pw::work_queue::WorkQueue& wq) {
wq.CheckPushWork([this]() {
work_queue_processor.HandleSensorResult(SensorData{42}, id_);
});
}
void WorkQueueTransmitter::SendAsync(pw::work_queue::WorkQueue& wq,
const SensorData&) {
wq.CheckPushWork(
[]() { work_queue_processor.HandleTransmitComplete(pw::OkStatus()); });
}
void WorkQueueTimer::Schedule(pw::work_queue::WorkQueue& wq,
pw::chrono::SystemClock::duration,
Type type) {
wq.CheckPushWork([type]() {
if (type == Type::kCycle) {
work_queue_processor.RunCycle();
} else {
work_queue_processor.HandleTimeout();
}
});
}
As you can see, the implementation becomes much more straightforward and readable. The mess of granular locking is gone, as each of the sensor processor’s operations runs on the same work queue.
However, this does not solve the ergonomic problems of callbacks, nor does it eliminate locking entirely. External signals (such as the system going to sleep) still require synchronization across thread boundaries, the execution flow remains fragmented and difficult to reason about or debug, and callbacks can still access invalid state if the object is destroyed.
Another implication, particularly in embedded systems, is that you typically
don’t have arbitrary context captures for callbacks (à la std::function),
which means that async APIs need to be aware of the existence of the work queue
to know how to schedule their callbacks. This was not a problem with the ad-hoc
callback model as it would execute the callback directly.
Work queues also introduce some latency over ad-hoc callbacks because the callbacks are not executed immediately. Instead, they are added to a queue which is often shared by many parts of the system. Operations which are time-sensitive can execute less predictably.
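To make the serialization and queuing latency concrete, here is a minimal single-threaded sketch of a work queue. It is not the pw_work_queue implementation: SimpleWorkQueue and RunWorkQueueDemo are hypothetical names, std::function is used for brevity, and a real queue would be protected by a lock and drained from a dedicated thread.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <string>
#include <utility>

// Hypothetical single-consumer work queue. This sketch is single-threaded
// so that its behavior is deterministic.
class SimpleWorkQueue {
 public:
  void Push(std::function<void()> work) { queue_.push_back(std::move(work)); }

  // Runs queued items one at a time in FIFO order, including items pushed by
  // the callbacks themselves. Returns the number of items that ran.
  int Drain() {
    int count = 0;
    while (!queue_.empty()) {
      auto work = std::move(queue_.front());
      queue_.pop_front();
      work();
      ++count;
    }
    return count;
  }

 private:
  std::deque<std::function<void()>> queue_;
};

// Records the order in which three handlers run.
std::string RunWorkQueueDemo() {
  SimpleWorkQueue wq;
  std::string log;
  // An "interrupt" only enqueues work. Nothing touches `log` until Drain().
  wq.Push([&] {
    log += "sensor1;";
    // Follow-up work goes to the back of the queue, behind everything that
    // was already waiting.
    wq.Push([&] { log += "send;"; });
  });
  wq.Push([&] { log += "sensor2;"; });
  assert(log.empty());
  wq.Drain();
  return log;
}
```

In this sketch the handlers never overlap, which is why the processor's internal state needs no lock. The follow-up "send" step runs only after the unrelated "sensor2" item, which is the queuing latency described above.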
Synchronous multi-threading
Another common approach is to use a dedicated thread for the transaction, using blocking APIs for I/O operations. This model provides a linear, readable control flow via standard synchronous code, relying on the OS to handle concurrency.
Threads are often the default for many systems and for good reason. They are easy to write, easy to reason about, easy to debug, and universally supported. Almost all systems will have multiple threads running.
However, the simplicity of threads comes at a cost in resource-constrained systems. While having a set of core threads is typical, problems start to arise when you start spinning up new threads for an increasing number of concurrent operations.
The following example demonstrates the implementation using Pigweed’s pw_thread module with its generic thread creation API:
class BlockingSensor {
public:
pw::Result<SensorData> ReadBlocking(
pw::chrono::SystemClock::duration timeout);
};
class BlockingTransmitter {
public:
pw::Status SendBlocking(const SensorData& data);
};
class ThreadedSensorProcessor {
public:
ThreadedSensorProcessor() = default;
~ThreadedSensorProcessor() { Stop(); }
void Init(BlockingSensor& s1, BlockingSensor& s2, BlockingTransmitter& tx) {
s1_ = &s1;
s2_ = &s2;
tx_ = &tx;
}
void Start(const pw::thread::Options& options) {
std::lock_guard guard(lock_);
if (running_) {
return;
}
running_ = true;
cancelled_ = false;
thread_ = pw::Thread(options, [this]() { Run(); });
}
// Stops the processor and joins the thread.
void Stop() {
{
std::lock_guard guard(lock_);
if (!running_) {
return;
}
cancelled_ = true;
}
// If the worker thread is currently blocked on a timeout in `ReadBlocking`
// or waiting for backpressure in `SendBlocking`, this `join()` call will
// block the calling thread (whoever is requesting sleep) for the full
// duration.
thread_.join();
std::lock_guard guard(lock_);
running_ = false;
}
private:
void Run() {
while (true) {
{
std::lock_guard guard(lock_);
if (cancelled_)
break;
}
// NOTE: These reads are serialized. If we want to read the two sensors in
// parallel (as they are independent), that cannot be done with a single
// thread using blocking APIs. We would need to spin up additional worker
// threads, further increasing stack usage.
// Instead, we eat the latency cost of serialization, resulting in each
// sampling cycle taking up to twice the timeout.
// As noted in `Stop()`, if a sleep signal arrives while the thread is
// blocked, it has to wait for the operation to complete or time out.
auto r1 = s1_->ReadBlocking(std::chrono::milliseconds(10));
if (!r1.ok()) {
PW_LOG_WARN("Failed to read sensor 1");
continue;
}
auto r2 = s2_->ReadBlocking(std::chrono::milliseconds(10));
if (!r2.ok()) {
PW_LOG_WARN("Failed to read sensor 2");
continue;
}
SensorData combined;
combined.value = r1->value + r2->value;
// WARNING: If the transmitter is full, this blocks the thread until space
// becomes available, which is dependent on the wider system and network.
// Again, we are un-interruptible during this period.
pw::Status status = tx_->SendBlocking(combined);
if (!status.ok()) {
// Handle send errors...
}
pw::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
BlockingSensor* s1_ = nullptr;
BlockingSensor* s2_ = nullptr;
BlockingTransmitter* tx_ = nullptr;
pw::sync::Mutex lock_;
bool running_ PW_GUARDED_BY(lock_) = false;
bool cancelled_ PW_GUARDED_BY(lock_) = false;
pw::Thread thread_;
};
namespace {
// The most noticeable issue with the threading model is that each thread
// requires a dedicated stack, massively increasing the system's RAM usage.
constexpr pw::ThreadAttrs kSensorThreadAttrs =
pw::ThreadAttrs().set_name("SensorProcessor").set_stack_size_bytes(2048);
pw::ThreadContextFor<kSensorThreadAttrs> sensor_thread_context;
ThreadedSensorProcessor processor;
} // namespace
// Begins the sensor processing thread.
void RunThreadedSensorProcessor(BlockingSensor& s1,
BlockingSensor& s2,
BlockingTransmitter& tx) {
processor.Init(s1, s2, tx);
processor.Start(pw::GetThreadOptions(sensor_thread_context));
}
// Signals the sensor processing thread to stop.
void SleepThreadedSensorProcessor() { processor.Stop(); }
This code is notably simpler than any other example in this document, but it introduces some architectural trade-offs:
High overhead. Each thread requires its own dedicated stack. In the example, we allocate a 2KB stack for the sensor processor. In a system with many concurrent operations, committing a full stack per task quickly exhausts the limited RAM of a microcontroller, even though the thread may spend most of its time blocked waiting for I/O. Additionally, having more threads often leads to more resource contention, and mutexes are computationally expensive. Critical sections often become performance bottlenecks in products.
Resource vs. latency tradeoff. To keep resource usage low, we use a single thread that reads the sensors sequentially. If we wanted to read them in parallel to reduce latency, we would need to spawn additional threads, further compounding the RAM issue. Serializing the reads means the cycle time is the sum of both timeouts in the worst case.
Cancellation latency. Blocking I/O calls are inherently difficult to cancel cooperatively. When the system requests to sleep by calling Stop(), the worker thread may be blocked in ReadBlocking or SendBlocking. The calling thread must wait for the blocking operation to complete or time out before join() returns, delaying the system’s transition to low-power mode.
Manual synchronization is notoriously difficult. Almost anyone who has worked on a multi-threaded system has spent countless hours chasing down deadlocks, livelocks, priority inversions, and other concurrency bugs. The more threads you introduce, the greater the risk of these problems. Other async models handle this complexity internally so you don’t have to worry about it.
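The resource and latency trade-off can be put in numbers using the values from the threaded example (a 2048-byte stack and a 10 ms read timeout). This is a back-of-the-envelope sketch: the helper names are hypothetical, and the three-thread case assumes one reader thread per sensor plus the processor thread, which the example above does not actually implement.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>

using std::chrono::milliseconds;

// Values taken from the threaded example above.
constexpr std::size_t kStackBytesPerThread = 2048;
constexpr milliseconds kSensorTimeout{10};

// RAM reserved for stacks when each concurrent operation gets its own thread.
constexpr std::size_t StackBytes(std::size_t thread_count) {
  return thread_count * kStackBytesPerThread;
}

// Worst-case time to read `sensor_count` sensors one after another on a
// single thread: every read runs into its timeout.
constexpr milliseconds SerialWorstCase(int sensor_count) {
  return sensor_count * kSensorTimeout;
}

// Worst-case time when all reads overlap: bounded by a single timeout.
constexpr milliseconds ParallelWorstCase(int sensor_count) {
  return sensor_count > 0 ? kSensorTimeout : milliseconds{0};
}

// One thread, serialized reads: 2 KiB of stack, up to 20 ms per sample.
static_assert(StackBytes(1) == 2048);
static_assert(SerialWorstCase(2) == milliseconds{20});

// Hypothetical parallel variant with three threads: up to 10 ms per sample,
// but three times the stack memory.
static_assert(StackBytes(3) == 6144);
static_assert(ParallelWorstCase(2) == milliseconds{10});
```

Halving the worst-case sampling time in this sketch triples the stack memory, and the cost repeats for every transaction in the system that wants the same treatment.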
Events and message handling
To avoid the complex lifetime management and synchronization issues of callbacks, some systems move to a higher level of abstraction using event and message handlers (often called the Active Object pattern). In this model, asynchronous operations post events to a centralized queue, and a state machine processes them sequentially.
This eliminates the need for locks around shared state since all processing happens in the same context. Event-based systems are easy to trace and easy to test, and they provide a unified interface across the platform, making it more coherent.
The following example demonstrates an implementation of the sensor processor as an event-driven state machine:
enum class EventType {
kStartCycle,
kSensorData,
kSensorTimeout,
kTransmitComplete,
kSleepSignal,
};
enum class Sensor { kSensor1, kSensor2 };
struct Event {
EventType type;
Sensor sensor = Sensor::kSensor1;
pw::Result<SensorData> sensor_data = pw::Status::Unavailable();
pw::Status status = pw::OkStatus();
static Event StartCycle() { return {EventType::kStartCycle}; }
static Event SensorTimeout() { return {EventType::kSensorTimeout}; }
static Event Sleep() { return {EventType::kSleepSignal}; }
static Event SensorData(Sensor sensor, pw::Result<SensorData> res) {
return {EventType::kSensorData, sensor, res};
}
static Event TransmitComplete(pw::Status status) {
Event e;
e.type = EventType::kTransmitComplete;
e.status = status;
return e;
}
};
// All events in the system go through this centralized queue, which would be
// drained by a thread that forwards events into `HandleEvent`. The queue must
// outlive every possible asynchronous operation that posts events to it.
class EventQueue {
public:
void Post(Event event);
};
class EventSensorProcessor {
public:
EventSensorProcessor(EventQueue& queue,
CallbackSensor& s1,
CallbackSensor& s2,
CallbackTransmitter& tx,
CallbackTimer& timer)
: queue_(queue), s1_(s1), s2_(s2), tx_(tx), timer_(timer) {}
// Everything occurring in the system flows through this function as an event.
// This model avoids locks by serializing events onto a single thread.
// However, it suffers from severe logic fragmentation and loss of
// composability.
void HandleEvent(const Event& event) {
switch (state_) {
case State::kIdle:
HandleIdleEvent(event);
break;
case State::kWaitingForSensors:
HandleSensorsEvent(event);
break;
case State::kWaitingForTransmit:
HandleTransmitEvent(event);
break;
}
}
private:
enum class State {
kIdle,
kWaitingForSensors,
kWaitingForTransmit,
};
void HandleIdleEvent(const Event& event) {
switch (event.type) {
case EventType::kStartCycle:
StartCycle();
break;
// We use switch statements to ensure exhaustive handling, preventing bugs
// from forgetting to handle a new event type. However, this results in a
// massive increase in boilerplate and verbosity, as each state handler
// must explicitly enumerate all events, even if it ignores most of them.
case EventType::kSensorData:
case EventType::kSensorTimeout:
case EventType::kTransmitComplete:
case EventType::kSleepSignal:
// Ignored.
break;
}
}
void HandleSensorsEvent(const Event& event) {
switch (event.type) {
case EventType::kSensorTimeout:
// Say we wanted to extend the behavior to be able to handle retrying
// before failing the cycle. Under this model, we could not easily
// compose it here. We would require new states, new events, and
// handling distributed across the entire class.
PW_LOG_WARN("Timed out reading sensors");
state_ = State::kIdle;
ScheduleNext();
break;
case EventType::kSleepSignal:
CancelAll();
state_ = State::kIdle;
break;
case EventType::kSensorData:
if (event.sensor == Sensor::kSensor1) {
r1_ = event.sensor_data;
} else {
r2_ = event.sensor_data;
}
results_received_++;
CheckSensorsComplete();
break;
case EventType::kStartCycle:
case EventType::kTransmitComplete:
// Ignored.
break;
}
}
void HandleTransmitEvent(const Event& event) {
switch (event.type) {
case EventType::kTransmitComplete:
if (!event.status.ok()) {
PW_LOG_WARN("Transmit failed");
}
state_ = State::kIdle;
ScheduleNext();
break;
case EventType::kSleepSignal:
// Under this model, we cannot easily cancel the pending `tx_.SendAsync`
// operation unless the transmitter driver explicitly provides a
// cancellation API. Instead, we transition to idle where the incoming
// event will be ignored.
state_ = State::kIdle;
break;
case EventType::kStartCycle:
case EventType::kSensorData:
case EventType::kSensorTimeout:
// Ignored.
break;
}
}
void StartCycle() {
state_ = State::kWaitingForSensors;
results_received_ = 0;
r1_ = pw::Status::Unavailable();
r2_ = pw::Status::Unavailable();
// Because callbacks post to the queue instead of directly modifying state,
// the synchronization and lifetime issues from the original callback
// example no longer apply.
timer_.Schedule(std::chrono::milliseconds(10),
[this]() { queue_.Post(Event::SensorTimeout()); });
s1_.ReadAsync([this](pw::Result<SensorData> res) {
queue_.Post(Event::SensorData(Sensor::kSensor1, res));
});
s2_.ReadAsync([this](pw::Result<SensorData> res) {
queue_.Post(Event::SensorData(Sensor::kSensor2, res));
});
}
void CheckSensorsComplete() {
if (results_received_ == 2) {
timer_.Cancel();
if (r1_.ok() && r2_.ok()) {
SensorData combined;
combined.value = r1_->value + r2_->value;
state_ = State::kWaitingForTransmit;
tx_.SendAsync(combined, [this](pw::Status status) {
queue_.Post(Event::TransmitComplete(status));
});
} else {
state_ = State::kIdle;
ScheduleNext();
}
}
}
void ScheduleNext() {
timer_.Schedule(std::chrono::milliseconds(100),
[this]() { queue_.Post(Event::StartCycle()); });
}
void CancelAll() { timer_.Cancel(); }
EventQueue& queue_;
CallbackSensor& s1_;
CallbackSensor& s2_;
CallbackTransmitter& tx_;
CallbackTimer& timer_;
State state_ = State::kIdle;
int results_received_ = 0;
pw::Result<SensorData> r1_;
pw::Result<SensorData> r2_;
};
Similar to work queues, the event model serializes processing and execution of asynchronous tasks, while additionally adding type safety and structured handling. However, it is not without its problems:
Event data bloat. Many events carry data, such as sensor readings in our example. In a system where events are global, there needs to be a centralized definition of an event type that can accommodate all payloads. This results in either a large, complex type that is difficult to work with, or erasure tricks like void* which lose type safety.
Queue sizing and contention. An event queue must either be conservatively sized to accommodate bursts of events, using extra memory, or the system must provide fallible or blocking APIs to the queue so that events can be dropped. The more widely used the event queue is, the worse this problem becomes.
State machine fragmentation. A simple linear flow (read sensors, then transmit) is spread across multiple state handler functions. Reading the code requires jumping between states to understand the sequence of operations. Unlike with callbacks, the way in which the results of operations arrive via events is also fragmented, as the handler for an event is decoupled from the API that produced it. This makes it difficult to reason about how and when events will arrive, which can lead to incomplete or incorrect state machines.
Limited extensibility and composability. When everything flows through a global dispatch table, that table needs to be aware of how to route to every possible async consumer, making it inherently coupled with the specifics of the system and difficult to extend or modularize. In particular, shared libraries are difficult to maintain as any modification to behavior requires coordination with each system’s dispatch table.
For example, say you wanted to add a delay in some shared module via a timer. Every user of that module would have to wire up the timer to route its completion event back into the library’s event handler. In our experience with real products using this model, the high engineering cost of doing the right thing often results in shortcuts such as blocking briefly.
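The event data bloat and queue sizing points can be illustrated with a type-safe global event built on std::variant. This is a sketch under assumptions: the payload types, the 256-byte chunk size, and the helper names are all hypothetical and do not come from the example above.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <variant>

// Hypothetical payloads from unrelated subsystems that share one event queue.
struct SensorReading {
  int value;
};
struct TransmitDone {
  bool ok;
};
struct FirmwareChunk {
  std::array<std::uint8_t, 256> bytes;
};

// A type-safe global event. The variant must be able to hold any payload.
using GlobalEvent = std::variant<SensorReading, TransmitDone, FirmwareChunk>;

// Every event is at least as large as the largest payload in the system.
static_assert(sizeof(GlobalEvent) >= sizeof(FirmwareChunk));

// RAM consumed by a queue with `depth` slots.
constexpr std::size_t QueueBytes(std::size_t depth) {
  return depth * sizeof(GlobalEvent);
}

// Size of the payload that a particular event actually carries.
std::size_t PayloadBytes(const GlobalEvent& event) {
  return std::visit([](const auto& payload) { return sizeof(payload); },
                    event);
}
```

With these assumed types, a sensor event carries only an int but still occupies a slot sized for a firmware chunk, so a queue deep enough to absorb sensor bursts pays for the largest payload in every slot.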
The informed poll approach (pw_async2)
The limitations of callbacks and threads led to the development of the informed poll architecture.
Informed polling separates when a task runs from what it does. Instead of blocking a thread or pushing callbacks, asynchronous work is represented as self-contained state machines that yield when they cannot make progress. A central dispatcher polls these state machines only when they are ready to advance.
The key to this model is the pw::async2::Waker. When a task yields while
waiting for an event, the event source acquires a Waker for that task. When
the event occurs, the source invokes the Waker to notify the dispatcher that
the task is ready to be polled again. This mechanism eliminates the need to poll
waiting tasks constantly and replaces callbacks with context-safe notifications.
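The waker handshake described above can be sketched with standard C++ alone. The following is not the pw_async2 API: Task, Waker, Dispatcher, SensorSource, and ReaderTask here are simplified, hypothetical stand-ins that only show the shape of the model, and the sketch is single-threaded.

```cpp
#include <cassert>
#include <deque>

class Dispatcher;
class Waker;

class Task {
 public:
  virtual ~Task() = default;
  // Returns true once the task has finished, false if it must wait.
  virtual bool Poll(Waker waker) = 0;

 private:
  friend class Dispatcher;
  bool queued_ = false;  // True while the task sits in the ready queue.
};

// Polls tasks only after something has marked them ready.
class Dispatcher {
 public:
  void Post(Task& task) { Wake(task); }  // New tasks get one initial poll.
  void Wake(Task& task) {
    if (!task.queued_) {
      task.queued_ = true;
      ready_.push_back(&task);
    }
  }
  int RunUntilStalled();  // Returns how many polls were performed.

 private:
  std::deque<Task*> ready_;
};

// One-shot handle that an event source keeps while a task waits on it.
class Waker {
 public:
  Waker() = default;
  Waker(Dispatcher& dispatcher, Task& task)
      : dispatcher_(&dispatcher), task_(&task) {}
  void Wake() {
    if (dispatcher_ != nullptr) {
      dispatcher_->Wake(*task_);
      dispatcher_ = nullptr;
    }
  }

 private:
  Dispatcher* dispatcher_ = nullptr;
  Task* task_ = nullptr;
};

inline int Dispatcher::RunUntilStalled() {
  int polls = 0;
  while (!ready_.empty()) {
    Task* task = ready_.front();
    ready_.pop_front();
    assert(task != nullptr);
    task->queued_ = false;
    task->Poll(Waker(*this, *task));
    ++polls;
  }
  return polls;
}

// Event source: a value that arrives later, e.g. from an interrupt handler.
class SensorSource {
 public:
  bool TryTake(int& out, Waker waker) {
    if (!has_value_) {
      waker_ = waker;  // Remember whom to notify when data arrives.
      return false;
    }
    out = value_;
    has_value_ = false;
    return true;
  }
  void Produce(int value) {
    value_ = value;
    has_value_ = true;
    waker_.Wake();
  }

 private:
  int value_ = 0;
  bool has_value_ = false;
  Waker waker_;
};

class ReaderTask : public Task {
 public:
  explicit ReaderTask(SensorSource& source) : source_(source) {}
  bool Poll(Waker waker) override { return source_.TryTake(result_, waker); }
  int result() const { return result_; }

 private:
  SensorSource& source_;
  int result_ = -1;
};

struct PollCounts { int initial; int idle; int after_wake; int value; };

PollCounts RunInformedPollDemo() {
  Dispatcher dispatcher;
  SensorSource source;
  ReaderTask task(source);
  dispatcher.Post(task);
  PollCounts counts{};
  counts.initial = dispatcher.RunUntilStalled();     // Polled once, then waits.
  counts.idle = dispatcher.RunUntilStalled();        // Not woken, so not polled.
  source.Produce(42);                                // Source invokes the waker.
  counts.after_wake = dispatcher.RunUntilStalled();  // Polled again, finishes.
  counts.value = task.result();
  return counts;
}
```

In this sketch the second run of the dispatcher performs no polls at all, because nothing has woken the task. The event source never runs task logic itself. It only marks the task as ready, and the task's code always executes from the dispatcher's context.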
The core poll model
This foundational architecture allows tasks to achieve cooperative multitasking and explicit composition:
class Async2Sensor {
 public:
  pw::async2::ValueFuture<pw::Result<SensorData>> ReadAsync() {
    return pw::async2::ValueFuture<pw::Result<SensorData>>::Resolved(
        SensorData{42});
  }
};

class Async2Transmitter {
 public:
  pw::async2::ValueFuture<pw::Status> SendAsync(const SensorData&) {
    return pw::async2::ValueFuture<pw::Status>::Resolved(pw::OkStatus());
  }
};
// The PolledSensorProcessor demonstrates the manual async2 Informed Poll model.
// All tasks run cooperatively on a shared dispatcher thread.
// Unlike callbacks, it requires no locks because the task owns its state and
// events are received via futures.
// Unlike threads, it does not require a dedicated stack for concurrency.
//
// However, writing this state machine manually can get quite complex and
// require boilerplate.
class PolledSensorProcessor : public pw::async2::Task {
 public:
  PolledSensorProcessor(Async2Sensor& s1,
                        Async2Sensor& s2,
                        pw::async2::Sender<SensorData> sender,
                        pw::async2::Notification& sleep_notification)
      : pw::async2::Task(PW_ASYNC_TASK_NAME("PolledSensorProcessor")),
        s1_(s1),
        s2_(s2),
        sender_(std::move(sender)),
        sleep_notification_(sleep_notification) {}

 private:
  // Entrypoint of the task. Every time it is woken and runs, this function is
  // called from the top. We internally maintain our current state so we can
  // pick up from where we left off.
  pw::async2::Poll<> DoPend(pw::async2::Context& cx) override {
    // At the top level, always check for a sleep notification first. If it has
    // arrived, this short-circuits execution to exit the task.
    if (!sleep_future_.is_pendable()) {
      sleep_future_ = sleep_notification_.Wait();
    }
    if (sleep_future_.Pend(cx).IsReady()) {
      // Sleep requested. We reset futures to ensure the operations no longer
      // try to wake the task. There is no risk of dangling references or
      // invalid memory access, as the future providers will no longer attempt
      // to signal completion, and future destructors may additionally cancel
      // the underlying operation, if that's supported.
      //
      // Note that this is done assuming that the task is statically allocated
      // in some long-lived object. If the task was posted to the dispatcher
      // dynamically via `PostShared`, this would not be required as the
      // dispatcher would automatically destroy the task and its futures.
      s1_future_ = {};
      s2_future_ = {};
      tx_future_ = {};
      timer_future_ = {};
      return pw::async2::Ready();
    }

    while (true) {
      switch (state_) {
        case State::kIdle: {
          if (!timer_future_.is_pendable()) {
            timer_future_ = pw::async2::GetSystemTimeProvider().WaitFor(
                std::chrono::milliseconds(100));
          }
          // Wait for the timer to expire.
          PW_AWAIT(timer_future_, cx);

          // Begin a new cycle by starting the two sensor reads and a timeout.
          state_ = State::kReadingSensors;
          s1_future_ = s1_.ReadAsync();
          s1_result_ = pw::Status::Unknown();
          s2_future_ = s2_.ReadAsync();
          s2_result_ = pw::Status::Unknown();
          timer_future_ = pw::async2::GetSystemTimeProvider().WaitFor(
              std::chrono::milliseconds(10));
          break;
        }

        case State::kReadingSensors: {
          if (timer_future_.Pend(cx).IsReady()) {
            // Operation timed out. Cancel pending operations by resetting
            // their futures.
            s1_future_ = {};
            s2_future_ = {};
            PW_LOG_WARN("Timed out reading sensors");
            state_ = State::kIdle;
            break;
          }

          // By polling both futures separately, we achieve parallelism without
          // the stack overhead of dedicated threads.
          //
          // Note that this is being done manually here to demonstrate how
          // pw_async2 works internally. `pw::async2::Join` returns a
          // `JoinFuture` which handles this operation for you, though it
          // results in larger code size than writing it manually like this.
          if (s1_result_.status().IsUnknown()) {
            // Note that we don't use PW_AWAIT here because we always want to
            // pend both futures.
            auto r1 = s1_future_.Pend(cx);
            if (r1.IsReady()) {
              s1_result_ = r1.value();
            }
          }
          if (s2_result_.status().IsUnknown()) {
            auto r2 = s2_future_.Pend(cx);
            if (r2.IsReady()) {
              s2_result_ = r2.value();
            }
          }
          if (s1_result_.status().IsUnknown() ||
              s2_result_.status().IsUnknown()) {
            // Wait for both sensor reads to complete.
            return pw::async2::Pending();
          }

          // Cancel the timeout timer.
          timer_future_ = {};

          if (!s1_result_.ok() || !s2_result_.ok()) {
            PW_LOG_WARN("Failed to read sensors");
            state_ = State::kIdle;
            break;
          }

          // Start the send operation with the result.
          SensorData combined;
          combined.value = s1_result_->value + s2_result_->value;
          state_ = State::kTransmitting;
          tx_future_ = sender_.Send(combined);
          break;
        }

        case State::kTransmitting: {
          PW_AWAIT(bool sent, tx_future_, cx);
          if (!sent) {
            // If the channel closes, exit the task since there is nothing
            // further to do.
            PW_LOG_ERROR("Channel closed while transmitting");
            return pw::async2::Ready();
          }
          state_ = State::kIdle;
          break;
        }
      }
    }
  }

  enum class State {
    kIdle,
    kReadingSensors,
    kTransmitting,
  };

  Async2Sensor& s1_;
  Async2Sensor& s2_;
  pw::async2::Sender<SensorData> sender_;
  pw::async2::Notification& sleep_notification_;
  State state_ = State::kIdle;

  // Storing futures directly avoids the heap allocations often required
  // by callback chains to preserve state across yield points.
  pw::async2::ValueFuture<pw::Result<SensorData>> s1_future_;
  pw::Result<SensorData> s1_result_;
  pw::async2::ValueFuture<pw::Result<SensorData>> s2_future_;
  pw::Result<SensorData> s2_result_;
  pw::async2::SendFuture<SensorData> tx_future_;
  pw::async2::TimeFuture<pw::chrono::SystemClock> timer_future_;
  pw::async2::VoidFuture sleep_future_;
};
In this purely polled model:
Memory is bounded. Tasks do not need dedicated thread stacks. They only store the state needed to make progress within the object itself, giving known sizes at compile-time.
State is explicit. All variables that must survive across yield points (like futures and intermediate results) are stored as class members and accessed solely by the task. Ownership is never ambiguous.
Logic is contained. The sequence of operations performed by a task is an implementation detail of that task instead of being scattered across a set of callbacks or handlers.
Strong composability. Every async task and future is just an object with a Pend function that returns a Poll<T> indicating if it is complete. This allows complex operations to be built out of small parts, and makes it easier to extract and reuse code without any strong global coupling.
Everything lives on a single stack. Similarly to a thread, a task’s entrypoint is a regular function call. This allows for better use of traditional debug tooling compared to callback-based approaches.
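As a rough illustration of the composability point, the sketch below builds a join combinator purely out of objects that expose a `Pend` function. The names `MiniPoll`, `Countdown`, `JoinBoth`, and `SumWithJoin` are hypothetical, and `std::optional` merely stands in for a poll result; none of this is the pw_async2 API.

```cpp
#include <cassert>
#include <optional>
#include <utility>

// Toy stand-in for Poll<T>: an empty optional means "still pending".
template <typename T>
using MiniPoll = std::optional<T>;

// Leaf operation that reports `value` after `pending_polls` pending polls.
class Countdown {
 public:
  Countdown(int pending_polls, int value)
      : pending_polls_(pending_polls), value_(value) {}
  MiniPoll<int> Pend() {
    if (pending_polls_ > 0) {
      --pending_polls_;
      return std::nullopt;
    }
    return value_;
  }

 private:
  int pending_polls_;
  int value_;
};

// Combinator that is itself pendable: ready once both children are ready.
template <typename A, typename B>
class JoinBoth {
 public:
  JoinBoth(A a, B b) : a_(std::move(a)), b_(std::move(b)) {}
  MiniPoll<std::pair<int, int>> Pend() {
    // Poll every unfinished child on each call so that both make progress.
    if (!a_result_) a_result_ = a_.Pend();
    if (!b_result_) b_result_ = b_.Pend();
    if (a_result_ && b_result_) {
      return std::make_pair(*a_result_, *b_result_);
    }
    return std::nullopt;
  }

 private:
  A a_;
  B b_;
  MiniPoll<int> a_result_;
  MiniPoll<int> b_result_;
};

// Polls a join of two countdowns to completion. Returns the sum of the two
// results and stores the number of polls it took in `polls_out`.
int SumWithJoin(int& polls_out) {
  JoinBoth<Countdown, Countdown> join(Countdown(1, 40), Countdown(3, 2));
  polls_out = 0;
  while (true) {
    ++polls_out;
    // A real dispatcher would wait for a wake-up instead of spinning.
    if (auto result = join.Pend()) {
      return result->first + result->second;
    }
  }
}
```

Because `JoinBoth` has the same `Pend` shape as its children, it could itself be nested inside another combinator without either side knowing about the other.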
However, manually writing these state machines introduces some boilerplate and complexity:
Manual coordination. Users must manually begin operations, poll for completion, and coordinate operations that need to run in parallel.
State proliferation. While explicit state provides several benefits, it is also a double-edged sword. The number of possible states and class members required to track them grows with the complexity of the task’s operations. In our example, we had to store several different futures and their results to persist them until other operations completed.
Developer unfamiliarity. Compared to traditional async paradigms, which are well understood by most developers, the informed polling model has an initial learning curve, and manual state machines are not the easiest code to read.
Operation lifecycle management. In the other async models described, when you call an async API, you know that it will automatically start and notify you when it is complete.
pw_async2’s tasks and futures are lazy, however, and must be polled to make progress. When writing manual polling code, it is easy to forget to poll some operations (e.g. short-circuiting a select if the first operation is pending won’t advance the others).
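The lazy-polling pitfall can be reproduced with a small toy model. In the sketch below, `LazyOp`, `EarlyReturnSelect`, `PollAllSelect`, and `RoundsUntilDone` are hypothetical names used only for illustration; a real task would return a poll result to the dispatcher rather than spin in a loop.

```cpp
#include <cassert>

// Toy lazy operation: it only advances when polled, and becomes ready on the
// `polls_needed`-th poll. Not a pw_async2 type.
class LazyOp {
 public:
  explicit LazyOp(int polls_needed) : polls_needed_(polls_needed) {}
  bool Pend() { return ++times_polled_ >= polls_needed_; }
  int times_polled() const { return times_polled_; }

 private:
  int polls_needed_;
  int times_polled_ = 0;
};

// Pitfall: checks `a` first and bails out while it is pending.
bool EarlyReturnSelect(LazyOp& a, LazyOp& b) {
  if (!a.Pend()) {
    return false;  // `b` is skipped this round, so it makes no progress.
  }
  b.Pend();  // Too late: only reached after `a` has already finished.
  return true;
}

// Fix: poll every operation each round, then combine the results.
bool PollAllSelect(LazyOp& a, LazyOp& b) {
  const bool a_ready = a.Pend();
  const bool b_ready = b.Pend();
  return a_ready || b_ready;
}

// Calls `select` until it reports completion. Returns the number of rounds.
template <typename SelectFn>
int RoundsUntilDone(SelectFn select, LazyOp& a, LazyOp& b) {
  int rounds = 1;
  while (!select(a, b)) {
    ++rounds;
  }
  return rounds;
}
```

With a slow first operation (`LazyOp a(5)`) and a fast second one (`LazyOp b(2)`), `PollAllSelect` completes as soon as `b` is ready, while `EarlyReturnSelect` waits for `a` and barely polls `b` at all.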
Some of these issues are addressed by coroutines, described below, though they introduce their own set of tradeoffs.
Coroutines (C++20)#
To simplify writing async code, Pigweed provides adapters around its async2
polling model to work with C++20 coroutines. Coroutines allow you to write
sequential logic, using co_await to let the compiler synthesize the polled
state machine. Any async2 Future can directly be awaited from a coroutine.
By using high-level combinators like Join and Select, we can express
complex races and parallel operations in a single declarative
statement:
pw::async2::Coro<void> CoroSensorProcessor(
    pw::async2::CoroContext,
    Async2Sensor& s1,
    Async2Sensor& s2,
    pw::async2::Sender<SensorData> sender,
    pw::async2::Notification& sleep_notification) {
  while (true) {
    // Wait for the next cycle interval.
    co_await pw::async2::GetSystemTimeProvider().WaitFor(
        std::chrono::milliseconds(100));

    // Wait for the first of three events: the sleep signal, both sensor reads,
    // or a read timeout.
    auto results = co_await pw::async2::Select(
        sleep_notification.Wait(),
        pw::async2::GetSystemTimeProvider().WaitFor(
            std::chrono::milliseconds(10)),
        pw::async2::Join(s1.ReadAsync(), s2.ReadAsync()));

    if (results.template has_value<0>()) {
      PW_LOG_INFO("Sensor processor entering sleep mode");
      co_return;
    }

    if (results.template has_value<1>()) {
      PW_LOG_WARN("Timed out reading sensors");
      continue;
    }

    if (results.template has_value<2>()) {
      auto& [r1, r2] = results.template value<2>();
      if (r1.ok() && r2.ok()) {
        SensorData combined;
        combined.value = r1->value + r2->value;

        // `Send` on an async channel blocks until there is space for the
        // message, so backpressure is built in. However, this will stop the
        // task from continuing to sample sensors. There are alternative
        // approaches, such as continuing to poll sensors on a fixed interval
        // while calling `ReserveSend` or `TrySend` on the channel to send
        // the latest values when a slot is available.
        bool sent = co_await sender.Send(combined);
        if (!sent) {
          PW_LOG_ERROR("Channel closed while transmitting");
          co_return;
        }
      } else {
        PW_LOG_WARN("Failed to read sensors");
      }
    }
  }
}
Compared to manual polling, coroutines offer a significant improvement in ergonomics, while maintaining the same efficiency and safety guarantees:
Sequential code style. Developers write code that reads synchronously but suspends execution cooperatively. There are no nested callbacks or complex state machines to maintain, no centralized event routers, and no concurrency issues.
Clear ownership. A coroutine task inherently owns its futures. If the task is destroyed or the futures go out of scope, any pending asynchronous operations it was waiting on are automatically cancelled. No manual lifecycle management is required.
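The cancel-on-destruction behavior described above can be sketched with a toy future that registers itself with its event source and deregisters in its destructor. `ToyTimer`, `ToyTimerFuture`, and `DroppedFutureIsCancelled` are hypothetical names for illustration only; actual pw_async2 futures may implement cancellation differently.

```cpp
#include <cassert>

class ToyTimer;

// Toy future that registers itself with its event source while it is alive.
class ToyTimerFuture {
 public:
  explicit ToyTimerFuture(ToyTimer& timer);
  ~ToyTimerFuture();
  ToyTimerFuture(const ToyTimerFuture&) = delete;
  ToyTimerFuture& operator=(const ToyTimerFuture&) = delete;
  bool fired() const { return fired_; }

 private:
  friend class ToyTimer;
  ToyTimer& timer_;
  bool fired_ = false;
};

// Toy event source that notifies at most one registered future.
class ToyTimer {
 public:
  bool has_waiter() const { return waiter_ != nullptr; }
  void Expire() {
    if (waiter_ != nullptr) waiter_->fired_ = true;
    waiter_ = nullptr;
  }

 private:
  friend class ToyTimerFuture;
  ToyTimerFuture* waiter_ = nullptr;
};

ToyTimerFuture::ToyTimerFuture(ToyTimer& timer) : timer_(timer) {
  timer_.waiter_ = this;
}

// Destroying the future cancels the wait, so the source holds no stale pointer.
ToyTimerFuture::~ToyTimerFuture() {
  if (timer_.waiter_ == this) timer_.waiter_ = nullptr;
}

// Returns true if dropping a pending future deregistered it from the timer.
bool DroppedFutureIsCancelled() {
  ToyTimer timer;
  {
    ToyTimerFuture future(timer);
    assert(timer.has_waiter());
  }  // `future` goes out of scope here while still pending.
  const bool cancelled = !timer.has_waiter();
  timer.Expire();  // Safe: there is no longer anything to notify.
  return cancelled;
}
```

The same pattern is what makes resetting a future (for example, assigning `{}` to it) a safe way to abandon an operation: the event source never holds a pointer to a future that no longer exists.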
However, coroutines come with notable tradeoffs for embedded systems, primarily in terms of memory:
Dynamic allocation. All coroutine frames are dynamically allocated to persist state across suspension points, with sizes determined by the compiler. In our experience, compilers are still not great at optimizing this. Pigweed’s coroutine wrappers use a pw::Allocator, so you at least have control over the pools of memory used.
High code size. Using powerful combinators like Select and Join generates complex template instantiations that can significantly increase binary size.
Loss of debug context. Unlike the informed polling model they’re built on, coroutines are wrapped in compiler-generated state machines and complex template code, which debuggers don’t always handle well.
In practice, many systems may prefer to mix coroutines with manual pw_async2
state machines to balance these constraints.
Concurrency tradeoffs#
This comparison summarizes the high-level characteristics of each approach across several axes:
Ergonomics: How easy is the code to write and understand?
Memory usage: How much memory (RAM and flash) does the approach require?
Debuggability: How well does it work with standard debugging tools?
Extensibility: How easy is it to add to or change the functionality of an existing system?
Portability: How well can the code be shared between systems or projects?
| Model | Ergonomics | Memory | Debugging | Extensibility | Portability | Use Case |
|---|---|---|---|---|---|---|
| Ad-hoc callbacks | 🔴 Poor | 🟢 Low | 🔴 Poor | 🟡 Fair | 🔴 Poor | Simple fire-and-forget IRQs |
| Work queues | 🟡 Fair | 🟢 Low | 🟡 Fair | 🟡 Fair | 🟡 Fair | Serialized background tasks |
| Blocking threads | 🟢 Excellent | 🔴 High | 🟢 Excellent | 🟢 Good | 🟢 Good | Resource-rich systems |
| Event handlers | 🟡 Fair | 🟢 Low | 🟡 Fair | 🔴 Poor | 🔴 Poor | Centralized event routing |
| Informed poll (core) | 🟡 Fair | 🟢 Low | 🟡 Fair | 🟢 Good | 🟢 Good | Zero-allocation state machines |
| Coroutines (optional) | 🟢 Excellent | 🟡 Medium | 🔴 Poor | 🟢 Good | 🟢 Good | Ergonomic async for larger systems |
Selecting the right tool#
Each of the concurrency models compared has its place in a system, and many real products use multiple models depending on their needs. The list below serves as a rough guide for when to use each one.
Ad-hoc callbacks: Best for simple, fire-and-forget interrupt handlers and one-off operations that are not part of a larger sequence.
Structured work queues: Good for serializing simple background tasks onto a single thread to avoid race conditions without complex locking, but break down as the number of steps in an operation grows.
Synchronous multi-threading: Best for simple, linear control flows where debugging visibility is a priority, RAM is plentiful, and concurrency density is low. Limited primarily by system resources.
Event and message handlers: Good for centralizing state tracking and simplifying lifetime management, but struggle with increased scope and scale.
Informed Poll (pw_async2): Good for complex, high-density concurrency and strong composability, with minimal memory and resource use when writing tasks manually, at the cost of verbosity and an initial learning curve.
Evaluating pw_async2 for your project#
While pw_async2 was built to solve specific architectural bottlenecks in
Pigweed, projects should evaluate it based on the value it brings to their
application. If you have to support the following, pw_async2 might be a
good fit:
Many asynchronous operations. If you have a lot of operations running at once that spend much of their time blocked on external resources, multiplexing them on a single dispatcher thread can save a considerable amount of memory.
Complex multi-step operations. When operations involve more than a few steps and require coordinating different resources, having them structured as self-contained tasks simplifies writing, testing, and debugging.
Composability. If your system consists of different components, having common interfaces to async operations enables code sharing and improves maintainability.
Application-level visibility. In many embedded deployments, the underlying RTOS scheduler is a black box. Because the pw_async2 dispatcher lives in the application layer, you own the scheduler’s behavior. You can instrument it, profile task execution times, and trace transitions using standard application logging.
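To give a feel for what application-level visibility makes possible, here is a hedged sketch of a run loop that times every poll. `DemoTask`, `TaskStats`, and `RunInstrumented` are hypothetical names, and the loop polls round-robin for brevity instead of reacting to wake-ups; it does not show how the pw_async2 dispatcher is actually instrumented.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical task for illustration: finishes on its `polls_needed`-th poll.
class DemoTask {
 public:
  DemoTask(std::string name, int polls_needed)
      : name_(std::move(name)), polls_needed_(polls_needed) {}
  const std::string& name() const { return name_; }
  bool Pend() { return ++polls_ >= polls_needed_; }  // True when finished.

 private:
  std::string name_;
  int polls_needed_;
  int polls_ = 0;
};

struct TaskStats {
  std::string name;
  int polls = 0;
  std::chrono::steady_clock::duration total_time{};
};

// Application-level run loop that records how often and for how long each
// task is polled. For brevity it polls round-robin rather than on wake-up.
std::vector<TaskStats> RunInstrumented(std::vector<DemoTask>& tasks) {
  std::vector<TaskStats> stats(tasks.size());
  std::vector<bool> done(tasks.size(), false);
  std::size_t remaining = tasks.size();
  for (std::size_t i = 0; i < tasks.size(); ++i) {
    stats[i].name = tasks[i].name();
  }
  while (remaining > 0) {
    for (std::size_t i = 0; i < tasks.size(); ++i) {
      if (done[i]) continue;
      const auto start = std::chrono::steady_clock::now();
      const bool finished = tasks[i].Pend();
      stats[i].total_time += std::chrono::steady_clock::now() - start;
      ++stats[i].polls;
      if (finished) {
        done[i] = true;
        --remaining;
      }
    }
  }
  return stats;
}
```

Because the loop is ordinary application code, the collected `TaskStats` could be logged or exported with whatever tooling the project already uses.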
pw_async2 is designed for incremental adoption. Once you have a dispatcher
running, code can be migrated over to async tasks piece by piece, with
pw_async2-provided utilities such as channels to bridge the gap.
If you’re interested in learning more about pw_async2, check out our
Informed poll doc for a detailed overview of the model,
or jump into the Codelab to get started with
some hands-on experience.