C/C++ API Reference
Loading...
Searching...
No Matches
dispatcher.h
1// Copyright 2025 The Pigweed Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may not
4// use this file except in compliance with the License. You may obtain a copy of
5// the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12// License for the specific language governing permissions and limitations under
13// the License.
14#pragma once
15
16#include <atomic>
17#include <mutex>
18#include <type_traits>
19#include <utility>
20
21#include "pw_allocator/allocator.h"
22#include "pw_allocator/shared_ptr.h"
23#include "pw_async2/func_task.h"
24#include "pw_async2/future_task.h"
25#include "pw_async2/internal/lock.h"
26#include "pw_async2/task.h"
27#include "pw_async2/waker.h"
28#include "pw_containers/intrusive_queue.h"
29#include "pw_sync/lock_annotations.h"
30
31// Coroutines are supported if the build target depends on //pw_async2:coro.
32#if defined(__cpp_impl_coroutine) && __has_include("pw_async2/coro.h")
33#include <functional> // std::invoke
34
35#include "pw_async2/coro.h"
36#include "pw_async2/coro_task.h"
37#include "pw_async2/fallible_coro_task.h"
38#endif // defined(__cpp_impl_coroutine) && __has_include("pw_async2/coro.h")
39
40namespace pw::async2 {
41
43
75 public:
// A Dispatcher cannot be copied: both copy operations are deleted.
76 Dispatcher(const Dispatcher&) = delete;
77 Dispatcher& operator=(const Dispatcher&) = delete;
78
// A Dispatcher cannot be moved either: both move operations are deleted.
79 Dispatcher(Dispatcher&&) = delete;
80 Dispatcher& operator=(Dispatcher&&) = delete;
81
82 virtual ~Dispatcher() PW_LOCKS_EXCLUDED(internal::lock());
83
92 void Terminate() PW_LOCKS_EXCLUDED(internal::lock());
93
103 void Post(Task& task) PW_LOCKS_EXCLUDED(internal::lock());
104
// Posts a task that is already held in a `SharedPtr`.
//
// Forwards to `PostAllocatedTask` and wraps the call in `PW_ASSERT`, so a
// task that does not post trips the assertion instead of being reported to
// the caller (this function returns `void`).
110 template <typename T>
111 void PostShared(const SharedPtr<T>& task)
112 PW_LOCKS_EXCLUDED(internal::lock()) {
113 PW_ASSERT(PostAllocatedTask(task));
114 }
115
// Creates a `TaskType` with `allocator.MakeShared`, forwarding `args` to it,
// and posts the new task to this dispatcher.
//
// This overload is only enabled when `TaskType` derives from `Task`.
//
// Returns the `SharedPtr` to the new task when `PostAllocatedTask` returns
// true, and `nullptr` otherwise. The result is `[[nodiscard]]`.
121 template <typename TaskType,
122 typename... Args,
123 typename = std::enable_if_t<std::is_base_of_v<Task, TaskType>>>
124 [[nodiscard]] SharedPtr<TaskType> Post(Allocator& allocator, Args&&... args) {
125 auto task = allocator.MakeShared<TaskType>(std::forward<Args>(args)...);
// PostAllocatedTask checks whether `task` holds a value before posting, so
// presumably a failed allocation also ends up on this path.
126 if (!PostAllocatedTask(task)) {
127 return nullptr;
128 }
129 return task;
130 }
131
142 template <
143 typename Func = void,
144 int&... kExplicitGuard,
145 typename Arg,
146 typename ActualFunc =
147 std::conditional_t<std::is_void_v<Func>, std::decay_t<Arg>, Func>,
148 typename = std::enable_if_t<!std::is_base_of_v<Task, ActualFunc>>>
150 Arg&& func) {
151 return Post<FuncTask<ActualFunc>>(allocator, std::forward<Arg>(func));
152 }
153
165 template <typename Func>
167 Func&& func) {
168 return Post<RunOnceTask<Func>>(allocator, std::forward<Func>(func));
169 }
170
181 template <typename Fut>
183 Fut&& future) {
184 return Post<FutureTask<Fut>>(allocator, std::forward<Fut>(future));
185 }
186
187#if defined(__cpp_impl_coroutine) && __has_include("pw_async2/coro.h")
// Posts a coroutine as a `CoroTask<T>` allocated from `allocator`.
//
// Returns `nullptr` without posting anything when `coro.ok()` is false.
// Otherwise moves `coro` into a new `CoroTask<T>` through the
// `Post<TaskType>` overload and returns that overload's result.
196 template <typename T>
197 [[nodiscard]] SharedPtr<CoroTask<T>> Post(Allocator& allocator,
198 Coro<T>&& coro) {
199 if (!coro.ok()) {
200 return nullptr;
201 }
202 return Post<CoroTask<T>>(allocator, std::move(coro));
203 }
204
// Posts a coroutine together with an error handler as a
// `FallibleCoroTask<T, ErrorHandler>` allocated from `allocator`.
//
// `ErrorHandler` is `E` when `E` is given explicitly; when `E` is left as
// `void` it is the decayed type of `error_handler`.
// NOTE(review): the `int&... kExplicitGuard` pack presumably exists to keep
// callers from explicitly specifying `Arg` / `ErrorHandler` -- confirm.
//
// Returns `nullptr` without posting anything when `coro.ok()` is false.
// Otherwise moves `coro` and forwards `error_handler` into the new task via
// the `Post<TaskType>` overload and returns that overload's result.
214 template <typename T,
215 typename E = void,
216 int&... kExplicitGuard,
217 typename Arg,
218 typename ErrorHandler =
219 std::conditional_t<std::is_void_v<E>, std::decay_t<Arg>, E>>
220 [[nodiscard]] SharedPtr<FallibleCoroTask<T, ErrorHandler>> Post(
221 Allocator& allocator, Coro<T>&& coro, Arg&& error_handler) {
222 if (!coro.ok()) {
223 return nullptr;
224 }
225 return Post<FallibleCoroTask<T, ErrorHandler>>(
226 allocator, std::move(coro), std::forward<Arg>(error_handler));
227 }
228
// Invokes the coroutine function `kCoroFunc` and posts the result, using the
// allocator obtained from `coro_cx.allocator()`.
//
// The dispatch is decided at compile time:
//  - if `kCoroFunc` is a pointer to member function, the call is delegated
//    to `PostSharedMemberCoro`, which treats the first element of `args` as
//    the receiver object;
//  - otherwise `kCoroFunc` is invoked with `coro_cx` followed by `args`.
//
// The return type is deduced from whichever `Post` overload is selected.
240 template <auto kCoroFunc, typename... Args>
241 [[nodiscard]] auto Post(CoroContext coro_cx, Args&&... args) {
242 if constexpr (std::is_member_function_pointer_v<decltype(kCoroFunc)>) {
243 return PostSharedMemberCoro<kCoroFunc>(coro_cx,
244 std::forward<Args>(args)...);
245 } else {
246 return Post(coro_cx.allocator(),
247 std::invoke(kCoroFunc, coro_cx, std::forward<Args>(args)...));
248 }
249 }
250#endif // defined(__cpp_impl_coroutine) && __has_include("pw_async2/coro.h")
251
254 void LogRegisteredTasks() PW_LOCKS_EXCLUDED(internal::lock());
255
256 protected:
// Defaulted constexpr constructor. It sits in the protected section, so it
// is not part of the public interface.
257 constexpr Dispatcher() = default;
258
268
// Returns the result of `PopTaskToRunLocked()`, called while holding
// `internal::lock()`. The lock is taken with a `std::lock_guard` and is
// therefore released when this function returns.
273 Task* PopTaskToRun() PW_LOCKS_EXCLUDED(internal::lock()) {
274 std::lock_guard lock(internal::lock());
275 return PopTaskToRunLocked();
276 }
277
// Returns the result of `PopTaskToRunLocked()`, called while holding
// `internal::lock()`, and reports whether any tasks are still posted.
//
// `has_posted_tasks` is an output parameter. It is set to true when a task
// was popped (the result is non-null) or when the sleeping queue is not
// empty, and to false otherwise.
290 Task* PopTaskToRun(bool& has_posted_tasks)
291 PW_LOCKS_EXCLUDED(internal::lock()) {
292 std::lock_guard lock(internal::lock());
293 Task* task = PopTaskToRunLocked();
294 has_posted_tasks = task != nullptr || !sleeping_.empty();
295 return task;
296 }
297
302 std::lock_guard lock(internal::lock());
303 wants_wake_ = true;
304 return PopTaskToRunLocked();
305 }
306
// Runs `task` by calling `Task::RunInDispatcher()` and returns its
// `RunTaskResult` unchanged.
314 RunTaskResult RunTask(Task& task) PW_LOCKS_EXCLUDED(internal::lock()) {
315 return task.RunInDispatcher();
316 }
317
318 private:
319 friend class Task;
320 friend class Waker;
321
322 // Allow DispatcherForTestFacade to wrap another dispatcher (call Do*).
323 template <typename>
324 friend class DispatcherForTestFacade;
325
340 virtual void DoWake() PW_LOCKS_EXCLUDED(internal::lock()) = 0;
341
342 void Wake(Task* task_to_release = nullptr)
343 PW_UNLOCK_FUNCTION(internal::lock());
344
345 // Removes a task, waking the dispatcher if it is the last task.
//
// When other tasks are still registered (`has_tasks()`), the task is simply
// unposted and its reference released via `UnpostAndReleaseRef()`. When no
// tasks remain, the task is handed to `Wake()` instead.
//
// Annotated `PW_UNLOCK_FUNCTION(internal::lock())`, as is `Wake()`.
// NOTE(review): the unlock is not visible in this body; presumably it
// happens inside `UnpostAndReleaseRef()` / `Wake()` -- confirm.
346 void DeregisterTask(Task& task) PW_UNLOCK_FUNCTION(internal::lock()) {
347 if (has_tasks()) {
348 task.UnpostAndReleaseRef();
349 } else {
350 Wake(&task);
351 }
352 }
353
354 Task* PopTaskToRunLocked() PW_EXCLUSIVE_LOCKS_REQUIRED(internal::lock());
355
356 static void UnpostTaskList(IntrusiveQueue<Task>& list)
357 PW_EXCLUSIVE_LOCKS_REQUIRED(internal::lock());
358
// Removes `task` from the woken queue. Requires `internal::lock()` to be
// held by the caller.
359 void RemoveWokenTaskLocked(Task& task)
360 PW_EXCLUSIVE_LOCKS_REQUIRED(internal::lock()) {
361 woken_.remove(task);
362 }
// Removes `task` from the sleeping queue. Requires `internal::lock()` to be
// held by the caller.
363 void RemoveSleepingTaskLocked(Task& task)
364 PW_EXCLUSIVE_LOCKS_REQUIRED(internal::lock()) {
365 sleeping_.remove(task);
366 }
// Adds `task` to the front of the sleeping queue. Requires
// `internal::lock()` to be held by the caller.
367 void AddSleepingTaskLocked(Task& task)
368 PW_EXCLUSIVE_LOCKS_REQUIRED(internal::lock()) {
369 sleeping_.push_front(task);
370 }
// Appends `task` to the back of the woken queue. Requires
// `internal::lock()` to be held by the caller.
371 void AddWokenTaskLocked(Task& task)
372 PW_EXCLUSIVE_LOCKS_REQUIRED(internal::lock()) {
373 woken_.push_back(task);
374 }
375
376 // True if any tasks are registered with the dispatcher.
//
// A task counts as registered when it is in either the woken or the
// sleeping queue. Requires `internal::lock()` to be held by the caller.
377 bool has_tasks() const PW_EXCLUSIVE_LOCKS_REQUIRED(internal::lock()) {
378 return !woken_.empty() || !sleeping_.empty();
379 }
380
381 void LogTaskWakers(const Task& task)
382 PW_EXCLUSIVE_LOCKS_REQUIRED(internal::lock());
383
384 // Checks if `task` contains a value and posts it to the dispatcher as an
385 // allocated task. Returns true if the task posted successfully.
//
// This template overload only unpacks the `SharedPtr`: it passes the raw
// task pointer and the pointer's control block to the non-template
// `PostAllocatedTask(Task*, ControlBlock*)` overload and returns its result.
386 template <typename T>
387 bool PostAllocatedTask(const SharedPtr<T>& task)
388 PW_LOCKS_EXCLUDED(internal::lock()) {
389 return PostAllocatedTask(
390 task.get(),
// NOTE(review): the `ControlBlockHandle` argument looks like an access
// token that gates `GetControlBlock()` to internal callers -- confirm.
391 task.GetControlBlock(
392 allocator::internal::ControlBlockHandle::GetInstance_DO_NOT_USE()));
393 }
394
395 bool PostAllocatedTask(Task* task,
396 allocator::internal::ControlBlock* control_block)
397 PW_LOCKS_EXCLUDED(internal::lock());
398
399#if defined(__cpp_impl_coroutine) && __has_include("pw_async2/coro.h")
// Invokes the member coroutine function `kCoroMemberFunc` on `receiver`,
// passing `coro_cx` followed by `args`, and posts the returned coroutine
// with the allocator obtained from `coro_cx.allocator()`.
//
// The return type is deduced from whichever `Post` overload is selected.
400 template <auto kCoroMemberFunc, typename Receiver, typename... Args>
401 [[nodiscard]] auto PostSharedMemberCoro(CoroContext coro_cx,
402 Receiver&& receiver,
403 Args&&... args)
404 PW_LOCKS_EXCLUDED(internal::lock()) {
405 return Post(coro_cx.allocator(),
406 std::invoke(kCoroMemberFunc,
407 std::forward<Receiver>(receiver),
408 coro_cx,
409 std::forward<Args>(args)...));
410 }
411#endif // defined(__cpp_impl_coroutine) && __has_include("pw_async2/coro.h")
412
413 IntrusiveQueue<Task> woken_ PW_GUARDED_BY(internal::lock());
414 IntrusiveQueue<Task> sleeping_ PW_GUARDED_BY(internal::lock());
415
416 // Counts pending DoWake() calls (really should only ever be 1). This is
417 // necessary to prevent the Dispatcher from being destroyed while DoWake() is
418 // running, since the lock is not held at that time.
419 std::atomic<uint8_t> wakes_pending_ = 0;
420
421 // Indicates that this Dispatcher should be woken when Wake() is called. This
422 // prevents unnecessary wakes when, for example, multiple wakers wake the same
423 // task or multiple tasks are posted before the dispatcher runs.
424 bool wants_wake_ PW_GUARDED_BY(internal::lock()) = false;
425 bool terminated_ PW_GUARDED_BY(internal::lock()) = false;
426};
427
429
430} // namespace pw::async2
Definition: allocator.h:45
SharedPtr< T > MakeShared(Args &&... args)
Definition: allocator.h:229
Definition: intrusive_queue.h:30
Definition: shared_ptr.h:68
Definition: coro.h:556
Definition: dispatcher_for_test.h:36
Definition: dispatcher.h:74
Task * PopTaskToRun(bool &has_posted_tasks)
Definition: dispatcher.h:290
SharedPtr< FuncTask< ActualFunc > > Post(Allocator &allocator, Arg &&func)
Definition: dispatcher.h:149
SharedPtr< FutureTask< Fut > > PostFuture(Allocator &allocator, Fut &&future)
Definition: dispatcher.h:182
void PostShared(const SharedPtr< T > &task)
Definition: dispatcher.h:111
virtual void DoWake()=0
Task * PopSingleTaskForThisWake()
Definition: dispatcher.h:301
SharedPtr< TaskType > Post(Allocator &allocator, Args &&... args)
Definition: dispatcher.h:124
void Post(Task &task)
RunTaskResult RunTask(Task &task)
Definition: dispatcher.h:314
Task * PopTaskToRun()
Definition: dispatcher.h:273
SharedPtr< RunOnceTask< Func > > RunOnce(Allocator &allocator, Func &&func)
Definition: dispatcher.h:166
Definition: task.h:123
Definition: waker.h:155
RunTaskResult
Definition: task.h:83
#define PW_GUARDED_BY(x)
Definition: lock_annotations.h:60
#define PW_EXCLUSIVE_LOCKS_REQUIRED(...)
Definition: lock_annotations.h:147
#define PW_UNLOCK_FUNCTION(...)
Definition: lock_annotations.h:249
#define PW_LOCKS_EXCLUDED(...)
Definition: lock_annotations.h:178