pw_async2

Cooperative async tasks for embedded

Experimental C++17

  • Simple Ownership: Say goodbye to that jumble of callbacks and shared state! Complex tasks with many concurrent elements can be expressed by simply combining smaller tasks.

  • Efficient: No dynamic memory allocation required.

  • Pluggable: Your existing event loop, work queue, or task scheduler can run the Dispatcher without any extra threads.

  • Coroutine-capable: C++20 coroutines and Rust async fn work just like other tasks, and can easily plug into an existing pw_async2 system.

pw::async2::Task is Pigweed’s async primitive. Task objects are cooperatively-scheduled “threads” which yield to the Dispatcher when waiting. When the Task is able to make progress, the Dispatcher will run it again. For example:

#include <optional>
#include <utility>

#include "pw_async2/dispatcher.h"
#include "pw_async2/poll.h"
#include "pw_log/log.h"
#include "pw_result/result.h"
#include "pw_status/status.h"

using ::pw::async2::Context;
using ::pw::async2::Poll;
using ::pw::async2::Ready;
using ::pw::async2::Pending;
using ::pw::async2::Task;

class ReceiveAndSend: public Task {
 public:
  ReceiveAndSend(Receiver receiver, Sender sender):
    receiver_(receiver), sender_(sender) {}

  Poll<> DoPend(Context& cx) override {
    if (!send_future_) {
      // ``PendReceive`` checks for available data or errors.
      //
      // If no data is available, it will grab a ``Waker`` from
      // ``cx.waker()`` and return ``Pending``. When data arrives,
      // it will call ``waker.Wake()`` which tells the ``Dispatcher`` to
      // ``Pend`` this ``Task`` again.
      Poll<pw::Result<Data>> new_data = receiver_.PendReceive(cx);
      if (new_data.IsPending()) {
        // The ``Task`` is still waiting on data. Return ``Pending``,
        // yielding to the dispatcher. ``Pend`` will be called again when
        // data becomes available.
        return Pending();
      }
      if (!new_data->ok()) {
        PW_LOG_ERROR("Receiving failed: %s", new_data->status().str());
        // Receiving failed, so the ``Task`` is complete.
        return Ready();
      }
      Data& data = **new_data;
      send_future_ = sender_.Send(std::move(data));
    }
    // ``send_future_->Pend`` attempts to send the data, returning
    // ``Pending`` if ``sender_`` was not yet able to accept it.
    Poll<pw::Status> sent = send_future_->Pend(cx);
    if (sent.IsPending()) {
      return Pending();
    }
    if (!sent->ok()) {
      PW_LOG_ERROR("Sending failed: %s", sent->str());
    }
    return Ready();
  }
 private:
  Receiver receiver_;
  Sender sender_;

  // ``SendFuture`` is some type returned by ``Sender::Send`` that offers a
  // ``Pend`` method similar to the one on ``Task``.
  std::optional<SendFuture> send_future_ = std::nullopt;
};

Tasks can then be run on a Dispatcher using the Dispatcher::Post method:

#include "pw_async2/dispatcher.h"

using ::pw::async2::Dispatcher;

int main() {
  // ``ReceiveAndSend`` is the ``Task`` subclass defined above.
  ReceiveAndSend task(SomeMakeReceiverFn(), SomeMakeSenderFn());
  Dispatcher dispatcher;
  dispatcher.Post(task);
  dispatcher.RunUntilComplete(task);
  return 0;
}
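
To make the Post / run / wake cycle concrete without pulling in Pigweed, here is a minimal, self-contained sketch of a cooperative run loop. Everything in the `sketch` namespace is a hypothetical stand-in written for illustration; it is not the pw_async2 implementation, and the real Dispatcher and Waker are considerably more capable.

```cpp
#include <cassert>
#include <deque>

namespace sketch {

// Hypothetical stand-ins for illustration only; NOT the pw_async2 types.
enum class PollState { kPending, kReady };

class Dispatcher;

class Task {
 public:
  virtual ~Task() = default;
  // Advances the task as far as possible without blocking.
  virtual PollState Pend(Dispatcher& dispatcher) = 0;
};

class Dispatcher {
 public:
  // Queues a task so that it is polled by the run loop.
  void Post(Task& task) { run_queue_.push_back(&task); }

  // Stand-in for waking: re-queues a task that previously returned kPending.
  void Wake(Task& task) { run_queue_.push_back(&task); }

  // Polls queued tasks until `target` reports kReady. Returns the number of
  // Pend calls made, or -1 if the queue drained before `target` completed.
  int RunUntilComplete(Task& target) {
    int polls = 0;
    while (!run_queue_.empty()) {
      Task* task = run_queue_.front();
      run_queue_.pop_front();
      ++polls;
      if (task->Pend(*this) == PollState::kReady && task == &target) {
        return polls;
      }
    }
    return -1;
  }

 private:
  std::deque<Task*> run_queue_;
};

// Returns kPending `remaining` times, then kReady. It wakes itself before
// yielding, which simulates an external event source signalling progress.
class CountdownTask : public Task {
 public:
  explicit CountdownTask(int remaining) : remaining_(remaining) {}

  PollState Pend(Dispatcher& dispatcher) override {
    if (remaining_ == 0) {
      return PollState::kReady;
    }
    --remaining_;
    dispatcher.Wake(*this);
    return PollState::kPending;
  }

 private:
  int remaining_;
};

// Runs one CountdownTask to completion and reports how often it was polled.
int RunCountdown(int steps) {
  assert(steps >= 0);
  CountdownTask task(steps);
  Dispatcher dispatcher;
  dispatcher.Post(task);
  return dispatcher.RunUntilComplete(task);
}

}  // namespace sketch
```

In this model a task that yields three times is polled four times in total: three calls that return pending and a final call that returns ready. No thread ever blocks; the loop only runs tasks that have been posted or woken.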

Roadmap

Coming soon: C++20 users can also define tasks using coroutines!

#include <utility>

#include "pw_async2/dispatcher.h"
#include "pw_async2/poll.h"
#include "pw_log/log.h"
#include "pw_result/result.h"
#include "pw_status/status.h"

using ::pw::async2::CoroutineTask;

CoroutineTask ReceiveAndSend(Receiver receiver, Sender sender) {
  pw::Result<Data> data = co_await receiver.Receive();
  if (!data.ok()) {
    PW_LOG_ERROR("Receiving failed: %s", data.status().str());
    co_return;
  }
  pw::Status sent = co_await sender.Send(std::move(*data));
  if (!sent.ok()) {
    PW_LOG_ERROR("Sending failed: %s", sent.str());
  }
}

C++ API reference

class Task

A task which may complete one or more asynchronous operations.

The Task interface is commonly implemented by users wishing to schedule work on an asynchronous Dispatcher. To do this, users may subclass Task, providing an implementation of the DoPend method which advances the state of the Task as far as possible before yielding back to the Dispatcher.

This process works similarly to cooperatively-scheduled green threads or coroutines, with a Task representing a single logical “thread” of execution. Unlike some green thread or coroutine implementations, Task does not imply a separately-allocated stack: Task state is most commonly stored in fields of the Task subclass.
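
The idea that task state lives in member fields rather than on a separate stack can be sketched as a small state machine. The example below is a simplified illustration under stated assumptions: `CopyOneValue`, its `Pend` method returning `bool`, and the `std::optional<int>` inbox and outbox are all hypothetical and stand in for a real Task, Poll, receiver, and sender.

```cpp
#include <cassert>
#include <optional>

namespace sketch {

// Hypothetical model of a task whose progress is stored in member fields.
// Each call to Pend resumes from `state_` instead of from a saved stack.
class CopyOneValue {
 public:
  CopyOneValue(std::optional<int>& inbox, std::optional<int>& outbox)
      : inbox_(inbox), outbox_(outbox) {}

  // Returns true once the task is complete, or false if it had to yield.
  bool Pend() {
    switch (state_) {
      case State::kReceiving:
        if (!inbox_.has_value()) {
          return false;  // No data yet: yield and keep state_ unchanged.
        }
        held_ = *inbox_;
        inbox_.reset();
        state_ = State::kSending;
        [[fallthrough]];
      case State::kSending:
        if (outbox_.has_value()) {
          return false;  // Outbox is full: yield while holding the value.
        }
        outbox_ = held_;
        state_ = State::kDone;
        [[fallthrough]];
      case State::kDone:
        return true;
    }
    return true;
  }

 private:
  enum class State { kReceiving, kSending, kDone };

  State state_ = State::kReceiving;
  int held_ = 0;  // Value carried between the receive and send steps.
  std::optional<int>& inbox_;
  std::optional<int>& outbox_;
};

// Drives the task by hand and returns the value that reached the outbox.
int DemoCopy() {
  std::optional<int> inbox;       // Starts empty.
  std::optional<int> outbox = 7;  // Starts full.
  CopyOneValue task(inbox, outbox);

  bool done = task.Pend();  // Nothing to receive yet.
  assert(!done);

  inbox = 42;
  done = task.Pend();  // Receives 42, but the outbox is still full.
  assert(!done);

  outbox.reset();
  done = task.Pend();  // Resumes in the sending state and completes.
  assert(done);
  return *outbox;
}

}  // namespace sketch
```

The second and third calls show the point of the pattern: the received value and the current step survive between calls because they are ordinary fields, so no per-task stack needs to be allocated.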

Once defined by a user, Tasks may be run by passing them to a Dispatcher via Dispatcher::Post. The Dispatcher will then Pend the Task every time that the Task indicates it is able to make progress.

Note that Task objects must not be destroyed while they are actively being Pend’d by a Dispatcher. The best way to ensure this is to create Task objects that either live until they receive a DoDestroy call or outlive their associated Dispatcher.

template<typename T = ReadyType>
class Poll

Public Functions

Poll() = delete

Basic constructors.

inline constexpr bool IsReady() const noexcept

Returns whether or not this value is Ready.

inline constexpr T &value() & noexcept

Returns the inner value.

This must only be called if IsReady() returned true.

inline constexpr const T *operator->() const noexcept

Accesses the inner value.

This must only be called if IsReady() returned true.

inline constexpr const T &operator*() const & noexcept

Returns the inner value.

This must only be called if IsReady() returned true.

inline constexpr Poll pw::async2::Ready()

Returns a value indicating completion.

template<typename T, typename ...Args>
constexpr Poll<T> pw::async2::Ready(std::in_place_t, Args&&... args)

Returns a value indicating completion with some result (constructed in-place).

template<typename T>
constexpr Poll<T> pw::async2::Ready(T &&value)

Returns a value indicating completion with some result.

inline constexpr PendingType pw::async2::Pending()

Returns a value indicating that an operation was not yet able to complete.
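
As a rough mental model of how Poll, Ready, and Pending fit together, the sketch below builds a pared-down `Poll<T>` on top of `std::optional`. This is an assumption made for illustration, not how Pigweed implements the type: the names mirror the reference above, but the in-place `Ready` overload is omitted and `PendAnswer` and `PollUntilReady` are hypothetical helpers.

```cpp
#include <cassert>
#include <optional>
#include <utility>

namespace sketch {

// Hypothetical, simplified model of the Poll API; not the Pigweed source.
struct PendingType {};
struct ReadyType {};

template <typename T = ReadyType>
class Poll {
 public:
  Poll() = delete;

  // Implicit conversions allow `return Pending();` and `return Ready(x);`.
  constexpr Poll(PendingType) : value_(std::nullopt) {}
  constexpr Poll(T value) : value_(std::move(value)) {}

  constexpr bool IsReady() const noexcept { return value_.has_value(); }

  // The accessors below must only be used when IsReady() is true.
  constexpr T& value() & noexcept { return *value_; }
  constexpr const T* operator->() const noexcept { return &*value_; }
  constexpr const T& operator*() const& noexcept { return *value_; }

 private:
  std::optional<T> value_;
};

constexpr PendingType Pending() { return PendingType(); }

constexpr Poll<> Ready() { return Poll<>(ReadyType()); }

template <typename T>
constexpr Poll<T> Ready(T value) {
  return Poll<T>(std::move(value));
}

// Hypothetical operation that needs `attempts_left` extra polls to finish.
Poll<int> PendAnswer(int& attempts_left) {
  if (attempts_left > 0) {
    --attempts_left;
    return Pending();
  }
  return Ready(42);
}

// Polls PendAnswer until it is ready and returns the number of polls made.
int PollUntilReady(int attempts) {
  int polls = 0;
  while (true) {
    Poll<int> result = PendAnswer(attempts);
    ++polls;
    if (result.IsReady()) {
      assert(*result == 42);
      return polls;
    }
  }
}

}  // namespace sketch
```

A caller checks `IsReady()` before touching the value, exactly as the reference requires. In real code the loop in `PollUntilReady` would not spin; the caller would return `Pending()` itself and wait to be woken.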

class Context

Context for an asynchronous Task.

This object contains resources needed for scheduling asynchronous work, such as the current Dispatcher and the Waker for the current task.

Contexts are most often created by Dispatchers, which pass them into Task::Pend.

Public Functions

inline Context(Dispatcher &dispatcher, Waker &waker)

Creates a new Context containing the currently-running Dispatcher and a Waker for the current Task.

inline Dispatcher &dispatcher()

The Dispatcher on which the current Task is executing.

This can be used for spawning new tasks using dispatcher().Post(task).

inline Waker &waker()

Returns a Waker which, when awoken, will cause the current task to be Pend’d by its dispatcher.

class Waker

An object which can respond to asynchronous events by queueing work to be done in response, such as placing a Task on a Dispatcher loop.

Wakers are often held by I/O objects, custom concurrency primitives, or interrupt handlers. Once the thing the Task was waiting for is available, Wake should be called so that the Task is alerted and may process the event.

Wakers may be held for any lifetime, and will be automatically nullified when the underlying Dispatcher or Task is deleted.

Wakers are most commonly created by Dispatchers, which pass them into Task::Pend via its Context argument.

Public Functions

void Wake() &&

Wakes up the Waker’s creator, alerting it that an asynchronous event has occurred that may allow it to make progress.

Wake operates on an rvalue reference (&&) in order to indicate that the event that was waited on has completed. This makes it possible to track the outstanding events that may cause a Task to wake up and make progress.

Waker Clone(WaitReason reason) &

Creates a second Waker from this Waker.

Clone is made explicit in order to allow for easier tracking of the different Wakers that may wake up a Task.

The WaitReason argument can be used to provide information about what event the Waker is waiting on. This can be useful for debugging purposes.
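
The ref-qualifiers on Wake and Clone can be illustrated with a small stand-alone model. The `Waker` below is a hypothetical simplification: it increments a counter instead of scheduling a Task, and its `Clone` omits the `WaitReason` argument. It is meant only to show how `&&` and `&` qualification shape the calling code.

```cpp
#include <cassert>
#include <utility>

namespace sketch {

// Hypothetical wake target: counts wake-ups instead of scheduling a task.
struct WakeCounter {
  int wakes = 0;
};

// Simplified model of a move-only Waker; not the Pigweed implementation.
class Waker {
 public:
  explicit Waker(WakeCounter& target) : target_(&target) {}

  Waker(Waker&& other) noexcept
      : target_(std::exchange(other.target_, nullptr)) {}
  Waker& operator=(Waker&& other) noexcept {
    target_ = std::exchange(other.target_, nullptr);
    return *this;
  }
  Waker(const Waker&) = delete;
  Waker& operator=(const Waker&) = delete;

  // Rvalue-qualified: callers must write `std::move(waker).Wake()`, which
  // makes it visible at the call site that the Waker is being consumed.
  void Wake() && {
    if (target_ != nullptr) {
      ++target_->wakes;
      target_ = nullptr;  // A consumed Waker cannot wake its target again.
    }
  }

  // Lvalue-qualified: duplication is always an explicit, visible call.
  Waker Clone() & { return Waker(target_); }

 private:
  explicit Waker(WakeCounter* target) : target_(target) {}

  WakeCounter* target_;
};

// Wakes one target through two Wakers and returns the total wake count.
int DemoWake() {
  WakeCounter counter;
  Waker io_waker(counter);
  Waker timer_waker = io_waker.Clone();

  std::move(io_waker).Wake();     // First event: one wake recorded.
  std::move(io_waker).Wake();     // Already consumed: no effect.
  std::move(timer_waker).Wake();  // Second event, via the clone.
  return counter.wakes;
}

}  // namespace sketch
```

Because copying is deleted and Clone must be called by name, every additional Waker that can wake a task appears explicitly in the source, which matches the tracking rationale described above.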

class Dispatcher : public pw::async2::DispatcherImpl<Dispatcher>