C/C++ API Reference
Loading...
Searching...
No Matches
dispatcher.h
1// Copyright 2025 The Pigweed Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may not
4// use this file except in compliance with the License. You may obtain a copy of
5// the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12// License for the specific language governing permissions and limitations under
13// the License.
14#pragma once
15
16#include <atomic>
17#include <mutex>
18#include <type_traits>
19#include <utility>
20
21#include "pw_allocator/allocator.h"
22#include "pw_allocator/shared_ptr.h"
23#include "pw_async2/context.h"
24#include "pw_async2/func_task.h"
25#include "pw_async2/future_task.h"
26#include "pw_async2/internal/lock.h"
27#include "pw_async2/task.h"
28#include "pw_async2/waker.h"
29#include "pw_containers/intrusive_forward_list.h"
30#include "pw_sync/lock_annotations.h"
31
32// Coroutines are supported if the build target depends on //pw_async2:coro.
33#if defined(__cpp_impl_coroutine) && __has_include("pw_async2/coro.h")
34#include <functional> // std::invoke
35
36#include "pw_async2/coro.h"
37#include "pw_async2/coro_task.h"
38#include "pw_async2/fallible_coro_task.h"
39#endif // defined(__cpp_impl_coroutine) && __has_include("pw_async2/coro.h")
40
41namespace pw::async2 {
42
44
68 public:
69 Dispatcher(Dispatcher&) = delete;
70 Dispatcher& operator=(Dispatcher&) = delete;
71
72 Dispatcher(Dispatcher&&) = delete;
73 Dispatcher& operator=(Dispatcher&&) = delete;
74
77 virtual ~Dispatcher() PW_LOCKS_EXCLUDED(internal::lock()) { Destroy(); }
78
88 void Post(Task& task) PW_LOCKS_EXCLUDED(internal::lock()) {
89 {
90 std::lock_guard lock(internal::lock());
91 PostLocked(task);
92 }
93 Wake();
94 }
95
101 template <typename T>
102 void PostShared(const SharedPtr<T>& task)
103 PW_LOCKS_EXCLUDED(internal::lock()) {
104 PW_ASSERT(PostAllocatedTask(task));
105 }
106
112 template <typename TaskType,
113 typename... Args,
114 typename = std::enable_if_t<std::is_base_of_v<Task, TaskType>>>
115 [[nodiscard]] SharedPtr<TaskType> Post(Allocator& allocator, Args&&... args) {
116 auto task = allocator.MakeShared<TaskType>(std::forward<Args>(args)...);
117 if (!PostAllocatedTask(task)) {
118 return nullptr;
119 }
120 return task;
121 }
122
133 template <
134 typename Func = void,
135 int&... kExplicitGuard,
136 typename Arg,
137 typename ActualFunc =
138 std::conditional_t<std::is_void_v<Func>, std::decay_t<Arg>, Func>,
139 typename = std::enable_if_t<!std::is_base_of_v<Task, ActualFunc>>>
141 Arg&& func) {
142 return Post<FuncTask<ActualFunc>>(allocator, std::forward<Arg>(func));
143 }
144
156 template <typename Func>
158 Func&& func) {
159 return Post<RunOnceTask<Func>>(allocator, std::forward<Func>(func));
160 }
161
172 template <typename Fut>
174 Fut&& future) {
175 return Post<FutureTask<Fut>>(allocator, std::forward<Fut>(future));
176 }
177
178#if defined(__cpp_impl_coroutine) && __has_include("pw_async2/coro.h")
187 template <typename T>
188 [[nodiscard]] SharedPtr<CoroTask<T>> Post(Allocator& allocator,
189 Coro<T>&& coro) {
190 if (!coro.ok()) {
191 return nullptr;
192 }
193 return Post<CoroTask<T>>(allocator, std::move(coro));
194 }
195
205 template <typename T,
206 typename E = void,
207 int&... kExplicitGuard,
208 typename Arg,
209 typename ErrorHandler =
210 std::conditional_t<std::is_void_v<E>, std::decay_t<Arg>, E>>
211 [[nodiscard]] SharedPtr<FallibleCoroTask<T, ErrorHandler>> Post(
212 Allocator& allocator, Coro<T>&& coro, Arg&& error_handler) {
213 if (!coro.ok()) {
214 return nullptr;
215 }
216 return Post<FallibleCoroTask<T, ErrorHandler>>(
217 allocator, std::move(coro), std::forward<Arg>(error_handler));
218 }
219
231 template <auto kCoroFunc, typename... Args>
232 [[nodiscard]] auto Post(CoroContext coro_cx, Args&&... args) {
233 if constexpr (std::is_member_function_pointer_v<decltype(kCoroFunc)>) {
234 return PostSharedMemberCoro<kCoroFunc>(coro_cx,
235 std::forward<Args>(args)...);
236 } else {
237 return Post(coro_cx.allocator(),
238 std::invoke(kCoroFunc, coro_cx, std::forward<Args>(args)...));
239 }
240 }
241#endif // defined(__cpp_impl_coroutine) && __has_include("pw_async2/coro.h")
242
// Declared here and defined out of line. The PW_LOCKS_EXCLUDED annotation
// means callers must not already hold `internal::lock()`.
// NOTE(review): presumably logs the tasks currently registered with this
// dispatcher -- confirm against the definition.
245 void LogRegisteredTasks() PW_LOCKS_EXCLUDED(internal::lock());
246
247 protected:
// Protected: a Dispatcher is only constructed as the base of a concrete
// dispatcher (the class declares a pure virtual DoWake()).
248 constexpr Dispatcher() = default;
249
259
264 Task* PopTaskToRun() PW_LOCKS_EXCLUDED(internal::lock()) {
265 std::lock_guard lock(internal::lock());
266 return PopTaskToRunLocked();
267 }
268
281 Task* PopTaskToRun(bool& has_posted_tasks)
282 PW_LOCKS_EXCLUDED(internal::lock()) {
283 std::lock_guard lock(internal::lock());
284 Task* task = PopTaskToRunLocked();
285 has_posted_tasks = task != nullptr || !sleeping_.empty();
286 return task;
287 }
288
293 std::lock_guard lock(internal::lock());
294 SetWantsWake();
295 return PopTaskToRunLocked();
296 }
297
305 RunTaskResult RunTask(Task& task) PW_LOCKS_EXCLUDED(internal::lock()) {
306 return task.RunInDispatcher();
307 }
308
309 private:
// Task and Waker are granted access to the private interface below (for
// example, WakeTask() is documented as being for use by `Waker`).
310 friend class Task;
311 friend class Waker;
312
313 // Allow DispatcherForTestFacade to wrap another dispatcher (call Do*).
314 template <typename>
315 friend class DispatcherForTestFacade;
316
// Lock-held half of Post(Task&), which calls this while holding
// `internal::lock()` and calls Wake() afterwards.
317 // Posts a task, but does not wake the dispatcher, which is done after the
318 // lock is released.
319 void PostLocked(Task& task) PW_EXCLUSIVE_LOCKS_REQUIRED(internal::lock());
320
// Pure virtual hook implemented by concrete dispatchers. Wake() calls it only
// when the `wants_wake_` flag was set, clearing the flag in the same exchange.
335 virtual void DoWake() PW_LOCKS_EXCLUDED(internal::lock()) = 0;
336
337 // Prefer to call Wake() without the lock held, but it can be called with it
338 // when necessary (see WakeTask()).
339 void Wake() {
340 if (wants_wake_.exchange(false, std::memory_order_relaxed)) {
341 DoWake();
342 }
343 }
344
// Lock-held primitive behind the PopTaskToRun() overloads. The returned
// pointer may be null (callers compare it against nullptr).
345 Task* PopTaskToRunLocked() PW_EXCLUSIVE_LOCKS_REQUIRED(internal::lock());
346
347 // Removes references to this `Dispatcher` from all linked `Task`s and
348 // `Waker`s. Use a separate function so thread safety analysis applies.
349 void Destroy() PW_LOCKS_EXCLUDED(internal::lock());
350
// Static helper that operates on a caller-supplied task list while the lock
// is held.
// NOTE(review): presumably detaches every task in `list` from its dispatcher
// -- confirm against the definition.
351 static void UnpostTaskList(IntrusiveForwardList<Task>& list)
352 PW_EXCLUSIVE_LOCKS_REQUIRED(internal::lock());
353
354 void RemoveWokenTaskLocked(Task& task)
355 PW_EXCLUSIVE_LOCKS_REQUIRED(internal::lock()) {
356 woken_.remove(task);
357 }
358 void RemoveSleepingTaskLocked(Task& task)
359 PW_EXCLUSIVE_LOCKS_REQUIRED(internal::lock()) {
360 sleeping_.remove(task);
361 }
362 void AddSleepingTaskLocked(Task& task)
363 PW_EXCLUSIVE_LOCKS_REQUIRED(internal::lock()) {
364 sleeping_.push_front(task);
365 }
366
// Called with `internal::lock()` held. Wake() names this function as the case
// in which it is invoked while the lock is still held.
367 // For use by `Waker`.
368 void WakeTask(Task& task) PW_EXCLUSIVE_LOCKS_REQUIRED(internal::lock());
369
// Declared here and defined out of line; requires `internal::lock()`.
// NOTE(review): presumably logs the wakers associated with `task` -- confirm
// against the definition.
370 void LogTaskWakers(const Task& task)
371 PW_EXCLUSIVE_LOCKS_REQUIRED(internal::lock());
372
373 // Indicates that this Dispatcher should be woken when Wake() is called. This
374 // prevents unnecessary wakes when, for example, multiple wakers wake the same
375 // task or multiple tasks are posted before the dipsatcher runs.
376 //
377 // Must be called while the lock is held to prevent missed wakes.
378 void SetWantsWake() PW_EXCLUSIVE_LOCKS_REQUIRED(internal::lock()) {
379 wants_wake_.store(true, std::memory_order_relaxed);
380 }
381
382 // Checks if `task` contains a value and posts it to the dispatcher as an
383 // allocated task. Returns true if the task posted successfully.
384 template <typename T>
385 bool PostAllocatedTask(const SharedPtr<T>& task)
386 PW_LOCKS_EXCLUDED(internal::lock()) {
387 return PostAllocatedTask(
388 task.get(),
389 task.GetControlBlock(
390 allocator::internal::ControlBlockHandle::GetInstance_DO_NOT_USE()));
391 }
392
// Non-template overload that the `SharedPtr<T>` overload forwards to, passing
// the raw task pointer and its control block. Defined out of line; callers
// must not hold `internal::lock()`.
393 bool PostAllocatedTask(Task* task,
394 allocator::internal::ControlBlock* control_block)
395 PW_LOCKS_EXCLUDED(internal::lock());
396
397#if defined(__cpp_impl_coroutine) && __has_include("pw_async2/coro.h")
398 template <auto kCoroMemberFunc, typename Receiver, typename... Args>
399 [[nodiscard]] auto PostSharedMemberCoro(CoroContext coro_cx,
400 Receiver&& receiver,
401 Args&&... args) {
402 return Post(coro_cx.allocator(),
403 std::invoke(kCoroMemberFunc,
404 std::forward<Receiver>(receiver),
405 coro_cx,
406 std::forward<Args>(args)...));
407 }
408#endif // defined(__cpp_impl_coroutine) && __has_include("pw_async2/coro.h")
409
// Task lists, both guarded by `internal::lock()`. `sleeping_` is filled by
// AddSleepingTaskLocked(); entries leave the lists through
// RemoveWokenTaskLocked() and RemoveSleepingTaskLocked().
410 // TODO: b/491844340 - Evaluate IntrusiveForwardList performance and consider
411 // alternatives.
412 IntrusiveForwardList<Task> woken_ PW_GUARDED_BY(internal::lock());
413 IntrusiveForwardList<Task> sleeping_ PW_GUARDED_BY(internal::lock());
414
// Set by SetWantsWake() and consumed (exchanged back to false) by Wake().
415 // Latches wake requests to avoid duplicate DoWake calls.
416 std::atomic<bool> wants_wake_ = false;
417};
418
420
421} // namespace pw::async2
Definition: allocator.h:45
SharedPtr< T > MakeShared(Args &&... args)
Definition: allocator.h:229
Definition: intrusive_forward_list.h:99
Definition: shared_ptr.h:68
Definition: coro.h:546
Definition: dispatcher_for_test.h:36
Definition: dispatcher.h:67
Task * PopTaskToRun(bool &has_posted_tasks)
Definition: dispatcher.h:281
SharedPtr< FuncTask< ActualFunc > > Post(Allocator &allocator, Arg &&func)
Definition: dispatcher.h:140
SharedPtr< FutureTask< Fut > > PostFuture(Allocator &allocator, Fut &&future)
Definition: dispatcher.h:173
void PostShared(const SharedPtr< T > &task)
Definition: dispatcher.h:102
virtual void DoWake()=0
Task * PopSingleTaskForThisWake()
Definition: dispatcher.h:292
virtual ~Dispatcher()
Definition: dispatcher.h:77
SharedPtr< TaskType > Post(Allocator &allocator, Args &&... args)
Definition: dispatcher.h:115
void Post(Task &task)
Definition: dispatcher.h:88
RunTaskResult RunTask(Task &task)
Definition: dispatcher.h:305
Task * PopTaskToRun()
Definition: dispatcher.h:264
SharedPtr< RunOnceTask< Func > > RunOnce(Allocator &allocator, Func &&func)
Definition: dispatcher.h:157
Definition: task.h:78
Definition: waker.h:159
RunTaskResult
Definition: task.h:38
#define PW_GUARDED_BY(x)
Definition: lock_annotations.h:60
#define PW_EXCLUSIVE_LOCKS_REQUIRED(...)
Definition: lock_annotations.h:146
#define PW_LOCKS_EXCLUDED(...)
Definition: lock_annotations.h:176