Pigweed
 
Loading...
Searching...
No Matches
once_sender.h
1// Copyright 2024 The Pigweed Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may not
4// use this file except in compliance with the License. You may obtain a copy of
5// the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12// License for the specific language governing permissions and limitations under
13// the License.
14#pragma once
15
16#include "pw_async2/dispatcher.h"
17#include "pw_async2/dispatcher_base.h"
18#include "pw_function/function.h"
19#include "pw_sync/mutex.h"
20
21namespace pw::async2 {
22
23// A lock guarding OnceReceiver and OnceSender member variables.
24//
25// This is an ``InterruptSpinLock`` in order to allow sending values from an
26// ISR context.
27inline pw::sync::InterruptSpinLock& sender_receiver_lock() {
28 static NoDestructor<pw::sync::InterruptSpinLock> lock;
29 return *lock;
30}
31
32template <typename T>
33class OnceSender;
34
40template <typename T>
41class OnceReceiver final {
42 public:
43 OnceReceiver() = default;
44
47 template <typename... Args>
48 explicit OnceReceiver(Args&&... value_args)
49 : value_(std::forward<Args>(value_args)...) {}
50
51 OnceReceiver(OnceReceiver&& other) {
52 std::lock_guard lock(sender_receiver_lock());
53 sender_ = other.sender_;
54 other.sender_ = nullptr;
55 if (sender_) {
56 sender_->receiver_ = this;
57 }
58 if (other.value_.has_value()) {
59 value_.emplace(std::move(other.value_.value()));
60 }
61 other.value_.reset();
62 waker_ = std::move(other.waker_);
63 }
64
65 OnceReceiver(const OnceReceiver&) = delete;
66 OnceReceiver& operator=(const OnceReceiver&) = delete;
67 OnceReceiver& operator=(OnceReceiver&& other) = delete;
68
69 ~OnceReceiver();
70
75 std::lock_guard lock(sender_receiver_lock());
76 if (value_.has_value()) {
77 return Ready(std::move(*value_));
78 } else if (!sender_) {
79 return Ready(Status::Cancelled());
80 }
81 PW_ASYNC_STORE_WAKER(
82 cx,
83 waker_,
84 "OnceReceiver is waiting for a value to be sent into the "
85 "corresponding OnceSender");
86 return Pending();
87 }
88
89 private:
90 template <typename U>
91 friend std::pair<OnceSender<U>, OnceReceiver<U>> MakeOnceSenderAndReceiver();
92 template <typename U>
93 friend void InitializeOnceSenderAndReceiver(OnceSender<U>& sender,
94 OnceReceiver<U>& receiver);
95 friend class OnceSender<T>;
96
97 OnceSender<T>* sender_ PW_GUARDED_BY(sender_receiver_lock()) = nullptr;
98 std::optional<T> value_ PW_GUARDED_BY(sender_receiver_lock()) = std::nullopt;
99 Waker waker_;
100};
101
106template <typename T>
107class OnceSender final {
108 public:
109 OnceSender() = default;
110
111 ~OnceSender() {
112 std::lock_guard lock(sender_receiver_lock());
113 if (receiver_) {
114 std::move(receiver_->waker_).Wake();
115 receiver_->sender_ = nullptr;
116 }
117 }
118
119 OnceSender(OnceSender&& other) {
120 std::lock_guard lock(sender_receiver_lock());
121 receiver_ = other.receiver_;
122 other.receiver_ = nullptr;
123 if (receiver_) {
124 receiver_->sender_ = this;
125 }
126 }
127
128 OnceSender(const OnceSender&) = delete;
129 OnceSender& operator=(const OnceSender&) = delete;
130 OnceSender& operator=(OnceSender&& other) = delete;
131
133 template <typename... Args>
134 void emplace(Args&&... args) {
135 std::lock_guard lock(sender_receiver_lock());
136 if (receiver_) {
137 receiver_->value_.emplace(std::forward<Args>(args)...);
138 std::move(receiver_->waker_).Wake();
139 receiver_->sender_ = nullptr;
140 receiver_ = nullptr;
141 }
142 }
143
144 OnceSender& operator=(const T& value) {
145 emplace(value);
146 return *this;
147 }
148 OnceSender& operator=(T&& value) {
149 emplace(std::move(value));
150 return *this;
151 }
152
153 private:
154 template <typename U>
155 friend std::pair<OnceSender<U>, OnceReceiver<U>> MakeOnceSenderAndReceiver();
156 template <typename U>
157 friend void InitializeOnceSenderAndReceiver(OnceSender<U>& sender,
158 OnceReceiver<U>& receiver);
159 friend class OnceReceiver<T>;
160
161 OnceSender(OnceReceiver<T>* receiver) : receiver_(receiver) {}
162
163 OnceReceiver<T>* receiver_ PW_GUARDED_BY(sender_receiver_lock()) = nullptr;
164};
165
166template <typename T>
167OnceReceiver<T>::~OnceReceiver() {
168 std::lock_guard lock(sender_receiver_lock());
169 if (sender_) {
170 sender_->receiver_ = nullptr;
171 }
172}
173
175template <typename T>
176std::pair<OnceSender<T>, OnceReceiver<T>> MakeOnceSenderAndReceiver() {
177 std::pair<OnceSender<T>, OnceReceiver<T>> send_recv;
178 InitializeOnceSenderAndReceiver(send_recv.first, send_recv.second);
179 return send_recv;
180}
181
183template <typename T>
184void InitializeOnceSenderAndReceiver(OnceSender<T>& sender,
185 OnceReceiver<T>& receiver)
186 PW_NO_LOCK_SAFETY_ANALYSIS {
187 // Disable lock analysis because these are fresh sender/receiver pairs and
188 // do not require a lock to initialize;
189 receiver.sender_ = &sender;
190 sender.receiver_ = &receiver;
191}
192
193template <typename T>
194class OnceRefSender;
195
203template <typename T>
204class OnceRefReceiver final {
205 public:
206 OnceRefReceiver() = default;
207
209 std::lock_guard lock(sender_receiver_lock());
210 sender_ = other.sender_;
211 other.sender_ = nullptr;
212 if (sender_) {
213 sender_->receiver_ = this;
214 }
215 value_ = other.value_;
216 waker_ = std::move(other.waker_);
217 }
218
219 OnceRefReceiver(const OnceRefReceiver&) = delete;
220 OnceRefReceiver& operator=(const OnceRefReceiver&) = delete;
221 OnceRefReceiver& operator=(OnceRefReceiver&& other) = delete;
222
224
229 std::lock_guard lock(sender_receiver_lock());
230 if (value_ == nullptr) {
231 return Ready(OkStatus());
232 }
233 if (sender_ == nullptr) {
234 return Ready(Status::Cancelled());
235 }
236 PW_ASYNC_STORE_WAKER(
237 cx,
238 waker_,
239 "OnceRefReceiver is waiting for OnceRefSender to write a value");
240 return Pending();
241 }
242
243 private:
244 template <typename U>
245 friend std::pair<OnceRefSender<U>, OnceRefReceiver<U>>
246 MakeOnceRefSenderAndReceiver(U&);
247 template <typename U>
248 friend void InitializeOnceRefSenderAndReceiver(OnceRefSender<U>& sender,
249 OnceRefReceiver<U>& receiver,
250 U& value);
251 friend class OnceRefSender<T>;
252
253 OnceRefReceiver(T& value) : value_(&value) {}
254
255 // Pointer to the value to be modified. Set to `nullptr` once modification
256 // is complete.
257 T* value_ PW_GUARDED_BY(sender_receiver_lock()) = nullptr;
258 // Pointer to the modifier. Set to `nullptr` if the sender disappears.
259 OnceRefSender<T>* sender_ PW_GUARDED_BY(sender_receiver_lock()) = nullptr;
260 Waker waker_;
261};
262
267template <typename T>
268class OnceRefSender final {
269 public:
270 OnceRefSender() = default;
271
273 std::lock_guard lock(sender_receiver_lock());
274 if (receiver_) {
275 receiver_->sender_ = nullptr;
276 std::move(receiver_->waker_).Wake();
277 }
278 }
279
281 std::lock_guard lock(sender_receiver_lock());
282 receiver_ = other.receiver_;
283 other.receiver_ = nullptr;
284 if (receiver_) {
285 receiver_->sender_ = this;
286 }
287 }
288
289 OnceRefSender(const OnceRefSender&) = delete;
290 OnceRefSender& operator=(const OnceRefSender&) = delete;
291 OnceRefSender& operator=(OnceRefSender&& other) = delete;
292
294 void Set(const T& value) {
295 std::lock_guard lock(sender_receiver_lock());
296 if (receiver_) {
297 *(receiver_->value_) = value;
298 std::move(receiver_->waker_).Wake();
299 receiver_->sender_ = nullptr;
300 receiver_->value_ = nullptr;
301 receiver_ = nullptr;
302 }
303 }
304
306 void Set(T&& value) {
307 std::lock_guard lock(sender_receiver_lock());
308 if (receiver_) {
309 *(receiver_->value_) = std::move(value);
310 std::move(receiver_->waker_).Wake();
311 receiver_->sender_ = nullptr;
312 receiver_->value_ = nullptr;
313 receiver_ = nullptr;
314 }
315 }
316
321 void ModifyUnsafe(pw::Function<void(T&)> func) {
322 std::lock_guard lock(sender_receiver_lock());
323 if (receiver_) {
324 // There is a risk of re-entrancy here if the user isn't careful.
325 func(*(receiver_->value_));
326 }
327 }
328
331 void Commit() {
332 std::lock_guard lock(sender_receiver_lock());
333 if (receiver_) {
334 std::move(receiver_->waker_).Wake();
335 receiver_->sender_ = nullptr;
336 receiver_->value_ = nullptr;
337 receiver_ = nullptr;
338 }
339 }
340
341 private:
342 template <typename U>
343 friend std::pair<OnceRefSender<U>, OnceRefReceiver<U>>
344 MakeOnceRefSenderAndReceiver(U&);
345 template <typename U>
346 friend void InitializeOnceRefSenderAndReceiver(OnceRefSender<U>& sender,
347 OnceRefReceiver<U>& receiver,
348 U& value);
349 friend class OnceRefReceiver<T>;
350
351 OnceRefSender(OnceRefReceiver<T>* receiver) : receiver_(receiver) {}
352
353 OnceRefReceiver<T>* receiver_ PW_GUARDED_BY(sender_receiver_lock()) = nullptr;
354};
355
356template <typename T>
357OnceRefReceiver<T>::~OnceRefReceiver() {
358 std::lock_guard lock(sender_receiver_lock());
359 if (sender_) {
360 sender_->receiver_ = nullptr;
361 }
362}
363
368template <typename T>
369std::pair<OnceRefSender<T>, OnceRefReceiver<T>> MakeOnceRefSenderAndReceiver(
370 T& value) {
371 std::pair<OnceRefSender<T>, OnceRefReceiver<T>> send_recv;
372 InitializeOnceRefSenderAndReceiver(send_recv.first, send_recv.second, value);
373 return send_recv;
374}
375
380template <typename T>
381void InitializeOnceRefSenderAndReceiver(OnceRefSender<T>& sender,
382 OnceRefReceiver<T>& receiver,
383 T& value) PW_NO_LOCK_SAFETY_ANALYSIS {
384 // Disable lock analysis because these are fresh sender/receiver pairs and
385 // do not require a lock to initialize;
386 receiver.sender_ = &sender;
387 receiver.value_ = &value;
388 sender.receiver_ = &receiver;
389}
390
391} // namespace pw::async2
Definition: dispatcher_base.h:52
Definition: once_sender.h:41
friend std::pair< OnceSender< U >, OnceReceiver< U > > MakeOnceSenderAndReceiver()
Construct a pair of OnceSender and OnceReceiver.
Definition: once_sender.h:176
Poll< Result< T > > Pend(Context &cx)
Definition: once_sender.h:74
OnceReceiver(Args &&... value_args)
Definition: once_sender.h:48
Definition: once_sender.h:204
Poll< Status > Pend(Context &cx)
Definition: once_sender.h:228
Definition: once_sender.h:268
void Set(const T &value)
Copy assigns the reference and awakens the receiver.
Definition: once_sender.h:294
void Set(T &&value)
Move assigns the reference and awakens the receiver.
Definition: once_sender.h:306
void Commit()
Definition: once_sender.h:331
void ModifyUnsafe(pw::Function< void(T &)> func)
Definition: once_sender.h:321
Definition: once_sender.h:107
friend std::pair< OnceSender< U >, OnceReceiver< U > > MakeOnceSenderAndReceiver()
Construct a pair of OnceSender and OnceReceiver.
Definition: once_sender.h:176
void emplace(Args &&... args)
Construct the sent value in place and wake the OnceReceiver.
Definition: once_sender.h:134
Definition: poll.h:54
Definition: dispatcher_base.h:318
Definition: interrupt_spin_lock.h:48
fit::function_impl< function_internal::config::kInlineCallableSize, !function_internal::config::kEnableDynamicAllocation, FunctionType, PW_FUNCTION_DEFAULT_ALLOCATOR_TYPE > Function
Definition: function.h:74
constexpr Status OkStatus()
Definition: status.h:234