pw_async#

Overview#

Pigweed’s async module provides portable APIs and utilities for writing asynchronous code. Currently, it provides:

  • Message loop APIs

Attention

This module is still under construction. The API is not yet stable.

Dispatcher#

Dispatcher is an API for a message loop that schedules and executes Tasks. See pw_async_basic for an example implementation.

Dispatcher is a pure virtual interface that is implemented by backends and FakeDispatcher. A virtual interface is used instead of a facade to allow substituting a FakeDispatcher for a Dispatcher backend in tests.

Dispatcher API#

class Dispatcher : public pw::chrono::VirtualSystemClock#

Abstract base class for an asynchronous dispatcher loop.

Dispatchers run many short, non-blocking units of work on a single thread. This approach has a number of advantages compared with executing concurrent tasks on separate threads:

  • Dispatchers can make more efficient use of system resources, since they don’t need to maintain separate thread stacks.

  • Dispatchers can run on systems without thread support, such as no-RTOS embedded environments.

  • Dispatchers allow tasks to communicate with one another without the synchronization overhead of locks, atomics, fences, or volatile.

Thread support: Dispatcher methods may be safely invoked from any thread, but the resulting tasks will always execute on a single thread. Whether or not methods may be invoked from interrupt context is implementation-defined.

VirtualSystemClock: Dispatcher implements VirtualSystemClock in order to provide a consistent source of (possibly mocked) time information to tasks.

A simple default dispatcher implementation is provided by pw_async_basic.

Subclassed by pw::async::BasicDispatcher, pw::async::FunctionDispatcher

Public Functions

inline virtual void Post(Task &task)#

Post caller-owned |task| to be run on the dispatch loop.

Posted tasks execute in the order they are posted. This ensures that tasks can re-post themselves and yield in order to allow other tasks the opportunity to execute.

A given |task| must only be posted to a single Dispatcher.

inline virtual void PostAfter(Task &task, chrono::SystemClock::duration delay)#

Post caller owned |task| to be run after |delay|.

If |task| was already posted to run at an earlier time (before |delay| would expire), |task| must be run at the earlier time, and |task| may also be run at the later time.

virtual void PostAt(Task &task, chrono::SystemClock::time_point time) = 0#

Post caller owned |task| to be run at |time|.

If |task| was already posted to run before |time|, |task| must be run at the earlier time, and |task| may also be run at the later time.

virtual bool Cancel(Task &task) = 0#

Prevent a Posted task from starting.

Returns: true: the task was successfully canceled and will not be run by the dispatcher until Posted again. false: the task could not be cancelled because it either was not posted, already ran, or is currently running on the Dispatcher thread.

Task API#

struct Context#

Contextual information provided by a Dispatcher to a running task.

Public Members

Dispatcher *dispatcher#

The Dispatcher running the current Task.

Task *task#

The current Task being executed.

using pw::async::TaskFunction = Function<void(Context&, Status)>#

A TaskFunction is a unit of work that is wrapped by a Task and executed on a Dispatcher.

TaskFunctions take a Context as their first argument. Before executing a Task, the Dispatcher sets the pointer to itself and to the Task in Context.

TaskFunctions take a Status as their second argument. When a Task is running as normal, |status| is PW_STATUS_OK. If a Task will not be able to run as scheduled, the Dispatcher will still invoke the TaskFunction with |status| PW_STATUS_CANCELLED. This provides an opportunity to reclaim resources held by the Task.

A Task will not run as scheduled if, for example, it is still waiting when the Dispatcher shuts down.

class Task#

A Task represents a unit of work (TaskFunction) that can be executed on a Dispatcher. To support various Dispatcher backends, it wraps a backend::NativeTask, which contains backend-specific state and methods.

Public Functions

inline Task()#

The default constructor creates a Task without a function. set_function() must be called before posting the Task.

inline explicit Task(TaskFunction &&f)#

Constructs a Task that calls f when executed on a Dispatcher.

inline void set_function(TaskFunction &&f)#

Configure the TaskFunction after construction. This MUST NOT be called while this Task is pending in a Dispatcher.

