Futures#
pw_async2: Cooperative async tasks for embedded
A Future is an object that represents the value of an asynchronous operation
which may not yet be complete. Upon completion, the future produces the result
of the operation, if it has one.
Futures are the core interface to pw_async2 asynchronous APIs.
Core concepts#
Futures operate using the
informed poll model on which
pw_async2 is built. This model is summarized below, but it is recommended to
read the full description for important background knowledge.
A Future<T> exposes two member functions:
Poll<T> Pend(Context& cx): Drives the asynchronous operation, returning its result on completion. After returning Ready, the future cannot be polled again.
bool is_complete(): Returns whether the future has already completed and had its result consumed. Can be called after the future returns Ready.
The base Future<T> class is an abstract interface. Specific asynchronous
operations return various concrete future types.
Ownership and lifetime#
Futures are owned by the caller of an asynchronous operation. The task that receives the future is responsible for storing and polling it.
The provider of a future must either outlive the future or arrange for the future to be resolved in an error state when the provider is destroyed.
Polling#
Futures are lazy and do nothing on their own. The task owning a future must poll
it to drive it to completion. Calling a future’s Pend function advances its
operation and returns a Poll containing one of two values:
Pending(): The asynchronous operation has not yet finished. The value is not available. The task polling the future is scheduled to wake when the future can make additional progress. Typically, your task should propagate a Pending return upwards to notify the dispatcher that it is blocked and should sleep.
Ready(T): The operation has completed, and the value is now available.
Once a future returns Ready, its state is final. Attempting to poll it again
results in an assertion.
This polling model allows a single thread to manage many concurrent operations without blocking.
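The polling contract can be illustrated without any Pigweed dependency. The following is a minimal standalone sketch in standard C++, not pw_async2 code: ToyPoll, CountdownFuture, and DriveToCompletion are hypothetical names, and an empty std::optional stands in for Pending.

```cpp
#include <optional>
#include <utility>

// Hypothetical stand-in for a poll result (not pw::async2::Poll):
// an empty optional means Pending, a value means Ready.
template <typename T>
using ToyPoll = std::optional<T>;

// Toy leaf future: becomes ready with `value` on the Nth call to Pend().
class CountdownFuture {
 public:
  CountdownFuture(int polls_needed, int value)
      : remaining_(polls_needed), value_(value) {}

  // Advances the operation by one step.
  ToyPoll<int> Pend() {
    if (remaining_ > 1) {
      --remaining_;
      return std::nullopt;  // Pending: the caller should yield and retry.
    }
    complete_ = true;
    return value_;  // Ready: the result is handed to the caller.
  }

  bool is_complete() const { return complete_; }

 private:
  int remaining_;
  int value_;
  bool complete_ = false;
};

// Polls `future` until it is ready. Returns {value, number of polls}.
// A real dispatcher would sleep between polls instead of spinning.
std::pair<int, int> DriveToCompletion(CountdownFuture& future) {
  int polls = 0;
  while (true) {
    ++polls;
    ToyPoll<int> result = future.Pend();
    if (result.has_value()) {
      return {*result, polls};
    }
  }
}
```

The sketch spins in a loop only to stay small. In pw_async2 the task would return Pending to the dispatcher and be polled again after a wake.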
Composability#
The power of futures lies in composition: small building blocks combine to form complex asynchronous logic.
Futures can be classified into two categories: leaf futures and composite futures. Leaf futures represent a specific asynchronous operation, such as a read from a channel, or waiting for a timer. They contain the required state for their operations and manage the task waiting on them.
Composite futures are built on top of other futures, combining their results
to build advanced asynchronous execution graphs. For example, a Join future
waits for multiple other futures to complete, returning all of their results at
once. Composite futures can be used to express complex logic in a declarative
way.
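To make the leaf/composite distinction concrete, here is a small standalone sketch in standard C++. It does not use the pw_async2 Join implementation; StepFuture and ToyJoin are hypothetical toy types, and an empty std::optional stands in for Pending.

```cpp
#include <optional>
#include <utility>

// Toy leaf future (hypothetical): becomes ready with `value` on the Nth poll.
class StepFuture {
 public:
  StepFuture(int polls_needed, int value)
      : remaining_(polls_needed), value_(value) {}

  std::optional<int> Pend() {
    if (--remaining_ > 0) {
      return std::nullopt;  // Still pending.
    }
    return value_;
  }

 private:
  int remaining_;
  int value_;
};

// Toy composite future: polls each child that has not finished yet and
// becomes ready only once both results have been stored.
class ToyJoin {
 public:
  ToyJoin(StepFuture first, StepFuture second)
      : first_(first), second_(second) {}

  std::optional<std::pair<int, int>> Pend() {
    // A completed child is never polled again, matching the rule that a
    // future must not be polled after it returns Ready.
    if (!first_result_.has_value()) {
      first_result_ = first_.Pend();
    }
    if (!second_result_.has_value()) {
      second_result_ = second_.Pend();
    }
    if (first_result_.has_value() && second_result_.has_value()) {
      return std::make_pair(*first_result_, *second_result_);
    }
    return std::nullopt;
  }

 private:
  StepFuture first_;
  StepFuture second_;
  std::optional<int> first_result_;
  std::optional<int> second_result_;
};
```

Because ToyJoin exposes the same Pend shape as its children, it could itself be nested inside another composite, which is the property that makes composition work.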
Coroutine support#
Futures’ simple Pend API makes them easy to use with async2’s
coroutine adapter. You can co_await a
function that returns a future directly, automatically polling the future to
completion.
Working with futures#
Calling functions that return futures#
Consider some asynchronous call which produces a simple value on completion.
Pigweed provides ValueFuture<T> for this common case. The async function has
the following signature:
class NumberGenerator {
ValueFuture<int> GetNextNumber();
};
You would write a task that calls this operation as follows:
class MyTask : public pw::async2::Task {
private:
pw::async2::Poll<> DoPend(pw::async2::Context& cx) override {
// Obtain and store the future, then poll it to completion.
if (!future_.has_value()) {
future_.emplace(generator_.GetNextNumber());
}
PW_TRY_READY_ASSIGN(int number, future_->Pend(cx));
PW_LOG_INFO("Received number: %d", number);
return pw::async2::Ready();
}
NumberGenerator& generator_;
// The future is stored in an optional so it can be lazily initialized
// inside DoPend. Most concrete futures are not default constructible.
std::optional<ValueFuture<int>> future_;
};
pw::async2::Coro<pw::Status> MyCoroutineFunction(pw::async2::CoroContext&,
NumberGenerator& generator) {
// Pigweed's coroutine integration allows futures to be awaited directly.
int number = co_await generator.GetNextNumber();
PW_LOG_INFO("Received number: %d", number);
co_return pw::OkStatus();
}
Writing functions that return futures#
All future-based pw_async2 APIs have the signature
Future<T> DoThing(Args... args);
Where Future<T> is some concrete future implementation (e.g.
ValueFuture) which resolves to a value of type T and Args
represents any arguments to the operation.
When defining an asynchronous API, the function should always return a
Future directly — not a Result<Future> or
std::optional<Future>. If the operation is fallible, that should be
expressed by the future’s output, e.g. Future<Result<T>>.
This is necessary for proper composability. It makes using asynchronous APIs
consistent and enables higher-level futures which compose other futures to
function cleanly. Additionally, returning a Future directly is essential to
be able to work with coroutines: co_await can be used directly and will
resolve to a Result<T>.
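The guideline can be sketched with toy types. This is a standalone illustration in standard C++, not the pw::Result or pw_async2 future types: ToyError, ToyResult, ImmediateFuture, and ReadRegister are hypothetical names chosen for the example.

```cpp
#include <optional>
#include <string>
#include <utility>
#include <variant>

// Hypothetical stand-ins for an error type and a result type.
struct ToyError {
  std::string message;
};
template <typename T>
using ToyResult = std::variant<T, ToyError>;

// Toy future that is ready on its first poll.
template <typename T>
class ImmediateFuture {
 public:
  explicit ImmediateFuture(T value) : value_(std::move(value)) {}

  // An empty optional would mean Pending; this toy is always ready.
  std::optional<T> Pend() { return std::move(value_); }

 private:
  T value_;
};

// The function always returns a future. Failure is reported through the
// future's output rather than by wrapping the future itself.
ImmediateFuture<ToyResult<int>> ReadRegister(int address) {
  if (address < 0) {
    return ImmediateFuture<ToyResult<int>>(ToyError{"invalid address"});
  }
  return ImmediateFuture<ToyResult<int>>(address * 2);
}
```

Callers handle every call the same way: poll the future, then inspect the result. There is no separate error path before polling begins.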
Resolving futures#
After you vend a future from an asynchronous operation, you need a way to track and resolve it once the operation has completed. This is the role of providers.
Initially, all leaf futures in Pigweed are listable, allowing them to be stored in one of the following providers:
A ListFutureProvider allows multiple concurrent tasks to wait on an operation. The provider maintains a FIFO list of futures. When the operation completes, you can pop one (or more) futures from the list and resolve them.
A SingleFutureProvider only allows one task waiting on it at a time. It asserts if you vend a second future. Once the operation is complete, the future can be taken out and resolved.
Listable futures take their provider as a constructor argument and automatically manage their presence in the list.
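The FIFO behavior of a list-style provider can be sketched as follows. This is a simplified standalone model in standard C++ and an assumption-laden one: it uses heap-allocated shared slots where the real providers use an intrusive list, and Slot, ToyFuture, and ToyListProvider are hypothetical names.

```cpp
#include <deque>
#include <memory>
#include <optional>
#include <utility>

// Shared slot that the toy future reads and the toy provider writes.
struct Slot {
  std::optional<int> value;
};

class ToyFuture {
 public:
  explicit ToyFuture(std::shared_ptr<Slot> slot) : slot_(std::move(slot)) {}

  // An empty optional means the operation is still pending.
  std::optional<int> Pend() { return slot_->value; }

 private:
  std::shared_ptr<Slot> slot_;
};

class ToyListProvider {
 public:
  // Vends a future and records it at the back of the waiting list.
  ToyFuture Vend() {
    auto slot = std::make_shared<Slot>();
    waiting_.push_back(slot);
    return ToyFuture(slot);
  }

  // Resolves the oldest waiting future. Returns false if none is waiting.
  bool ResolveOne(int value) {
    if (waiting_.empty()) {
      return false;
    }
    waiting_.front()->value = value;
    waiting_.pop_front();
    return true;
  }

 private:
  std::deque<std::shared_ptr<Slot>> waiting_;
};
```

A single-future variant of this toy would assert in Vend() when the waiting list is already non-empty.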
Implementing a future#
While pw_async2 provides a suite of common futures and combinators, you
may sometimes need to implement a custom leaf future to represent a specific
asynchronous operation (e.g., waiting for a hardware interrupt).
The primary tool for this is the ListableFutureWithWaker base class.
ListableFutureWithWaker#
This class provides the essential machinery for most custom leaf futures:
It stores the Waker of the task that polls it.
It manages its membership in an intrusive list, allowing it to be tracked by a “provider”.
It tracks completion internally.
Waking mechanism#
When a task polls a future and it returns Pending, the future must store
the task’s Waker from the provided Context. This is handled
automatically by ListableFutureWithWaker.
On the other side of the asynchronous operation (e.g., in an interrupt handler),
when the operation completes, the provider is used to retrieve the future, and
its Wake() function is called. This notifies the dispatcher that the task
waiting on this future is ready to make progress and should be polled again.
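The store-then-wake handshake can be modeled in a few lines. The sketch below is standalone standard C++, not the pw_async2 Waker API: ToyWaker, EventFuture, and ToyTask are hypothetical, and a std::function callback stands in for the waker.

```cpp
#include <functional>
#include <utility>

// Hypothetical waker: invoking it asks the toy "dispatcher" to re-poll.
using ToyWaker = std::function<void()>;

class EventFuture {
 public:
  // Returns true when ready. While pending, it keeps the caller's waker.
  bool Pend(ToyWaker waker) {
    if (fired_) {
      return true;
    }
    waker_ = std::move(waker);
    return false;
  }

  // Called by the event source (for example, an interrupt handler).
  void Fire() {
    fired_ = true;
    if (waker_) {
      ToyWaker waker = std::move(waker_);
      waker_ = nullptr;
      waker();  // Notify the waiting task that it can make progress.
    }
  }

 private:
  bool fired_ = false;
  ToyWaker waker_;
};

// Toy task that records whether it has been asked to run again.
struct ToyTask {
  bool runnable = true;
  bool done = false;
  EventFuture* future = nullptr;

  void Run() {
    runnable = false;  // The task sleeps unless something wakes it.
    done = future->Pend([this] { runnable = true; });
  }
};
```

The key property is that the task is only marked runnable again by the future's Fire() call, so nothing needs to busy-poll while the operation is outstanding.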
Example: Waiting for a GPIO interrupt#
Below is an example of a custom future that waits for a GPIO button press using
interfaces from pw_digital_io.
class ButtonReceiver;

class ButtonFuture
    : public pw::async2::ListableFutureWithWaker<ButtonFuture, void> {
 public:
  // Provide a descriptive reason which can be used to debug blocked tasks.
  static constexpr const char kWaitReason[] = "Waiting for button press";

  // You are required to implement move semantics for your custom future,
  // and to call `Base::MoveFrom` to ensure the intrusive list is updated.
  ButtonFuture(ButtonFuture&& other) : Base(Base::kMovedFrom) {
    Base::MoveFrom(other);
  }
  ButtonFuture& operator=(ButtonFuture&& other) {
    Base::MoveFrom(other);
    return *this;
  }

 private:
  using Base = pw::async2::ListableFutureWithWaker<ButtonFuture, void>;
  friend Base;
  friend class ButtonReceiver;

  explicit ButtonFuture(
      pw::async2::SingleFutureProvider<ButtonFuture>& provider)
      : Base(provider) {}

  void HandlePress() {
    pressed_ = true;
    Base::Wake();
  }

  pw::async2::Poll<> DoPend(pw::async2::Context&) {
    if (pressed_) {
      return pw::async2::Ready();
    }
    return pw::async2::Pending();
  }

  bool pressed_ = false;
};

class ButtonReceiver {
 public:
  explicit ButtonReceiver(pw::digital_io::DigitalInterrupt& line)
      : line_(line) {
    PW_CHECK_OK(line_.SetInterruptHandler(
        pw::digital_io::InterruptTrigger::kActivatingEdge,
        [this](pw::digital_io::State) { HandleInterrupt(); }));
    PW_CHECK_OK(line_.EnableInterruptHandler());
  }

  // Returns a future that completes when the button is pressed.
  ButtonFuture WaitForPress() {
    PW_ASSERT(!provider_.has_future());
    return ButtonFuture(provider_);
  }

 private:
  // Executed in interrupt context.
  // `SingleFutureProvider` is internally synchronized and interrupt-safe.
  void HandleInterrupt() {
    if (auto future = provider_.Take()) {
      future->get().HandlePress();
    }
  }

  pw::digital_io::DigitalInterrupt& line_;
  pw::async2::SingleFutureProvider<ButtonFuture> provider_;
};
This example demonstrates the core mechanics of creating a custom future.
This pattern of waiting for a single value from a producer is so common that
pw_async2 provides ValueFuture and ValueProvider to handle it.
In practice, you would return a VoidFuture (alias for ValueFuture<void>)
from WaitForPress instead of writing a custom ButtonFuture.
Combinators#
Combinators allow you to compose multiple futures into a single future to express complex control flow.
Join#
Join() waits for multiple futures to complete and returns a tuple of their results.
#include "pw_async2/join.h"
ValueFuture<pw::Status> DoWork(int id);
class JoinTask : public pw::async2::Task {
private:
pw::async2::Poll<> DoPend(pw::async2::Context& cx) override {
if (!future_.has_value()) {
// Start three futures concurrently and wait for all of them
// to complete.
future_.emplace(pw::async2::Join(DoWork(1), DoWork(2), DoWork(3)));
}
PW_TRY_READY_ASSIGN(auto results, future_->Pend(cx));
auto [status1, status2, status3] = *results;
if (!status1.ok() || !status2.ok() || !status3.ok()) {
PW_LOG_ERROR("Operation failed");
} else {
PW_LOG_INFO("All operations succeeded");
}
return pw::async2::Ready();
}
std::optional<JoinFuture<ValueFuture<pw::Status>,
ValueFuture<pw::Status>,
ValueFuture<pw::Status>>>
future_;
};
pw::async2::Coro<pw::Status> JoinExample(pw::async2::CoroContext&) {
// Start three futures concurrently and wait for all of them to complete.
auto [status1, status2, status3] =
co_await pw::async2::Join(DoWork(1), DoWork(2), DoWork(3));
if (!status1.ok() || !status2.ok() || !status3.ok()) {
PW_LOG_ERROR("Operation failed");
co_return pw::Status::Internal();
}
PW_LOG_INFO("All operations succeeded");
co_return pw::OkStatus();
}
Select#
Select() waits for the first of multiple futures to complete. It returns a SelectFuture which resolves to an OptionalTuple containing the result. If additional futures happen to complete between the first future completing and the task re-running, the tuple stores all of their results.
#include "pw_async2/select.h"
ValueFuture<int> DoWork();
ValueFuture<int> DoOtherWork();
class SelectTask : public pw::async2::Task {
private:
pw::async2::Poll<> DoPend(pw::async2::Context& cx) override {
if (!future_.has_value()) {
// Race two futures and wait for the first one to complete.
future_.emplace(pw::async2::Select(DoWork(), DoOtherWork()));
}
PW_TRY_READY_ASSIGN(auto results, future_->Pend(cx));
// Check which future(s) completed.
// In this example, we check all of them, but it's common to return
// after the first result.
if (results.has_value<0>()) {
PW_LOG_INFO("DoWork completed with: %d", results.get<0>());
}
if (results.has_value<1>()) {
PW_LOG_INFO("DoOtherWork completed with: %d", results.get<1>());
}
return pw::async2::Ready();
}
std::optional<SelectFuture<ValueFuture<int>, ValueFuture<int>>> future_;
};
pw::async2::Coro<pw::Status> SelectExample(pw::async2::CoroContext&) {
// Race two futures and wait for the first one to complete.
auto results = co_await pw::async2::Select(DoWork(), DoOtherWork());
// Check which future(s) completed.
// In this example, we check all of them, but it's common to return
// after the first result.
if (results.has_value<0>()) {
int result = results.get<0>();
PW_LOG_INFO("DoWork completed with: %d", result);
}
if (results.has_value<1>()) {
int result = results.get<1>();
PW_LOG_INFO("DoOtherWork completed with: %d", result);
}
co_return pw::OkStatus();
}
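The "first to complete, possibly more than one" semantics can be modeled with a tuple of optionals. The following is a standalone standard C++ sketch, not the pw_async2 SelectFuture or OptionalTuple: StepFuture, ToySelect, and ToyOptionalTuple are hypothetical, and the toy assumes every child is polled on each call.

```cpp
#include <optional>
#include <tuple>

// Toy leaf future (hypothetical): becomes ready with `value` on the Nth poll.
class StepFuture {
 public:
  StepFuture(int polls_needed, int value)
      : remaining_(polls_needed), value_(value) {}

  std::optional<int> Pend() {
    if (--remaining_ > 0) {
      return std::nullopt;  // Still pending.
    }
    return value_;
  }

 private:
  int remaining_;
  int value_;
};

// One optional slot per child; a slot is set only if that child completed.
using ToyOptionalTuple = std::tuple<std::optional<int>, std::optional<int>>;

// Toy composite future: ready as soon as at least one child is ready.
// Like any future, it must not be polled again after it returns a value.
class ToySelect {
 public:
  ToySelect(StepFuture first, StepFuture second)
      : first_(first), second_(second) {}

  std::optional<ToyOptionalTuple> Pend() {
    std::optional<int> a = first_.Pend();
    std::optional<int> b = second_.Pend();
    if (!a.has_value() && !b.has_value()) {
      return std::nullopt;  // Neither child has finished.
    }
    return ToyOptionalTuple(a, b);  // One or both slots are populated.
  }

 private:
  StepFuture first_;
  StepFuture second_;
};
```

This is why callers check each slot rather than assuming exactly one winner: two children that finish in the same polling window both report their results.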
Channels#
Channels are the primary mechanism for communicating between asynchronous tasks
in pw_async2. They provide a synchronized way to pass data between tasks.
A channel is a fixed-capacity queue that supports multiple senders and multiple receivers. Channels can be used between async tasks on the same dispatcher, between tasks on different dispatchers, or between tasks and non-async code. There are two types of channel: static channels, which have user-managed storage, and dynamic channels, which are allocated and automatically manage their lifetimes.
Creating a channel#
Channels can be created in four configurations which control the number of senders and receivers supported. The “multi-” configurations support up to 255 producers or consumers.
| Name | Initials | Max Producers | Max Consumers |
|---|---|---|---|
| Single-producer, single-consumer | SPSC | 1 | 1 |
| Single-producer, multi-consumer | SPMC | 1 | 255 |
| Multi-producer, single-consumer | MPSC | 255 | 1 |
| Multi-producer, multi-consumer | MPMC | 255 | 255 |
Examples of creating each channel type are shown below.
// Single-producer, single-consumer
#include "pw_async2/channel.h"
// Create storage for a static channel with a capacity of 10 integers.
// The storage must outlive the channel for which it is used.
pw::async2::ChannelStorage<int, 10> storage;
// Create a channel using the storage.
//
// In this example, we create a single-producer, single-consumer channel and
// are given the sole sender and receiver.
auto [channel, sender, receiver] = pw::async2::CreateSpscChannel(storage);
// Hand the sender and receiver to various parts of the system.
MySenderTask sender_task(std::move(sender));
MyReceiverTask receiver_task(std::move(receiver));
// You can hold onto the channel handle if you want to use it to
// manually close the channel before all senders and receivers have
// completed.
//
// If you want the channel to close automatically once either end hangs
// up, you should `Release` the handle immediately to disassociate its
// reference to the channel.
channel.Release();
// Single-producer, multi-consumer
#include "pw_async2/channel.h"
// Create storage for a static channel with a capacity of 10 integers.
// The storage must outlive the channel for which it is used.
pw::async2::ChannelStorage<int, 10> storage;
// Create a channel using the storage.
//
// In this example, we create a single-producer, multi-consumer channel and
// are given the sole sender. Receivers are created from the channel handle.
auto [channel, sender] = pw::async2::CreateSpmcChannel(storage);
// Hand the sender and receiver to various parts of the system.
MySenderTask sender_task(std::move(sender));
MyReceiverTask receiver_task_1(channel.CreateReceiver());
MyReceiverTask receiver_task_2(channel.CreateReceiver());
// You can hold onto the channel handle if you want to use it to
// manually close the channel before all senders and receivers have
// completed.
//
// If you want the channel to close automatically once either end hangs
// up, you should `Release` the handle after all desired receivers are
// created to disassociate its reference to the channel.
channel.Release();
// Multi-producer, single-consumer
#include "pw_async2/channel.h"
// Create storage for a static channel with a capacity of 10 integers.
// The storage must outlive the channel for which it is used.
pw::async2::ChannelStorage<int, 10> storage;
// Create a channel using the storage.
//
// In this example, we create a multi-producer, single-consumer channel and
// are given the sole receiver. Senders are created from the channel handle.
auto [channel, receiver] = pw::async2::CreateMpscChannel(storage);
// Hand the sender and receiver to various parts of the system.
MySenderTask sender_task_1(channel.CreateSender());
MySenderTask sender_task_2(channel.CreateSender());
MyReceiverTask receiver_task(std::move(receiver));
// You can hold onto the channel handle if you want to use it to
// manually close the channel before all senders and receivers have
// completed.
//
// If you want the channel to close automatically once either end hangs
// up, you should `Release` the handle after all desired senders are
// created to disassociate its reference to the channel.
channel.Release();
// Multi-producer, multi-consumer
#include "pw_async2/channel.h"
// Create storage for a static channel with a capacity of 10 integers.
// The storage must outlive the channel for which it is used.
pw::async2::ChannelStorage<int, 10> storage;
// Create a channel using the storage.
//
// In this example, we create a multi-producer, multi-consumer channel.
// Both senders and receivers are created from the channel handle.
pw::async2::MpmcChannelHandle<int> channel =
pw::async2::CreateMpmcChannel(storage);
// Hand the sender and receiver to various parts of the system.
MySenderTask sender_task_1(channel.CreateSender());
MySenderTask sender_task_2(channel.CreateSender());
MyReceiverTask receiver_task_1(channel.CreateReceiver());
MyReceiverTask receiver_task_2(channel.CreateReceiver());
// You can hold onto the channel handle if you want to use it to
// manually close the channel before all senders and receivers have
// completed.
//
// If you want the channel to close automatically once either end hangs
// up, you should `Release` the handle after all desired senders and
// receivers are created to disassociate its reference to the channel.
channel.Release();
Channel handles#
Each channel creation function returns a handle to the channel. This handle is used for two operations:
Creating senders and receivers, if allowed by the channel configuration (CreateSender, CreateReceiver).
Forcefully closing the channel while senders and receivers are still active (Close).
Handles are movable and copyable, so they can be given to any parts of the system which need to perform these operations.
As long as any handle to a channel is active, the channel will not automatically
close. If the system relies on the channel closing (for example, a receiving
task reading values until a std::nullopt), it is essential to Release
all handles once you are done creating senders/receivers from them.
Sending and receiving#
Senders and receivers provide asynchronous APIs for interacting with the channel.
Sender::Send(T value): Returns a Future<bool> which resolves to true when the value has been written to the channel. If the channel is full, the future waits until space is available. If the channel closes, the future resolves to false.
Receiver::Receive(): Returns a Future<std::optional<T>> which waits until a value is available, or resolves to std::nullopt if the channel is closed and empty.
using pw::async2::Context;
using pw::async2::Poll;
using pw::async2::Ready;
using pw::async2::ReceiveFuture;
using pw::async2::Receiver;
using pw::async2::Sender;
using pw::async2::SendFuture;
using pw::async2::Task;

class Producer : public Task {
 public:
  explicit Producer(Sender<int>&& sender)
      : Task(PW_ASYNC_TASK_NAME("Producer")), sender_(std::move(sender)) {}

 private:
  Poll<> DoPend(Context& cx) override {
    while (data_ < 3) {
      if (!send_future_.has_value()) {
        send_future_.emplace(sender_.Send(data_));
      }
      PW_TRY_READY(send_future_->Pend(cx));
      send_future_.reset();
      ++data_;
    }
    sender_.Disconnect();
    return Ready();
  }

  int data_ = 0;
  Sender<int> sender_;
  std::optional<SendFuture<int>> send_future_;
};

class Consumer : public Task {
 public:
  explicit Consumer(Receiver<int>&& receiver)
      : Task(PW_ASYNC_TASK_NAME("Consumer")), receiver_(std::move(receiver)) {}

  const pw::Vector<int>& values() const { return values_; }

 private:
  Poll<> DoPend(Context& cx) override {
    while (true) {
      if (!receive_future_.has_value()) {
        receive_future_.emplace(receiver_.Receive());
      }
      PW_TRY_READY_ASSIGN(std::optional<int> result, receive_future_->Pend(cx));
      if (!result.has_value()) {
        break;
      }

      values_.push_back(*result);
      receive_future_.reset();
    }
    receiver_.Disconnect();
    return Ready();
  }

  Receiver<int> receiver_;
  std::optional<ReceiveFuture<int>> receive_future_;
  pw::Vector<int, 3> values_;
};
using pw::async2::Coro;
using pw::async2::CoroContext;
using pw::async2::Receiver;
using pw::async2::Sender;

Coro<pw::Status> CoroProducer(CoroContext&, Sender<int> sender) {
  for (int data = 0; data < 3; ++data) {
    co_await sender.Send(data);
  }
  co_return pw::OkStatus();
}

Coro<pw::Status> CoroConsumer(CoroContext&,
                              Receiver<int> receiver,
                              pw::Vector<int>& values) {
  while (true) {
    std::optional<int> result = co_await receiver.Receive();
    if (!result.has_value()) {
      break;
    }
    values.push_back(*result);
  }
  co_return pw::OkStatus();
}
ReserveSend#
Sender::ReserveSend() is an alternative API for writing data to a channel.
Unlike the regular Send, which takes a value immediately and stages it in
its future, ReserveSend allows writing a value directly into the channel
once space is available. This can be useful for values which are expensive to
construct/move or rapidly changing. By waiting for a reservation, you can defer
capturing the value until you are guaranteed to be able to send it immediately.
ReserveSend returns a Future<std::optional<SendReservation<T>>>. The
SendReservation object is used to emplace a value directly into the channel.
If the reservation is dropped, it automatically releases the channel space.
If the channel closes, the future resolves to std::nullopt.
It is possible to use both Send and ReserveSend concurrently on the same
channel.
using pw::async2::ReserveSendFuture;
using pw::async2::Sender;

class ReservedSenderTask : public pw::async2::Task {
 public:
  explicit ReservedSenderTask(Sender<int>&& sender)
      : sender_(std::move(sender)) {}

 private:
  pw::async2::Poll<> DoPend(pw::async2::Context& cx) override {
    // Reserve space for a value in the channel.
    if (!reservation_future_.has_value()) {
      reservation_future_.emplace(sender_.ReserveSend());
    }
    // Poll the stored future; DoPend returns Pending until it is ready.
    PW_TRY_READY_ASSIGN(auto reservation, reservation_future_->Pend(cx));
    if (!reservation.has_value()) {
      PW_LOG_ERROR("Channel is closed");
      return pw::async2::Ready();
    }
    // Emplace a value into the channel.
    reservation->Commit(42);
    reservation_future_.reset();
    return pw::async2::Ready();
  }

  Sender<int> sender_;
  std::optional<ReserveSendFuture<int>> reservation_future_;
};
using pw::async2::Coro;
using pw::async2::CoroContext;
using pw::async2::Sender;
Coro<pw::Status> ReservedSenderExample(CoroContext&, Sender<int> sender) {
// Wait for space to become available.
auto reservation = co_await sender.ReserveSend();
if (!reservation.has_value()) {
PW_LOG_ERROR("Channel is closed");
co_return pw::Status::FailedPrecondition();
}
// Emplace a value into the channel.
reservation->Commit(42);
co_return pw::OkStatus();
}
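The reservation lifecycle (reserve, then either commit or drop) can be modeled with a small RAII type. This is a standalone standard C++ sketch, not the pw_async2 SendReservation: ToyChannel, ToyReservation, and TryReserve are hypothetical names, and the toy is synchronous and single-threaded.

```cpp
#include <cstddef>
#include <deque>
#include <optional>

class ToyChannel;

// Toy reservation: holds one slot until it is committed or destroyed.
class ToyReservation {
 public:
  explicit ToyReservation(ToyChannel& channel) : channel_(&channel) {}
  ToyReservation(ToyReservation&& other) noexcept : channel_(other.channel_) {
    other.channel_ = nullptr;
  }
  ToyReservation(const ToyReservation&) = delete;
  ToyReservation& operator=(const ToyReservation&) = delete;
  ToyReservation& operator=(ToyReservation&&) = delete;
  ~ToyReservation();

  // Writes the value into the reserved slot. Later calls do nothing.
  void Commit(int value);

 private:
  ToyChannel* channel_;
};

class ToyChannel {
 public:
  explicit ToyChannel(std::size_t capacity) : capacity_(capacity) {}

  // Reserved slots count against capacity, so a reservation cannot fail
  // later for lack of space.
  std::optional<ToyReservation> TryReserve() {
    if (queue_.size() + reserved_ >= capacity_) {
      return std::nullopt;
    }
    ++reserved_;
    return ToyReservation(*this);
  }

  std::size_t size() const { return queue_.size(); }
  std::size_t reserved() const { return reserved_; }

 private:
  friend class ToyReservation;
  std::size_t capacity_;
  std::size_t reserved_ = 0;
  std::deque<int> queue_;
};

inline ToyReservation::~ToyReservation() {
  if (channel_ != nullptr) {
    --channel_->reserved_;  // Dropped without committing: release the slot.
  }
}

inline void ToyReservation::Commit(int value) {
  if (channel_ == nullptr) {
    return;
  }
  channel_->queue_.push_back(value);
  --channel_->reserved_;
  channel_ = nullptr;
}
```

The value is only constructed at Commit time, which is the point of reserving first: nothing expensive is built until the slot is guaranteed.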
Channel lifetime#
A channel remains open as long as it has at least one active sender and at least one active receiver.
If all receivers are destroyed, the channel closes. Subsequent
If all receivers are destroyed, the channel closes. Subsequent Send attempts will fail (the future resolves to false).
If all senders are destroyed, the channel closes. Subsequent Receive calls will drain any remaining items, then resolve to std::nullopt.
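The drain-then-close behavior on the receiving side can be sketched with a toy queue. This is standalone standard C++ rather than the pw_async2 channel: ToyPoll, ToyChannel, CloseSenders, and PollReceive are hypothetical names, and CloseSenders() models the last sender being destroyed.

```cpp
#include <deque>
#include <optional>

// Hypothetical poll result: `ready == false` means Pending. When ready,
// an empty `value` means the channel is closed and fully drained.
struct ToyPoll {
  bool ready;
  std::optional<int> value;
};

class ToyChannel {
 public:
  void Send(int value) { queue_.push_back(value); }

  // Models the destruction of the last sender.
  void CloseSenders() { senders_alive_ = false; }

  ToyPoll PollReceive() {
    if (!queue_.empty()) {
      int value = queue_.front();
      queue_.pop_front();
      return ToyPoll{true, value};  // Buffered items are delivered first.
    }
    if (!senders_alive_) {
      return ToyPoll{true, std::nullopt};  // Closed and empty.
    }
    return ToyPoll{false, std::nullopt};  // Open and empty: keep waiting.
  }

 private:
  std::deque<int> queue_;
  bool senders_alive_ = true;
};
```

A receiver loop that stops on the empty value therefore never loses items that were sent before the senders went away.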
Dynamic allocation#
In systems that have dynamic allocation, you can pass an Allocator and channel capacity to any of the channel creation functions to allocate a managed channel.
The dynamic functions wrap the returned tuple in a std::optional. If the
allocation fails, the optional will be empty.
#include "pw_async2/channel.h"
constexpr size_t kCapacity = 10;
auto result =
pw::async2::CreateSpscChannel<int>(GetSystemAllocator(), kCapacity);
if (!result.has_value()) {
PW_LOG_ERROR("Out of memory");
return;
}
// Hand the sender and receiver to various parts of the system.
auto&& [channel, sender, receiver] = *result;
// As with the static channel, release the handle once all desired senders
// and receivers are created unless you intend to use it to manually close
// the channel. As we created an SPSC channel here, there are no more senders
// or receivers so we release the handle immediately.
//
// The channel remains allocated and open as long as any senders, receivers,
// or futures to it are alive.
channel.Release();
Synchronous access#
If you need to read from or write to a channel from a non-async context, such as a
separate thread or an interrupt handler, you can use the following synchronous functions.
Sender::TrySend(): Attempts to send the value immediately. Returns true if successful, or false if the channel is full or closed.
Sender::TryReserveSend(): Attempts to reserve a slot in the channel immediately. Returns a std::optional<SendReservation<T>> which contains a reservation if successful, or std::nullopt if the channel is full or closed.
Sender::BlockingSend(): Blocks the running thread until the value is sent or an optional timeout elapses. Returns a status indicating success or whether the channel is closed or the operation timed out.
Receiver::TryReceive(): Attempts to read a value from the channel immediately. Returns a pw::Result<T> containing the value if successful, or an error if the channel is empty or closed.
Receiver::BlockingReceive(): Blocks the running thread until a value is received or an optional timeout elapses. Returns a pw::Result<T> containing either the value read or the error in case of timeout or channel closure.
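A common use of a non-blocking send is a producer that must never wait, such as an interrupt handler that drops samples when the queue is full. The sketch below models that pattern in standalone standard C++. It is not the pw_async2 channel: ToyBoundedChannel and SampleProducer are hypothetical, the toy is not thread-safe or interrupt-safe, and its TryReceive returns std::optional rather than pw::Result.

```cpp
#include <cstddef>
#include <deque>
#include <optional>

class ToyBoundedChannel {
 public:
  explicit ToyBoundedChannel(std::size_t capacity) : capacity_(capacity) {}

  // Never blocks: returns false if the channel is full or closed.
  bool TrySend(int value) {
    if (closed_ || queue_.size() >= capacity_) {
      return false;
    }
    queue_.push_back(value);
    return true;
  }

  // Never blocks: returns an empty optional if nothing is buffered.
  std::optional<int> TryReceive() {
    if (queue_.empty()) {
      return std::nullopt;
    }
    int value = queue_.front();
    queue_.pop_front();
    return value;
  }

  void Close() { closed_ = true; }

 private:
  std::size_t capacity_;
  bool closed_ = false;
  std::deque<int> queue_;
};

// Producer that counts samples it could not deliver instead of waiting.
struct SampleProducer {
  ToyBoundedChannel& channel;
  int dropped = 0;

  void OnSample(int sample) {
    if (!channel.TrySend(sample)) {
      ++dropped;
    }
  }
};
```

Tracking the drop count gives the rest of the system a way to notice sustained overload without the producer ever blocking.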