pw_async2
Cooperative async tasks for embedded
Stable C++
pw_async2 is a cooperatively scheduled asynchronous framework for C++,
optimized for use in resource-constrained systems. It helps you write complex,
concurrent applications without the overhead of traditional preemptive
multithreading. The design prioritizes efficiency, minimal resource usage
(especially memory), and testability.
Benefits
Simple Ownership: Say goodbye to that jumble of callbacks and shared state! Complex tasks with many concurrent elements can be expressed by simply combining smaller tasks.
Efficient: No dynamic memory allocation required.
Pluggable: Your existing event loop, work queue, or task scheduler can run the
Dispatcher without any extra threads.
Example
Informed poll is the core design philosophy behind pw_async2. A system consists
of cooperatively scheduled Task implementations, which are run by a Dispatcher.
Each task makes as much progress as possible when it runs, then yields back to
the dispatcher when it needs to wait for something. While the task waits, the
dispatcher runs other tasks. Once the task is able to make progress again, the
dispatcher runs it again.
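The informed-poll cycle described above can be sketched with a toy model. This is not the pw_async2 API: every name in the `toy` namespace (`PollState`, `Mailbox`, `ReceiveTask`, `RunDemo`, and the simplified `Task` and `Dispatcher`) is hypothetical, and the sketch uses a `std::deque` for brevity even though pw_async2 itself does not require dynamic allocation. It only illustrates the idea that a task is polled when it is posted and again when the thing it waits on wakes it, never in between.

```cpp
#include <cassert>
#include <deque>
#include <optional>

namespace toy {

enum class PollState { kPending, kReady };

// A unit of work. Pend() makes as much progress as it can, then reports
// whether the task finished or must wait.
class Task {
 public:
  virtual ~Task() = default;
  virtual PollState Pend() = 0;
};

// Polls only tasks that have been woken, so waiting tasks cost nothing.
class Dispatcher {
 public:
  // Registers a task and schedules its first poll.
  void Post(Task& task) { Wake(task); }

  // Called by an event source when `task` may be able to make progress.
  void Wake(Task& task) { ready_.push_back(&task); }

  // Polls woken tasks until none are runnable. Returns how many completed.
  int RunUntilStalled() {
    int completed = 0;
    while (!ready_.empty()) {
      Task* task = ready_.front();
      ready_.pop_front();
      if (task->Pend() == PollState::kReady) {
        ++completed;
      }
    }
    return completed;
  }

 private:
  std::deque<Task*> ready_;  // Toy only: heap-backed queue for brevity.
};

// Single-slot event source that remembers which task is waiting on it.
class Mailbox {
 public:
  explicit Mailbox(Dispatcher& dispatcher) : dispatcher_(dispatcher) {}

  // Returns the value if present; otherwise records `waiter` for Put().
  std::optional<int> TryTake(Task& waiter) {
    if (!value_.has_value()) {
      waiter_ = &waiter;
      return std::nullopt;
    }
    std::optional<int> out = value_;
    value_.reset();
    return out;
  }

  // Stores a value and wakes the waiting task, if there is one.
  void Put(int value) {
    value_ = value;
    if (waiter_ != nullptr) {
      dispatcher_.Wake(*waiter_);
      waiter_ = nullptr;
    }
  }

 private:
  Dispatcher& dispatcher_;
  std::optional<int> value_;
  Task* waiter_ = nullptr;
};

// Completes once it has taken one value from the mailbox.
class ReceiveTask final : public Task {
 public:
  explicit ReceiveTask(Mailbox& mailbox) : mailbox_(mailbox) {}

  PollState Pend() override {
    ++polls_;
    std::optional<int> value = mailbox_.TryTake(*this);
    if (!value.has_value()) {
      return PollState::kPending;  // Yield; Put() will wake this task.
    }
    result_ = value;
    return PollState::kReady;
  }

  int polls() const { return polls_; }
  std::optional<int> result() const { return result_; }

 private:
  Mailbox& mailbox_;
  int polls_ = 0;
  std::optional<int> result_;
};

struct DemoResult {
  int polls;
  int completed;
  std::optional<int> result;
};

inline DemoResult RunDemo() {
  Dispatcher dispatcher;
  Mailbox mailbox(dispatcher);
  ReceiveTask task(mailbox);

  dispatcher.Post(task);
  int completed = dispatcher.RunUntilStalled();  // First poll: nothing to take.
  assert(completed == 0 && !task.result().has_value());

  mailbox.Put(42);                            // Wakes the waiting task.
  completed += dispatcher.RunUntilStalled();  // Second poll: value is ready.
  return DemoResult{task.polls(), completed, task.result()};
}

}  // namespace toy
```

Calling `toy::RunDemo()` polls the task exactly twice: once when it is posted and once after `Put()` wakes it. The dispatcher never spins on a task that has reported it is waiting.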
Tasks are written as state machines which poll Futures to completion:
#include <optional>
#include <utility>

#include "pw_async2/channel.h"
#include "pw_async2/dispatcher_for_test.h"
#include "pw_async2/poll.h"
#include "pw_async2/try.h"
#include "pw_log/log.h"

namespace {

using ::pw::async2::Context;
using ::pw::async2::Poll;
using ::pw::async2::Ready;
using ::pw::async2::ReceiveFuture;
using ::pw::async2::Receiver;
using ::pw::async2::Sender;
using ::pw::async2::SendFuture;
using ::pw::async2::Task;

// Receive then send that data asynchronously. If the receiver or sender
// isn't ready, the task suspends when `PW_TRY_READY_ASSIGN` returns
// `Pending()`.
class ForwardingTask final : public Task {
 public:
  ForwardingTask(Receiver<int> receiver, Sender<int> sender)
      : receiver_(std::move(receiver)),
        sender_(std::move(sender)),
        state_(kReceiving) {}

  Poll<> DoPend(Context& cx) final {
    receive_future_ = receiver_.Receive();

    switch (state_) {
      case kReceiving: {
        PW_TRY_READY_ASSIGN(std::optional<int> new_data,
                            receive_future_.Pend(cx));
        if (!new_data.has_value()) {
          PW_LOG_ERROR("Receive failed: channel has closed");
          return Ready();  // Completes the task.
        }
        // Start transmitting and switch to transmitting state.
        send_future_ = sender_.Send(*new_data);
        state_ = kTransmitting;
      }
        [[fallthrough]];
      case kTransmitting: {
        PW_TRY_READY_ASSIGN(bool sent, send_future_.Pend(cx));
        if (!sent) {
          PW_LOG_ERROR("Send failed: channel has closed");
        }
        return Ready();  // Completes the task.
      }
    }
  }

 private:
  // Can receive data async, tracking state in a future.
  Receiver<int> receiver_;
  ReceiveFuture<int> receive_future_;
  // Can send data async, tracking state in a future.
  Sender<int> sender_;
  SendFuture<int> send_future_;

  enum State { kReceiving, kTransmitting };
  State state_;
};

}  // namespace
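The early-return behavior that the comment above attributes to `PW_TRY_READY_ASSIGN` can be illustrated with a toy macro. This is a sketch under stated assumptions, not the real implementation: `toy_poll::Poll`, `PendingType`, `CountdownSource`, `PendDoubled`, and `TOY_TRY_READY_ASSIGN` are all hypothetical stand-ins. The point is only that a pending result makes the enclosing function return immediately, while a ready result is unwrapped into a local variable.

```cpp
#include <optional>
#include <utility>

namespace toy_poll {

// Tag type that converts to a pending Poll<T> for any T.
struct PendingType {};

// Minimal stand-in for a poll result: an empty optional means "pending".
template <typename T>
class Poll {
 public:
  Poll(PendingType) {}                         // Implicit on purpose.
  Poll(T value) : value_(std::move(value)) {}  // Implicit on purpose.

  bool IsReady() const { return value_.has_value(); }
  T& value() { return *value_; }

 private:
  std::optional<T> value_;
};

// Hypothetical macro: evaluates `expr`, returns a pending result from the
// enclosing function if it is not ready, otherwise assigns the value to `lhs`.
// __LINE__ keeps the temporary's name unique per use.
#define TOY_CONCAT_INNER(a, b) a##b
#define TOY_CONCAT(a, b) TOY_CONCAT_INNER(a, b)
#define TOY_TRY_READY_ASSIGN_IMPL(tmp, lhs, expr) \
  auto tmp = (expr);                              \
  if (!tmp.IsReady()) {                           \
    return ::toy_poll::PendingType{};             \
  }                                               \
  lhs = std::move(tmp.value())
#define TOY_TRY_READY_ASSIGN(lhs, expr) \
  TOY_TRY_READY_ASSIGN_IMPL(TOY_CONCAT(toy_poll_tmp_, __LINE__), lhs, expr)

// Reports pending for a fixed number of polls, then yields `value`.
class CountdownSource {
 public:
  CountdownSource(int pending_polls, int value)
      : remaining_(pending_polls), value_(value) {}

  Poll<int> Pend() {
    if (remaining_ > 0) {
      --remaining_;
      return PendingType{};
    }
    return value_;
  }

 private:
  int remaining_;
  int value_;
};

// Doubles the source's value once it is ready; pending until then.
inline Poll<int> PendDoubled(CountdownSource& source) {
  TOY_TRY_READY_ASSIGN(int value, source.Pend());
  return value * 2;  // Only reached when the source was ready.
}

}  // namespace toy_poll
```

In this sketch the caller simply calls `PendDoubled()` again on a later poll. A state machine like `ForwardingTask` uses the same pattern, with the `state_` member recording where to resume.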
Tasks are added to a Dispatcher by calling Post():
ChannelStorage<int, 1> storage_a;
auto [handle_a, sender_a, receiver_a] = CreateSpscChannel(storage_a);
ChannelStorage<int, 1> storage_b;
auto [handle_b, sender_b, receiver_b] = CreateSpscChannel(storage_b);

ForwardingTask task(std::move(receiver_a), std::move(sender_b));

DispatcherForTest dispatcher;
// Registers `task` to run on the dispatcher.
dispatcher.Post(task);
// Runs the dispatcher until all `Post`ed tasks are blocked.
dispatcher.RunUntilStalled();