inline void operator()(Context &ctx, Status status)#

Executes this task.

inline backend::NativeTask &native_type()#

Returns the inner NativeTask containing backend-specific state. Only Dispatcher backends or non-portable code should call these methods!

Facade API#

Task#

The Task type represents a work item that can be submitted to and executed by a Dispatcher.

To run work on a Dispatcher event loop, a Task can be constructed from a function or lambda (see pw::async::TaskFunction) and submitted to run using the pw::async::Dispatcher::Post method (and its siblings, PostAt etc.).

The Task facade enables backends to provide custom storage containers for Task s, as well as to keep per- Task data alongside the TaskFunction (such as next pointers for intrusive linked-lists of Task).

The active Task backend is configured with the GN variable pw_async_TASK_BACKEND. The specified target must define a class pw::async::backend::NativeTask in the header pw_async_backend/task.h that meets the interface requirements in public/pw_async/task.h. Task will then trivially wrap NativeTask.

The bazel build provides the pw_async_task_backend label flag to configure the active Task backend.

FakeDispatcher#

The FakeDispatcher facade is a utility for simulating a real Dispatcher in tests. FakeDispatcher simulates time to allow for reliable, fast testing of code that uses Dispatcher. FakeDispatcher is a facade instead of a concrete implementation because it depends on Task state for processing tasks, which varies across Task backends.

The active FakeDispatcher backend is configured with the GN variable pw_async_FAKE_DISPATCHER_BACKEND. The specified target must define a class pw::async::test::backend::NativeFakeDispatcher in the header pw_async_backend/fake_dispatcher.h that meets the interface requirements in public/pw_async/task.h. FakeDispatcher will then trivially wrap NativeFakeDispatcher.

The bazel build provides the pw_async_fake_dispatcher_backend label flag to configure the FakeDispatcher backend.

Testing FakeDispatcher#

The GN template fake_dispatcher_tests in fake_dispatcher_tests.gni creates a test target that tests a FakeDispatcher backend. This enables one test suite to be shared across FakeDispatcher backends and ensures conformance.

FunctionDispatcher#

class FunctionDispatcher : public pw::async::Dispatcher#

FunctionDispatcher extends Dispatcher with Post*() methods that take a TaskFunction instead of a Task. This implies that Tasks are allocated or are taken from a Task pool. Tasks are owned and managed by the Dispatcher.

Subclassed by pw::async::HeapDispatcher

Public Functions

inline virtual Status Post(TaskFunction &&task_func)#

Post dispatcher owned |task_func| function.

inline virtual Status PostAfter(TaskFunction &&task_func, chrono::SystemClock::duration delay)#

Post dispatcher owned |task_func| function to be run after |delay|.

virtual Status PostAt(TaskFunction &&task_func, chrono::SystemClock::time_point time) = 0#

Post dispatcher owned |task_func| function to be run at |time|.

inline virtual void Post(Task &task)#

Post caller-owned |task| to be run on the dispatch loop.

Posted tasks execute in the order they are posted. This ensures that tasks can re-post themselves and yield in order to allow other tasks the opportunity to execute.

A given |task| must only be posted to a single Dispatcher.

inline virtual void PostAfter(Task &task, chrono::SystemClock::duration delay)#

Post caller owned |task| to be run after |delay|.

If |task| was already posted to run at an earlier time (before |delay| would expire), |task| must be run at the earlier time, and |task| may also be run at the later time.

virtual void PostAt(Task &task, chrono::SystemClock::time_point time) = 0#

Post caller owned |task| to be run at |time|.

If |task| was already posted to run before |time|, |task| must be run at the earlier time, and |task| may also be run at the later time.

HeapDispatcher#

class HeapDispatcher : public pw::async::FunctionDispatcher#

HeapDispatcher wraps an existing Dispatcher and allocates Task objects on the heap before posting them to the existing Dispatcher. After Tasks run, they are automatically freed.

Public Functions

virtual Status PostAt(TaskFunction &&task_func, chrono::SystemClock::time_point time) override#

Post dispatcher owned |task_func| function to be run at |time|.

