Channels#
pw_async2: Cooperative async tasks for embedded
Channels are the primary mechanism for communicating between asynchronous tasks
in pw_async2. They provide a synchronized way to pass data between tasks.
A channel is a fixed-capacity queue that supports multiple senders and multiple receivers. Channels can be used between async tasks on the same dispatcher, between tasks on different dispatchers, or between tasks and non-async code. There are two types of channel: static channels, which use user-managed storage, and dynamic channels, which are allocated and manage their own lifetimes automatically.
Creating a channel#
Channels can be created in four configurations which control the number of senders and receivers supported. The “multi-” configurations support up to 255 producers or consumers.
| Name | Initials | Max Producers | Max Consumers |
|---|---|---|---|
| Single-producer, single-consumer | SPSC | 1 | 1 |
| Single-producer, multi-consumer | SPMC | 1 | 255 |
| Multi-producer, single-consumer | MPSC | 255 | 1 |
| Multi-producer, multi-consumer | MPMC | 255 | 255 |
Examples of creating each channel type are shown below.
// Single-producer, single-consumer
#include "pw_async2/channel.h"
// Create storage for a static channel with a capacity of 10 integers.
// The storage must outlive the channel for which it is used.
pw::async2::ChannelStorage<int, 10> storage;
// Create a channel using the storage.
//
// In this example, we create a single-producer, single-consumer channel and
// are given the sole sender and receiver.
auto [channel, sender, receiver] = pw::async2::CreateSpscChannel(storage);
// Hand the sender and receiver to various parts of the system.
MySenderTask sender_task(std::move(sender));
MyReceiverTask receiver_task(std::move(receiver));
// You can hold onto the channel handle if you want to use it to
// manually close the channel before all senders and receivers have
// completed.
//
// If you want the channel to close automatically once either end hangs
// up, you should `Release` the handle immediately to disassociate its
// reference to the channel.
channel.Release();
// Single-producer, multi-consumer
#include "pw_async2/channel.h"
// Create storage for a static channel with a capacity of 10 integers.
// The storage must outlive the channel for which it is used.
pw::async2::ChannelStorage<int, 10> storage;
// Create a channel using the storage.
//
// In this example, we create a single-producer, multi-consumer channel and
// are given the sole sender. Receivers are created from the channel handle.
auto [channel, sender] = pw::async2::CreateSpmcChannel(storage);
// Hand the sender and receivers to various parts of the system.
MySenderTask sender_task(std::move(sender));
MyReceiverTask receiver_task_1(channel.CreateReceiver());
MyReceiverTask receiver_task_2(channel.CreateReceiver());
// You can hold onto the channel handle if you want to use it to
// manually close the channel before all senders and receivers have
// completed.
//
// If you want the channel to close automatically once either end hangs
// up, you should `Release` the handle after all desired receivers are
// created to disassociate its reference to the channel.
channel.Release();
// Multi-producer, single-consumer
#include "pw_async2/channel.h"
// Create storage for a static channel with a capacity of 10 integers.
// The storage must outlive the channel for which it is used.
pw::async2::ChannelStorage<int, 10> storage;
// Create a channel using the storage.
//
// In this example, we create a multi-producer, single-consumer channel and
// are given the sole receiver. Senders are created from the channel handle.
auto [channel, receiver] = pw::async2::CreateMpscChannel(storage);
// Hand the senders and receiver to various parts of the system.
MySenderTask sender_task_1(channel.CreateSender());
MySenderTask sender_task_2(channel.CreateSender());
MyReceiverTask receiver_task(std::move(receiver));
// You can hold onto the channel handle if you want to use it to
// manually close the channel before all senders and receivers have
// completed.
//
// If you want the channel to close automatically once either end hangs
// up, you should `Release` the handle after all desired senders are
// created to disassociate its reference to the channel.
channel.Release();
// Multi-producer, multi-consumer
#include "pw_async2/channel.h"
// Create storage for a static channel with a capacity of 10 integers.
// The storage must outlive the channel for which it is used.
pw::async2::ChannelStorage<int, 10> storage;
// Create a channel using the storage.
//
// In this example, we create a multi-producer, multi-consumer channel.
// Both senders and receivers are created from the channel handle.
pw::async2::MpmcChannelHandle<int> channel =
    pw::async2::CreateMpmcChannel(storage);
// Hand the senders and receivers to various parts of the system.
MySenderTask sender_task_1(channel.CreateSender());
MySenderTask sender_task_2(channel.CreateSender());
MyReceiverTask receiver_task_1(channel.CreateReceiver());
MyReceiverTask receiver_task_2(channel.CreateReceiver());
// You can hold onto the channel handle if you want to use it to
// manually close the channel before all senders and receivers have
// completed.
//
// If you want the channel to close automatically once either end hangs
// up, you should `Release` the handle after all desired senders and
// receivers are created to disassociate its reference to the channel.
channel.Release();
Channel handles#
Each channel creation function returns a handle to the channel. This handle is used for two operations:
- Creating senders and receivers, if allowed by the channel configuration (CreateSender, CreateReceiver).
- Forcefully closing the channel while senders and receivers are still active (Close).
Handles are movable and copyable, so they can be given to any parts of the system which need to perform these operations.
As long as any handle to a channel is active, the channel will not automatically
close. If the system relies on the channel closing (for example, a receiving
task reading values until a std::nullopt), it is essential to Release
all handles once you are done creating senders/receivers from them.
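The rule above can be illustrated with a small standalone model. This is only a sketch, not pw_async2's implementation: ToyChannelState and ToyHandle are hypothetical names, and the model assumes a handle gives up its reference either through Release or when it is destroyed.

```cpp
#include <cassert>
#include <memory>
#include <utility>

// Toy model only; NOT pw_async2's implementation. It counts the handles,
// senders, and receivers that still refer to a channel.
struct ToyChannelState {
  int handles = 0;
  int senders = 0;
  int receivers = 0;
  bool force_closed = false;

  // The channel closes automatically once either end has hung up, but only
  // when no handle is still active. Close() on a handle overrides this.
  bool closed() const {
    if (force_closed) return true;
    if (handles > 0) return false;
    return senders == 0 || receivers == 0;
  }
};

// Hypothetical stand-in for a channel handle: copyable, releasable.
class ToyHandle {
 public:
  explicit ToyHandle(std::shared_ptr<ToyChannelState> state)
      : state_(std::move(state)) {
    ++state_->handles;
  }
  ToyHandle(const ToyHandle& other) : state_(other.state_) {
    if (state_) ++state_->handles;
  }
  ToyHandle& operator=(const ToyHandle&) = delete;
  ~ToyHandle() { Release(); }

  // Drops this handle's reference; safe to call more than once.
  void Release() {
    if (!state_) return;
    assert(state_->handles > 0);
    --state_->handles;
    state_.reset();
  }

  // Forces the channel closed regardless of live senders and receivers.
  void Close() {
    if (state_) state_->force_closed = true;
  }

 private:
  std::shared_ptr<ToyChannelState> state_;
};
```

In this model, a receiver waiting for the closed signal never sees it while a forgotten copy of the handle is still alive. That is why every handle should be released once the senders and receivers have been created.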
Sending and receiving#
Senders and receivers provide asynchronous APIs for interacting with the channel.
- Sender::Send(T value): Returns a Future<bool> which resolves to true when the value has been written to the channel. If the channel is full, the future waits until space is available. If the channel closes, the future resolves to false.
- Receiver::Receive(): Returns a Future<std::optional<T>> which waits until a value is available, or resolves to std::nullopt if the channel is closed and empty.
using pw::async2::Context;
using pw::async2::Poll;
using pw::async2::Ready;
using pw::async2::ReceiveFuture;
using pw::async2::Receiver;
using pw::async2::Sender;
using pw::async2::SendFuture;
using pw::async2::Task;

class Producer : public Task {
 public:
  explicit Producer(Sender<int>&& sender)
      : Task(PW_ASYNC_TASK_NAME("Producer")), sender_(std::move(sender)) {}

 private:
  Poll<> DoPend(Context& cx) override {
    while (data_ < 3) {
      if (!send_future_.has_value()) {
        send_future_.emplace(sender_.Send(data_));
      }
      PW_TRY_READY(send_future_->Pend(cx));
      send_future_.reset();
      ++data_;
    }
    sender_.Disconnect();
    return Ready();
  }

  int data_ = 0;
  Sender<int> sender_;
  std::optional<SendFuture<int>> send_future_;
};

class Consumer : public Task {
 public:
  explicit Consumer(Receiver<int>&& receiver)
      : Task(PW_ASYNC_TASK_NAME("Consumer")), receiver_(std::move(receiver)) {}

  const pw::Vector<int>& values() const { return values_; }

 private:
  Poll<> DoPend(Context& cx) override {
    while (true) {
      if (!receive_future_.has_value()) {
        receive_future_.emplace(receiver_.Receive());
      }
      PW_TRY_READY_ASSIGN(std::optional<int> result, receive_future_->Pend(cx));
      if (!result.has_value()) {
        break;
      }

      values_.push_back(*result);
      receive_future_.reset();
    }
    receiver_.Disconnect();
    return Ready();
  }

  Receiver<int> receiver_;
  std::optional<ReceiveFuture<int>> receive_future_;
  pw::Vector<int, 3> values_;
};
using pw::async2::Coro;
using pw::async2::CoroContext;
using pw::async2::Receiver;
using pw::async2::Sender;

Coro<pw::Status> CoroProducer(CoroContext&, Sender<int> sender) {
  for (int data = 0; data < 3; ++data) {
    co_await sender.Send(data);
  }
  co_return pw::OkStatus();
}

Coro<pw::Status> CoroConsumer(CoroContext&,
                              Receiver<int> receiver,
                              pw::Vector<int>& values) {
  while (true) {
    std::optional<int> result = co_await receiver.Receive();
    if (!result.has_value()) {
      break;
    }
    values.push_back(*result);
  }
  co_return pw::OkStatus();
}
ReserveSend#
Sender::ReserveSend() is an alternative API for writing data to a channel.
Unlike the regular Send, which takes a value immediately and stages it in
its future, ReserveSend allows writing a value directly into the channel
once space is available. This can be useful for values which are expensive to
construct/move or rapidly changing. By waiting for a reservation, you can defer
capturing the value until you are guaranteed to be able to send it immediately.
ReserveSend returns a Future<std::optional<SendReservation<T>>>. The
SendReservation object is used to emplace a value directly into the channel.
If the reservation is dropped, it automatically releases the channel space.
If the channel closes, the future resolves to std::nullopt.
It is possible to use both Send and ReserveSend concurrently on the same
channel.
using pw::async2::Poll;
using pw::async2::ReserveSendFuture;
using pw::async2::Sender;

class ReservedSenderTask : public pw::async2::Task {
 public:
  explicit ReservedSenderTask(Sender<int>&& sender)
      : sender_(std::move(sender)) {}

 private:
  Poll<> DoPend(pw::async2::Context& cx) override {
    // Reserve space for a value in the channel.
    if (!reservation_future_.has_value()) {
      reservation_future_ = sender_.ReserveSend();
    }
    // Poll the future; return Pending until the reservation resolves.
    PW_TRY_READY_ASSIGN(auto reservation, reservation_future_->Pend(cx));
    if (!reservation.has_value()) {
      PW_LOG_ERROR("Channel is closed");
      return pw::async2::Ready();
    }
    // Emplace a value into the channel.
    reservation->Commit(42);
    reservation_future_.reset();
    return pw::async2::Ready();
  }

  Sender<int> sender_;
  std::optional<ReserveSendFuture<int>> reservation_future_;
};
using pw::async2::Coro;
using pw::async2::CoroContext;
using pw::async2::Sender;

Coro<pw::Status> ReservedSenderExample(CoroContext&, Sender<int> sender) {
  // Wait for space to become available.
  auto reservation = co_await sender.ReserveSend();
  if (!reservation.has_value()) {
    PW_LOG_ERROR("Channel is closed");
    co_return pw::Status::FailedPrecondition();
  }
  // Emplace a value into the channel.
  reservation->Commit(42);
  co_return pw::OkStatus();
}
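The reservation behavior described in this section (a held reservation occupies a slot, and dropping it releases the slot) can be sketched with a standalone RAII model. The names ToyQueue and ToyReservation are hypothetical, and the sketch only mirrors the documented semantics; it does not show how pw_async2 implements SendReservation.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <optional>
#include <utility>

class ToyQueue;

// Toy stand-in for a send reservation: owns one slot until it is committed
// or destroyed.
class ToyReservation {
 public:
  explicit ToyReservation(ToyQueue& queue) : queue_(&queue) {}
  ToyReservation(ToyReservation&& other) noexcept
      : queue_(std::exchange(other.queue_, nullptr)) {}
  ToyReservation(const ToyReservation&) = delete;
  ToyReservation& operator=(const ToyReservation&) = delete;
  ToyReservation& operator=(ToyReservation&&) = delete;
  ~ToyReservation();

  // Writes the value into the reserved slot and consumes the reservation.
  void Commit(int value);

 private:
  ToyQueue* queue_;
};

// Toy fixed-capacity queue that counts reserved slots against its capacity.
class ToyQueue {
 public:
  explicit ToyQueue(std::size_t capacity) : capacity_(capacity) {}

  // Returns a reservation if a slot is free, or std::nullopt if every slot
  // is either filled or already reserved.
  std::optional<ToyReservation> TryReserve() {
    if (items_.size() + reserved_ >= capacity_) return std::nullopt;
    ++reserved_;
    return ToyReservation(*this);
  }

  std::size_t size() const { return items_.size(); }
  std::size_t reserved() const { return reserved_; }

 private:
  friend class ToyReservation;
  std::size_t capacity_;
  std::size_t reserved_ = 0;
  std::deque<int> items_;
};

// Dropping an uncommitted reservation gives the slot back to the queue.
inline ToyReservation::~ToyReservation() {
  if (queue_ != nullptr) --queue_->reserved_;
}

inline void ToyReservation::Commit(int value) {
  assert(queue_ != nullptr);
  --queue_->reserved_;
  queue_->items_.push_back(value);
  queue_ = nullptr;
}
```

Because the value is only supplied at Commit, the caller can wait for the reservation first and capture a fresh or expensive value afterwards.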
Channel lifetime#
A channel remains open as long as it has at least one active sender and at least one active receiver.
- If all receivers are destroyed, the channel closes. Subsequent Send attempts will fail (the future resolves to false).
- If all senders are destroyed, the channel closes. Subsequent Receive calls will drain any remaining items, then resolve to std::nullopt.
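The two closing rules can be checked against a small standalone model. This is a sketch under stated assumptions: ToyLifetimeChannel and its result types are hypothetical, the model is synchronous, and it represents "the future would wait" as an explicit kWouldWait state. It is not pw_async2's implementation.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Toy model only; NOT how pw_async2 implements channels.
enum class ToyReceiveState { kValue, kWouldWait, kClosed };

struct ToyReceiveResult {
  ToyReceiveState state;
  int value;
};

class ToyLifetimeChannel {
 public:
  ToyLifetimeChannel(std::size_t capacity, int senders, int receivers)
      : capacity_(capacity), senders_(senders), receivers_(receivers) {}

  void DropSender() {
    assert(senders_ > 0);
    --senders_;
  }
  void DropReceiver() {
    assert(receivers_ > 0);
    --receivers_;
  }

  // Open only while at least one sender and one receiver remain.
  bool closed() const { return senders_ == 0 || receivers_ == 0; }

  // Models a send that fails on a closed channel. A full but open channel
  // also returns false here; a real Send future would wait instead.
  bool Send(int value) {
    if (closed() || items_.size() >= capacity_) return false;
    items_.push_back(value);
    return true;
  }

  // Buffered items are still delivered after the channel closes. Only an
  // empty, closed channel reports kClosed.
  ToyReceiveResult Receive() {
    if (!items_.empty()) {
      const int value = items_.front();
      items_.pop_front();
      return {ToyReceiveState::kValue, value};
    }
    const ToyReceiveState state =
        closed() ? ToyReceiveState::kClosed : ToyReceiveState::kWouldWait;
    return {state, 0};
  }

 private:
  std::size_t capacity_;
  int senders_;
  int receivers_;
  std::deque<int> items_;
};
```

In this model, dropping the last sender after two sends still lets the receiver read both values before it observes the closed state, which matches the drain-then-std::nullopt behavior described above.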
Dynamic allocation#
In systems that have dynamic allocation, you can pass an Allocator and channel capacity to any of the channel creation functions to allocate a managed channel.
The dynamic functions wrap the returned tuple in a std::optional. If the
allocation fails, the optional will be empty.
#include "pw_async2/channel.h"
constexpr size_t kCapacity = 10;
auto result =
    pw::async2::CreateSpscChannel<int>(GetSystemAllocator(), kCapacity);
if (!result.has_value()) {
  PW_LOG_ERROR("Out of memory");
  return;
}
// Hand the sender and receiver to various parts of the system.
auto&& [channel, sender, receiver] = *result;
// As with the static channel, release the handle once all desired senders
// and receivers are created unless you intend to use it to manually close
// the channel. As we created an SPSC channel here, there are no more senders
// or receivers so we release the handle immediately.
//
// The channel remains allocated and open as long as any senders, receivers,
// or futures to it are alive.
channel.Release();
Synchronous access#
If you need to write to a channel from a non-async context, such as a
separate thread or an interrupt handler, you can use TrySend. The full set of
synchronous functions is:
- Sender::TrySend(): Attempts to send the value immediately. Returns true if successful, or false if the channel is full or closed.
- Sender::TryReserveSend(): Attempts to reserve a slot in the channel immediately. Returns a std::optional<SendReservation<T>> which contains a reservation if successful, or std::nullopt if the channel is full or closed.
- Sender::BlockingSend(): Blocks the running thread until the value is sent or an optional timeout elapses. Returns a status indicating success or whether the channel is closed or the operation timed out.
- Receiver::TryReceive(): Attempts to read a value from the channel immediately. Returns a pw::Result<T> containing the value if successful, or an error if the channel is empty or closed.
- Receiver::BlockingReceive(): Blocks the running thread until a value is received or an optional timeout elapses. Returns a pw::Result<T> containing either the value read or the error in case of timeout or channel closure.
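Since TrySend returns false rather than waiting, a non-async producer has to decide what to do with a value that could not be queued. One common choice is to drop it and count the drop. The sketch below shows that pattern with a hypothetical helper, SendOrCountDrop, which is not part of pw_async2. It assumes only that the sender type exposes TrySend(value) returning bool, as described above. FakeSender is a stand-in used to exercise the helper without the real library.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical helper, not part of pw_async2. Tries a non-blocking send and
// increments `dropped` when the value cannot be queued (full or closed).
template <typename SenderLike, typename T>
bool SendOrCountDrop(SenderLike& sender,
                     const T& value,
                     std::uint32_t& dropped) {
  if (sender.TrySend(value)) {
    return true;
  }
  ++dropped;
  return false;
}

// Stand-in sender for illustration only: accepts values until its fixed
// capacity is used up, then rejects every further TrySend.
class FakeSender {
 public:
  explicit FakeSender(std::size_t capacity) : capacity_(capacity) {
    assert(capacity_ > 0);
  }

  bool TrySend(int /*value*/) {
    if (used_ >= capacity_) return false;
    ++used_;
    return true;
  }

 private:
  std::size_t capacity_;
  std::size_t used_ = 0;
};

// Sends three values into a two-slot fake and returns how many were dropped.
inline std::uint32_t RunDropDemo() {
  FakeSender sender(2);
  std::uint32_t dropped = 0;
  for (int value = 0; value < 3; ++value) {
    SendOrCountDrop(sender, value, dropped);
  }
  return dropped;
}
```

Counting drops keeps the call non-blocking, which suits contexts that must not wait. A caller that can afford to block on a separate thread might prefer BlockingSend with a timeout instead.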