inline virtual void PostAt(Task &task, chrono::SystemClock::time_point time) override#

Post caller owned |task| to be run at |time|.

If |task| was already posted to run before |time|, |task| must be run at the earlier time, and |task| may also be run at the later time.

inline virtual bool Cancel(Task &task) override#

Prevent a Posted task from starting.

Returns: true: the task was successfully canceled and will not be run by the dispatcher until Posted again. false: the task could not be cancelled because it either was not posted, already ran, or is currently running on the Dispatcher thread.

inline virtual chrono::SystemClock::time_point now() override#

Returns the current time.

Design#

Task Ownership#

Tasks are owned by clients rather than the Dispatcher. This avoids either memory allocation or queue size limits in Dispatcher implementations. However, care must be taken that clients do not destroy Tasks before they have been executed or canceled.

Getting Started#

First, configure the Task backend for the Dispatcher backend you will be using:

pw_async_TASK_BACKEND = "$dir_pw_async_basic:task"

Next, create an executable target that depends on the Dispatcher backend you want to use:

pw_executable("hello_world") {
  sources = [ "main.cc" ]
  deps = [ "$dir_pw_async_basic:dispatcher" ]
}

Next, instantiate the Dispatcher and post a task:

#include "pw_async_basic/dispatcher.h"

int main() {
  BasicDispatcher dispatcher;

  // Spawn a thread for the dispatcher to run on.
  thread::Thread work_thread(thread::stl::Options(), dispatcher);

  Task task([](pw::async::Context& ctx){
    printf("hello world\n");
    ctx.dispatcher->RequestStop();
  });

  // Execute `task` in 5 seconds.
  dispatcher.PostAfter(task, 5s);

  // Blocks until `task` runs.
  work_thread.join();
  return 0;
}

The above example runs the dispatcher on a new thread, but it can also run on the current/main thread:

#include "pw_async_basic/dispatcher.h"

int main() {
  BasicDispatcher dispatcher;

  Task task([](pw::async::Context& ctx){
    printf("hello world\n");
  });

  // Execute `task` in 5 seconds.
  dispatcher.PostAfter(task, 5s);

  dispatcher.Run();
  return 0;
}

Fake Dispatcher#

To test async code, FakeDispatcher should be dependency injected in place of Dispatcher. Then, time should be driven in unit tests using the Run*() methods. For convenience, you can use the test fixture FakeDispatcherFixture.

class FakeDispatcherFixture : public pw::unit_test::internal::Test#

Test fixture that is a simple wrapper around a FakeDispatcher.

Example:

using ExampleTest = pw::async::test::FakeDispatcherFixture;

TEST_F(ExampleTest, Example) {
  MyClass obj(dispatcher());

  obj.ScheduleSomeTasks();
  EXPECT_TRUE(RunUntilIdle());
  EXPECT_TRUE(some condition);

  obj.ScheduleTaskToRunIn30Seconds();
  EXPECT_TRUE(RunFor(30s));
  EXPECT_TRUE(task ran);
}

Public Functions

inline FakeDispatcher &dispatcher()#

Returns the FakeDispatcher that should be used for dependency injection.

inline chrono::SystemClock::time_point now()#

Returns the current fake time.

inline bool RunUntilIdle()#

Dispatches all tasks with due times up until now(). Returns true iff any tasks were invoked during the run.

inline bool RunUntil(chrono::SystemClock::time_point end_time)#

Dispatches all tasks with due times up to end_time, progressively advancing the fake clock. Returns true iff any tasks were invoked during the run.

inline bool RunFor(chrono::SystemClock::duration duration)#

Dispatches all tasks with due times up to now() + duration, progressively advancing the fake clock. Returns true iff any tasks were invoked during the run.

Attention

FakeDispatcher::now() will return the simulated time. Dispatcher::now() should therefore be used to get the current time in async code instead of other sources of time to ensure consistent time values and reliable tests.

Roadmap#

  • Stabilize Task cancellation API

  • Utility for dynamically allocated Tasks

  • CMake support

  • Support for C++20 coroutines