C/C++ API Reference
Loading...
Searching...
No Matches
channel.h
1// Copyright 2025 The Pigweed Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may not
4// use this file except in compliance with the License. You may obtain a copy of
5// the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12// License for the specific language governing permissions and limitations under
13// the License.
14#pragma once
15
16#include <mutex>
17
18#include "pw_allocator/allocator.h"
19#include "pw_async2/callback_task.h"
20#include "pw_async2/dispatcher.h"
21#include "pw_async2/future.h"
22#include "pw_containers/deque.h"
23#include "pw_numeric/checked_arithmetic.h"
24#include "pw_result/result.h"
25#include "pw_sync/interrupt_spin_lock.h"
26#include "pw_sync/lock_annotations.h"
27#include "pw_sync/timed_thread_notification.h"
28
29namespace pw::async2 {
30
31template <typename T>
32class Receiver;
33
34template <typename T>
35class ReceiveFuture;
36
37template <typename T>
38class Sender;
39
40template <typename T>
41class SendFuture;
42
43template <typename T>
44class ReserveSendFuture;
45
46template <typename T>
47class SendReservation;
48
49template <typename T, uint16_t kCapacity>
50class ChannelStorage;
51
52namespace internal {
53
54template <typename T>
55class Channel;
56
57class BaseChannel;
58
59class BaseChannelFuture : public IntrusiveForwardList<BaseChannelFuture>::Item {
60 public:
61 BaseChannelFuture(const BaseChannelFuture&) = delete;
62 BaseChannelFuture& operator=(const BaseChannelFuture&) = delete;
63
64 // Derived classes call MoveAssignFrom to move rather than use the operator.
65 BaseChannelFuture& operator=(BaseChannelFuture&&) = delete;
66
68 [[nodiscard]] bool is_complete() const { return completed_; }
69
70 // Internal API for the channel to wake the future.
71 void Wake() { waker_.Wake(); }
72
73 protected:
74 // Creates a new future, storing nullptr if `channel` is nullptr or if the
75 // channel is closed.
76 explicit BaseChannelFuture(BaseChannel* channel) PW_LOCKS_EXCLUDED(*channel);
77
78 enum AllowClosed { kAllowClosed };
79
80 // Creates a new future, but does NOT check if the channel is open.
81 BaseChannelFuture(BaseChannel* channel, AllowClosed)
82 PW_LOCKS_EXCLUDED(*channel) {
83 StoreAndAddRefIfNonnull(channel);
84 }
85
86 BaseChannelFuture(BaseChannelFuture&& other)
87 PW_LOCKS_EXCLUDED(*channel_, *other.channel_)
88 : channel_(other.channel_) {
89 MoveFrom(other);
90 }
91
92 BaseChannelFuture& MoveAssignFrom(BaseChannelFuture& other)
93 PW_LOCKS_EXCLUDED(*channel_, *other.channel_);
94
95 // Unlists this future and removes a reference from the channel.
96 void RemoveFromChannel() PW_LOCKS_EXCLUDED(*channel_);
97
98 bool StoreWakerForReceiveIfOpen(Context& cx) PW_UNLOCK_FUNCTION(*channel_);
99
100 void StoreWakerForSend(Context& cx) PW_UNLOCK_FUNCTION(*channel_);
101
102 void StoreWakerForReserveSend(Context& cx) PW_UNLOCK_FUNCTION(*channel_);
103
104 void MarkCompleted() { completed_ = true; }
105
106 void Complete() PW_UNLOCK_FUNCTION(*channel_);
107
108 BaseChannel* base_channel() PW_LOCK_RETURNED(channel_) { return channel_; }
109
110 private:
111 void StoreAndAddRefIfNonnull(BaseChannel* channel)
112 PW_LOCKS_EXCLUDED(*channel);
113
114 void MoveFrom(BaseChannelFuture& other) PW_LOCKS_EXCLUDED(*other.channel_);
115
116 BaseChannel* channel_;
117 Waker waker_;
118
119 bool completed_ = false;
120};
121
122// Adds Pend function and is_complete flag to BaseChannelFuture.
123template <typename Derived, typename T, typename FutureValue>
125 public:
126 using value_type = FutureValue;
127
128 Poll<value_type> Pend(Context& cx) PW_LOCKS_EXCLUDED(*this->channel()) {
129 PW_ASSERT(!is_complete());
130 Poll<value_type> result = static_cast<Derived&>(*this).DoPend(cx);
131 if (result.IsReady()) {
132 MarkCompleted();
133 }
134 return result;
135 }
136
137 protected:
138 explicit ChannelFuture(Channel<T>* channel) : BaseChannelFuture(channel) {}
139
140 ChannelFuture(Channel<T>* channel, AllowClosed)
141 : BaseChannelFuture(channel, kAllowClosed) {}
142
143 ChannelFuture(ChannelFuture&& other) : BaseChannelFuture(std::move(other)) {}
144
145 Channel<T>* channel() PW_LOCK_RETURNED(this->base_channel()) {
146 return static_cast<Channel<T>*>(base_channel());
147 }
148
149 private:
150 using BaseChannelFuture::base_channel;
151 using BaseChannelFuture::MarkCompleted;
152 using BaseChannelFuture::Wake;
153};
154
155// Internal generic channel type. BaseChannel is not exposed to users. Its
156// public interface is for internal consumption.
157class PW_LOCKABLE("pw::async2::internal::BaseChannel") BaseChannel {
158 public:
159 static constexpr chrono::SystemClock::duration kWaitForever =
160 chrono::SystemClock::duration::max();
161
162 // Acquires the channel's lock.
163 void lock() PW_EXCLUSIVE_LOCK_FUNCTION() { lock_.lock(); }
164
165 // Releases the channel's lock.
166 void unlock() PW_UNLOCK_FUNCTION() { lock_.unlock(); }
167
168 [[nodiscard]] bool is_open() PW_LOCKS_EXCLUDED(*this) {
169 std::lock_guard lock(*this);
170 return is_open_locked();
171 }
172
173 bool is_open_locked() const PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
174 return !closed_;
175 }
176
177 [[nodiscard]] bool active_locked() const PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
178 return ref_count_ != 0;
179 }
180
181 // Removes a reference to this channel and destroys the channel if needed.
182 void RemoveRefAndDestroyIfUnreferenced() PW_UNLOCK_FUNCTION();
183
184 void Close() PW_LOCKS_EXCLUDED(*this) {
185 std::lock_guard lock(*this);
186 CloseLocked();
187 }
188
189 // Adds a SendFuture or ReserveSendFuture to the list of pending futures.
190 void add_send_future(BaseChannelFuture& future)
192 containers::PushBackSlow(send_futures_, future);
193 }
194
195 // Adds a ReceiveFuture to the list of pending futures.
196 void add_receive_future(BaseChannelFuture& future)
198 containers::PushBackSlow(receive_futures_, future);
199 }
200
201 void DropReservationAndRemoveRef() PW_LOCKS_EXCLUDED(*this);
202
203 void add_receiver() PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
204 add_object(receiver_count_);
205 }
206
207 void add_sender() PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
208 add_object(sender_count_);
209 }
210
211 void add_handle() PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
212 add_object(handle_count_);
213 }
214
215 void add_reservation() PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
216 reservations_ += 1;
217 }
218
219 void remove_reservation() PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
220 PW_DASSERT(reservations_ > 0);
221 reservations_ -= 1;
222 }
223
224 void remove_sender() PW_LOCKS_EXCLUDED(*this) {
225 remove_object(&sender_count_);
226 }
227
228 void remove_receiver() PW_LOCKS_EXCLUDED(*this) {
229 remove_object(&receiver_count_);
230 }
231
232 void remove_handle() PW_LOCKS_EXCLUDED(*this) {
233 remove_object(&handle_count_);
234 }
235
236 void add_ref() PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
237 PW_ASSERT(CheckedIncrement(ref_count_, 1));
238 }
239
240 protected:
241 constexpr BaseChannel() = default;
242
243 ~BaseChannel();
244
245 void WakeOneReceiver() PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
246 PopAndWakeOneIfAvailable(receive_futures_);
247 }
248
249 void WakeOneSender() PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
250 PopAndWakeOneIfAvailable(send_futures_);
251 }
252
253 uint16_t reservations() const PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
254 return reservations_;
255 }
256
257 private:
258 static void PopAndWakeAll(IntrusiveForwardList<BaseChannelFuture>& futures);
259
260 static void PopAndWakeOneIfAvailable(
262
263 static void PopAndWakeOne(IntrusiveForwardList<BaseChannelFuture>& futures);
264
265 void add_object(uint8_t& counter) PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
266 if (is_open_locked()) {
267 PW_ASSERT(CheckedIncrement(counter, 1));
268 }
269 add_ref();
270 }
271
272 // Takes a pointer since otherwise Clang's thread safety analysis complains
273 // about taking a reference without the lock held.
274 void remove_object(uint8_t* counter) PW_LOCKS_EXCLUDED(*this);
275
276 // Returns true if the channel should be closed following a reference
277 // decrement.
278 //
279 // Handles can create new senders and receivers, so as long as one exists, the
280 // channel should remain open. Without active handles, the channel closes when
281 // either end fully hangs up.
282 bool should_close() const PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
283 return handle_count_ == 0 && (sender_count_ == 0 || receiver_count_ == 0);
284 }
285
286 void CloseLocked() PW_EXCLUSIVE_LOCKS_REQUIRED(*this);
287
288 // Destroys the channel if it is dynamically allocated.
289 virtual void Destroy() {}
290
293
294 uint16_t reservations_ PW_GUARDED_BY(*this) = 0;
295 bool closed_ PW_GUARDED_BY(*this) = false;
296 mutable sync::InterruptSpinLock lock_;
297
298 // Channels are reference counted in two ways:
299 //
300 // - Senders and receivers are tracked independently. Once either reaches
301 // zero, the channel is closed, but not destroyed. No new values can be
302 // sent, but any buffered values can still be read.
303 //
304 // - Overall object reference count, including senders, receivers, futures,
305 // and channel handles. Once this reaches zero, the channel is destroyed.
306 //
307 uint8_t sender_count_ PW_GUARDED_BY(*this) = 0;
308 uint8_t receiver_count_ PW_GUARDED_BY(*this) = 0;
309 uint8_t handle_count_ PW_GUARDED_BY(*this) = 0;
310 uint16_t ref_count_ PW_GUARDED_BY(*this) = 0;
311};
312
313// Like BaseChannel, Channel is an internal class that is not exposed to users.
314// Its public interface is for internal consumption.
315template <typename T>
316class Channel : public BaseChannel {
317 public:
318 Sender<T> CreateSender() PW_LOCKS_EXCLUDED(*this) {
319 {
320 std::lock_guard guard(*this);
321 if (is_open_locked()) {
322 return Sender<T>(*this);
323 }
324 }
325 return Sender<T>();
326 }
327
328 Receiver<T> CreateReceiver() PW_LOCKS_EXCLUDED(*this) {
329 {
330 std::lock_guard guard(*this);
331 if (is_open_locked()) {
332 return Receiver<T>(*this);
333 }
334 }
335 return Receiver<T>();
336 }
337
338 void PushAndWake(T&& value) PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
339 deque_.push_back(std::move(value));
340 WakeOneReceiver();
341 }
342
343 void PushAndWake(const T& value) PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
344 deque_.push_back(value);
345 WakeOneReceiver();
346 }
347
348 template <typename... Args>
349 void EmplaceAndWake(Args&&... args) PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
350 deque_.emplace_back(std::forward<Args>(args)...);
351 WakeOneReceiver();
352 }
353
354 T PopAndWake() PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
355 T value = std::move(deque_.front());
356 deque_.pop_front();
357
358 WakeOneSender();
359 return value;
360 }
361
362 bool full() const PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
363 return remaining_capacity_locked() == 0;
364 }
365
366 uint16_t remaining_capacity() PW_LOCKS_EXCLUDED(*this) {
367 std::lock_guard guard(*this);
368 return remaining_capacity_locked();
369 }
370
371 uint16_t remaining_capacity_locked() const
373 return deque_.capacity() - deque_.size() - reservations();
374 }
375
376 uint16_t capacity() const PW_NO_LOCK_SAFETY_ANALYSIS {
377 // SAFETY: The capacity of `deque_` cannot change.
378 return deque_.capacity();
379 }
380
381 [[nodiscard]] bool empty() PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
382 return deque_.empty();
383 }
384
385 template <typename U>
386 Status TrySend(U&& value) PW_LOCKS_EXCLUDED(*this) {
387 std::lock_guard guard(*this);
388 if (!is_open_locked()) {
390 }
391 if (full()) {
392 return Status::Unavailable();
393 }
394 PushAndWake(std::forward<U>(value));
395 return OkStatus();
396 }
397
398 Result<T> TryReceive() PW_LOCKS_EXCLUDED(*this) {
399 std::lock_guard guard(*this);
400 if (deque_.empty()) {
401 return is_open_locked() ? Status::Unavailable()
403 }
404 return Result(PopAndWake());
405 }
406
407 Result<SendReservation<T>> TryReserveSend() PW_LOCKS_EXCLUDED(*this) {
408 std::lock_guard guard(*this);
409 if (!is_open_locked()) {
411 }
412 if (full()) {
413 return Status::Unavailable();
414 }
415 add_reservation();
416 return SendReservation<T>(*this);
417 }
418
// Fulfils a reservation. Under the channel lock it gives the reserved slot
// back and, if the channel is still open, constructs a T in the queue from
// `args` and wakes one receiver. If the channel has been closed, `args` are
// not used. Finally drops one channel reference.
419 template <typename... Args>
420 void CommitReservationAndRemoveRef(Args&&... args) PW_LOCKS_EXCLUDED(*this) {
// Locked manually instead of via std::lock_guard: the last call in this
// function is the one annotated as releasing the lock.
421 lock();
422 remove_reservation();
423 if (is_open_locked()) {
424 EmplaceAndWake(std::forward<Args>(args)...);
425 }
// Annotated PW_UNLOCK_FUNCTION: releases the lock taken above and, per its
// name, destroys the channel if no references remain.
426 RemoveRefAndDestroyIfUnreferenced();
427 }
428
429 protected:
430 constexpr explicit Channel(FixedDeque<T>&& deque)
431 : deque_(std::move(deque)) {}
432
433 template <size_t kAlignment, size_t kCapacity>
435 : deque_(storage) {}
436
437 ~Channel() = default;
438
439 Deallocator* deallocator() const PW_NO_LOCK_SAFETY_ANALYSIS {
440 // SAFETY: deque_.deallocator() cannot change.
441 return deque_.deallocator();
442 }
443
444 private:
445 FixedDeque<T> deque_ PW_GUARDED_BY(*this);
446};
447
448template <typename T>
449class DynamicChannel final : public Channel<T> {
450 public:
451 static Channel<T>* Allocate(Allocator& alloc, uint16_t capacity) {
452 FixedDeque<T> deque = FixedDeque<T>::TryAllocate(alloc, capacity);
453 if (deque.capacity() == 0) {
454 return nullptr;
455 }
456 return alloc.New<DynamicChannel<T>>(std::move(deque));
457 }
458
459 explicit DynamicChannel(FixedDeque<T>&& deque)
460 : Channel<T>(std::move(deque)) {}
461
462 private:
463 ~DynamicChannel() = default;
464
465 void Destroy() final PW_LOCKS_EXCLUDED(*this) {
466 Deallocator* const deallocator = this->deallocator();
467 this->~DynamicChannel();
468 deallocator->Deallocate(this);
469 }
470};
471
478 public:
479 constexpr BaseChannelHandle() : channel_(nullptr) {}
480
482
484
485 [[nodiscard]] bool is_open() const PW_LOCKS_EXCLUDED(channel_) {
486 return channel_ != nullptr && channel_->is_open();
487 }
488
491 void Close() PW_LOCKS_EXCLUDED(channel_);
492
499 void Release() PW_LOCKS_EXCLUDED(channel_);
500
501 protected:
502 explicit BaseChannelHandle(BaseChannel& channel)
504 : channel_(&channel) {
505 channel_->add_handle();
506 }
507
508 BaseChannelHandle& operator=(const BaseChannelHandle& other)
509 PW_LOCKS_EXCLUDED(channel_);
510
511 BaseChannelHandle(BaseChannelHandle&& other) noexcept
512 : channel_(std::exchange(other.channel_, nullptr)) {}
513
514 BaseChannelHandle& operator=(BaseChannelHandle&& other) noexcept
515 PW_LOCKS_EXCLUDED(channel_);
516
517 constexpr BaseChannel* channel() const PW_LOCK_RETURNED(channel_) {
518 return channel_;
519 }
520
521 private:
522 BaseChannel* channel_;
523};
524
525} // namespace internal
526
528
530template <typename T>
532 public:
533 constexpr ChannelHandle() = default;
534
535 ChannelHandle(const ChannelHandle&) = default;
536 ChannelHandle& operator=(const ChannelHandle&) = default;
537
538 ChannelHandle(ChannelHandle&&) = default;
539 ChannelHandle& operator=(ChannelHandle&&) = default;
540
541 protected:
542 explicit ChannelHandle(internal::Channel<T>& channel)
544 : internal::BaseChannelHandle(channel) {}
545
549 PW_ASSERT(channel() != nullptr);
550 return static_cast<internal::Channel<T>&>(*channel()).CreateSender();
551 }
552
556 PW_ASSERT(channel() != nullptr);
557 return static_cast<internal::Channel<T>&>(*channel()).CreateReceiver();
558 }
559};
560
562template <typename T>
563class MpmcChannelHandle final : public ChannelHandle<T> {
564 public:
565 constexpr MpmcChannelHandle() = default;
566
569
570 private:
571 explicit MpmcChannelHandle(internal::Channel<T>& channel)
572 : ChannelHandle<T>(channel) {}
573
574 template <typename U>
575 friend std::optional<MpmcChannelHandle<U>> CreateMpmcChannel(Allocator&,
576 uint16_t);
577
578 template <typename U, uint16_t kCapacity>
581};
582
584template <typename T>
585class MpscChannelHandle final : public ChannelHandle<T> {
586 public:
587 constexpr MpscChannelHandle() = default;
588
590
591 private:
592 explicit MpscChannelHandle(internal::Channel<T>& channel)
593 : ChannelHandle<T>(channel) {}
594
595 template <typename U>
596 friend std::optional<std::tuple<MpscChannelHandle<U>, Receiver<U>>>
597 CreateMpscChannel(Allocator&, uint16_t);
598
599 template <typename U, uint16_t kCapacity>
600 friend std::tuple<MpscChannelHandle<U>, Receiver<U>> CreateMpscChannel(
602};
603
605template <typename T>
606class SpmcChannelHandle final : public ChannelHandle<T> {
607 public:
608 constexpr SpmcChannelHandle() = default;
609
611
612 private:
613 explicit SpmcChannelHandle(internal::Channel<T>& channel)
614 : ChannelHandle<T>(channel) {}
615
616 template <typename U>
617 friend std::optional<std::tuple<SpmcChannelHandle<U>, Sender<U>>>
618 CreateSpmcChannel(Allocator&, uint16_t);
619
620 template <typename U, uint16_t kCapacity>
621 friend std::tuple<SpmcChannelHandle<U>, Sender<U>> CreateSpmcChannel(
623};
624
626template <typename T>
627class SpscChannelHandle final : public ChannelHandle<T> {
628 public:
629 constexpr SpscChannelHandle() = default;
630
631 private:
632 explicit SpscChannelHandle(internal::Channel<T>& channel)
633 : ChannelHandle<T>(channel) {}
634
635 template <typename U>
636 friend std::optional<std::tuple<SpscChannelHandle<U>, Sender<U>, Receiver<U>>>
637 CreateSpscChannel(Allocator&, uint16_t);
638
639 template <typename U, uint16_t kCapacity>
640 friend std::tuple<SpscChannelHandle<U>, Sender<U>, Receiver<U>>
642};
643
647template <typename T>
648class MpChannelHandle final : public ChannelHandle<T> {
649 public:
650 constexpr MpChannelHandle() = default;
651
653 : ChannelHandle<T>(other) {}
654
655 MpChannelHandle& operator=(const MpmcChannelHandle<T>& other) {
657 return *this;
658 }
659
661 : ChannelHandle<T>(std::move(other)) {}
662
663 MpChannelHandle& operator=(MpmcChannelHandle<T>&& other) {
664 ChannelHandle<T>::operator=(std::move(other));
665 return *this;
666 }
667
669 : ChannelHandle<T>(other) {}
670
671 MpChannelHandle& operator=(const MpscChannelHandle<T>& other) {
673 return *this;
674 }
675
677 : ChannelHandle<T>(std::move(other)) {}
678
679 MpChannelHandle& operator=(MpscChannelHandle<T>&& other) {
680 ChannelHandle<T>::operator=(std::move(other));
681 return *this;
682 }
683
685};
686
690template <typename T>
691class McChannelHandle final : public ChannelHandle<T> {
692 public:
693 constexpr McChannelHandle() = default;
694
696 : ChannelHandle<T>(other) {}
697
698 McChannelHandle& operator=(const MpmcChannelHandle<T>& other) {
700 return *this;
701 }
702
704 : ChannelHandle<T>(std::move(other)) {}
705
706 McChannelHandle& operator=(MpmcChannelHandle<T>&& other) {
707 ChannelHandle<T>::operator=(std::move(other));
708 return *this;
709 }
710
712 : ChannelHandle<T>(other) {}
713
714 McChannelHandle& operator=(const SpmcChannelHandle<T>& other) {
716 return *this;
717 }
718
720 : ChannelHandle<T>(std::move(other)) {}
721
722 McChannelHandle& operator=(SpmcChannelHandle<T>&& other) {
723 ChannelHandle<T>::operator=(std::move(other));
724 return *this;
725 }
726
728};
729
734template <typename T, uint16_t kCapacity>
735class ChannelStorage final : private containers::StorageBaseFor<T, kCapacity>,
736 private internal::Channel<T> {
737 public:
738 template <typename U, uint16_t kCap>
739 friend std::tuple<SpscChannelHandle<U>, Sender<U>, Receiver<U>>
740 CreateSpscChannel(ChannelStorage<U, kCap>& storage);
741
742 template <typename U, uint16_t kCap>
743 friend std::tuple<MpscChannelHandle<U>, Receiver<U>> CreateMpscChannel(
744 ChannelStorage<U, kCap>& storage);
745
746 template <typename U, uint16_t kCap>
747 friend std::tuple<SpmcChannelHandle<U>, Sender<U>> CreateSpmcChannel(
748 ChannelStorage<U, kCap>& storage);
749
750 template <typename U, uint16_t kCap>
751 friend MpmcChannelHandle<U> CreateMpmcChannel(
752 ChannelStorage<U, kCap>& storage);
753
754 ChannelStorage() : internal::Channel<T>(this->storage()) {}
755
756 ~ChannelStorage() = default;
757
760 [[nodiscard]] bool active() const PW_LOCKS_EXCLUDED(*this) {
761 std::lock_guard lock(*this);
762 return this->active_locked();
763 }
764
765 constexpr uint16_t capacity() const { return kCapacity; }
766};
767
768template <typename T>
769class [[nodiscard]] ReceiveFuture final
770 : public internal::ChannelFuture<ReceiveFuture<T>, T, std::optional<T>> {
771 private:
773
774 public:
775 constexpr ReceiveFuture() = default;
776
778 PW_LOCKS_EXCLUDED(*this->channel(), *other.channel())
779 : Base(std::move(other)) {}
780
781 ReceiveFuture& operator=(ReceiveFuture&& other)
782 PW_LOCKS_EXCLUDED(*this->channel(), *other.channel()) {
783 // NOLINTNEXTLINE(misc-unconventional-assign-operator)
784 return static_cast<ReceiveFuture&>(this->MoveAssignFrom(other));
785 }
786
787 ~ReceiveFuture() PW_LOCKS_EXCLUDED(*this->channel()) {
788 this->RemoveFromChannel();
789 }
790
791 private:
792 friend Base;
794 friend Receiver<T>;
795 template <typename, typename>
796 friend class CallbackTask;
797
798 explicit ReceiveFuture(internal::Channel<T>* channel)
799 PW_LOCKS_EXCLUDED(*channel)
800 : Base(channel, this->kAllowClosed) {}
801
802 PollOptional<T> DoPend(Context& cx) PW_LOCKS_EXCLUDED(*this->channel()) {
803 if (this->channel() == nullptr) {
804 return Ready<std::optional<T>>(std::nullopt);
805 }
806
807 this->channel()->lock();
808 if (this->channel()->empty()) {
809 return this->StoreWakerForReceiveIfOpen(cx)
810 ? Pending()
811 : Ready<std::optional<T>>(std::nullopt);
812 }
813
814 auto result = Ready(this->channel()->PopAndWake());
815 this->Complete();
816 return result;
817 }
818};
819
821template <typename T>
822class Receiver {
823 public:
824 constexpr Receiver() : channel_(nullptr) {}
825
826 Receiver(const Receiver& other) = delete;
827 Receiver& operator=(const Receiver& other) = delete;
828
829 Receiver(Receiver&& other) noexcept
830 : channel_(std::exchange(other.channel_, nullptr)) {}
831
832 Receiver& operator=(Receiver&& other) noexcept {
833 if (this == &other) {
834 return *this;
835 }
836 if (channel_ != nullptr) {
837 channel_->remove_receiver();
838 }
839 channel_ = std::exchange(other.channel_, nullptr);
840 return *this;
841 }
842
843 ~Receiver() {
844 if (channel_ != nullptr) {
845 channel_->remove_receiver();
846 }
847 }
848
// NOTE(review): the signature and several interior lines of this function are
// missing from this listing (the embedded line numbers skip); the comments
// below describe only what is visible.
861 PW_LOCKS_EXCLUDED(*channel_) {
// A detached receiver short-circuits here (the returned value is not visible).
862 if (channel_ == nullptr) {
864 }
865
866 // Return immediately if a value is available or the channel is closed.
867 if (Result<T> result = channel_->TryReceive();
868 result.ok() || result.status().IsFailedPrecondition()) {
869 return result;
870 }
871
// Slow path: a task is posted to `dispatcher`; its callback stores the
// received value in `result` and signals `notification`. Both are captured
// by reference, so they must outlive any invocation of the callback.
872 std::optional<T> result;
874
876 [&result, &notification](std::optional<T>&& val) {
877 result = std::move(val);
878 notification.release();
879 },
880 channel_);
881 dispatcher.Post(task);
882
// No timeout requested: block until the callback has run.
883 if (timeout == internal::Channel<T>::kWaitForever) {
884 notification.acquire();
885 if (!result.has_value()) {
887 }
888 return Result<T>(std::move(*result));
889 }
890
// NOTE(review): only one source line is missing inside this timeout branch,
// so it cannot contain both a `task.Deregister()` call and a return statement.
// Sender::BlockingSendFuture() deregisters its task before returning on
// timeout; confirm whether the same is needed here, since the callback still
// references the locals `result` and `notification`.
891 if (!notification.try_acquire_for(timeout)) {
893 }
894
895 if (!result.has_value()) {
897 }
898 return Result<T>(std::move(*result));
899 }
900
909 return ReceiveFuture<T>(channel_);
910 }
911
919 if (channel_ == nullptr) {
921 }
922 return channel_->TryReceive();
923 }
924
929 void Disconnect() {
930 if (channel_ != nullptr) {
931 channel_->remove_receiver();
932 channel_ = nullptr;
933 }
934 }
935
937 [[nodiscard]] bool is_open() const {
938 return channel_ != nullptr && channel_->is_open();
939 }
940
941 private:
942 template <typename U>
943 friend class internal::Channel;
944
945 template <typename U>
946 friend std::optional<std::tuple<MpscChannelHandle<U>, Receiver<U>>>
947 CreateMpscChannel(Allocator&, uint16_t);
948
949 template <typename U, uint16_t kCapacity>
950 friend std::tuple<MpscChannelHandle<U>, Receiver<U>> CreateMpscChannel(
952
953 template <typename U>
954 friend std::optional<std::tuple<SpscChannelHandle<U>, Sender<U>, Receiver<U>>>
955 CreateSpscChannel(Allocator&, uint16_t);
956
957 template <typename U, uint16_t kCapacity>
958 friend std::tuple<SpscChannelHandle<U>, Sender<U>, Receiver<U>>
960
961 explicit Receiver(internal::Channel<T>& channel)
963 : channel_(&channel) {
964 channel_->add_receiver();
965 }
966
967 internal::Channel<T>* channel_;
968};
969
970template <typename T>
971class [[nodiscard]] SendFuture final
972 : public internal::ChannelFuture<SendFuture<T>, T, bool> {
973 private:
975
976 public:
977 SendFuture(SendFuture&& other) PW_LOCKS_EXCLUDED(*other.channel())
978 : Base(static_cast<Base&&>(other)), value_(std::move(other.value_)) {}
979
980 SendFuture& operator=(SendFuture&& other)
981 PW_LOCKS_EXCLUDED(*this->channel(), *other.channel()) {
982 value_ = std::move(other.value_);
983 // NOLINTNEXTLINE(misc-unconventional-assign-operator)
984 return static_cast<SendFuture&>(this->MoveAssignFrom(other));
985 }
986
987 ~SendFuture() PW_LOCKS_EXCLUDED(*this->channel()) {
988 this->RemoveFromChannel();
989 }
990
991 private:
992 friend Base;
994 friend Sender<T>;
995
996 SendFuture(internal::Channel<T>* channel, const T& value)
997 PW_LOCKS_EXCLUDED(*channel)
998 : Base(channel), value_(value) {}
999
1000 SendFuture(internal::Channel<T>* channel, T&& value)
1001 PW_LOCKS_EXCLUDED(*channel)
1002 : Base(channel), value_(std::move(value)) {}
1003
1004 Poll<bool> DoPend(Context& cx) PW_LOCKS_EXCLUDED(*this->channel()) {
1005 if (this->channel() == nullptr) {
1006 return Ready(false);
1007 }
1008
1009 this->channel()->lock();
1010 if (!this->channel()->is_open_locked()) {
1011 this->Complete();
1012 return Ready(false);
1013 }
1014
1015 if (this->channel()->full()) {
1016 this->StoreWakerForSend(cx);
1017 return Pending();
1018 }
1019
1020 this->channel()->PushAndWake(std::move(value_));
1021 this->Complete();
1022 return Ready(true);
1023 }
1024
1025 T value_;
1026};
1027
1034template <typename T>
1036 public:
1037 SendReservation(const SendReservation& other) = delete;
1038 SendReservation& operator=(const SendReservation& other) = delete;
1039
1041 : channel_(std::exchange(other.channel_, nullptr)) {}
1042
1043 SendReservation& operator=(SendReservation&& other) {
1044 if (this == &other) {
1045 return *this;
1046 }
1047 Cancel();
1048 channel_ = std::exchange(other.channel_, nullptr);
1049 return *this;
1050 }
1051
1052 ~SendReservation() { Cancel(); }
1053
1055 template <typename... Args>
1056 void Commit(Args&&... args) {
1057 PW_ASSERT(channel_ != nullptr);
1058 channel_->CommitReservationAndRemoveRef(std::forward<Args>(args)...);
1059 channel_ = nullptr;
1060 }
1061
1063 void Cancel() {
1064 if (channel_ != nullptr) {
1065 channel_->DropReservationAndRemoveRef();
1066 channel_ = nullptr;
1067 }
1068 }
1069
1070 private:
1071 friend internal::Channel<T>;
1072 friend class ReserveSendFuture<T>;
1073 friend class Sender<T>;
1074
1075 explicit SendReservation(internal::Channel<T>& channel)
1077 : channel_(&channel) {
1078 channel_->add_ref();
1079 }
1080
1081 internal::Channel<T>* channel_;
1082};
1083
1084template <typename T>
1085class [[nodiscard]] ReserveSendFuture final
1086 : public internal::ChannelFuture<ReserveSendFuture<T>,
1087 T,
1088 std::optional<SendReservation<T>>> {
1089 private:
1090 using Base = internal::
1091 ChannelFuture<ReserveSendFuture, T, std::optional<SendReservation<T>>>;
1092
1093 public:
1094 ReserveSendFuture(ReserveSendFuture&& other) : Base(std::move(other)) {}
1095
1096 ReserveSendFuture& operator=(ReserveSendFuture&& other) {
1097 // NOLINTNEXTLINE(misc-unconventional-assign-operator)
1098 return static_cast<ReserveSendFuture&>(this->MoveAssignFrom(other));
1099 }
1100
1101 ~ReserveSendFuture() PW_LOCKS_EXCLUDED(*this->channel()) {
1102 this->RemoveFromChannel();
1103 }
1104
1105 private:
1106 friend Base;
1107 friend internal::Channel<T>;
1108 friend Sender<T>;
1109
1110 explicit ReserveSendFuture(internal::Channel<T>* channel)
1111 PW_LOCKS_EXCLUDED(*channel)
1112 : Base(channel) {}
1113
1115 PW_LOCKS_EXCLUDED(*this->channel()) {
1116 if (this->channel() == nullptr) {
1117 return Ready<std::optional<SendReservation<T>>>(std::nullopt);
1118 }
1119
1120 this->channel()->lock();
1121 if (!this->channel()->is_open_locked()) {
1122 this->Complete();
1123 return Ready<std::optional<SendReservation<T>>>(std::nullopt);
1124 }
1125
1126 if (this->channel()->remaining_capacity_locked() == 0) {
1127 this->StoreWakerForReserveSend(cx);
1128 return Pending();
1129 }
1130
1131 this->channel()->add_reservation();
1132 SendReservation<T> reservation(*this->channel());
1133 this->Complete();
1134 return reservation;
1135 }
1136};
1137
1139template <typename T>
1140class Sender {
1141 public:
1142 constexpr Sender() : channel_(nullptr) {}
1143
1144 Sender(const Sender& other) = delete;
1145 Sender& operator=(const Sender& other) = delete;
1146
1147 Sender(Sender&& other) noexcept
1148 : channel_(std::exchange(other.channel_, nullptr)) {}
1149
1150 Sender& operator=(Sender&& other) noexcept {
1151 if (this == &other) {
1152 return *this;
1153 }
1154 if (channel_ != nullptr) {
1155 channel_->remove_sender();
1156 }
1157 channel_ = std::exchange(other.channel_, nullptr);
1158 return *this;
1159 }
1160
1161 ~Sender() {
1162 if (channel_ != nullptr) {
1163 channel_->remove_sender();
1164 }
1165 }
1166
1175 template <typename U>
1176 SendFuture<T> Send(U&& value) {
1177 return SendFuture<T>(channel_, std::forward<U>(value));
1178 }
1179
1186
1197 if (channel_ == nullptr) {
1199 }
1200 return channel_->TryReserveSend();
1201 }
1202
1212 Status TrySend(const T& value) {
1213 if (channel_ == nullptr) {
1215 }
1216 return channel_->TrySend(value);
1217 }
1218
1220 Status TrySend(T&& value) {
1221 if (channel_ == nullptr) {
1223 }
1224 return channel_->TrySend(std::move(value));
1225 }
1226
1238 const T& value,
1241 return BlockingSendMoveOrCopy(dispatcher, value, timeout);
1242 }
1243
1246 T&& value,
1249 return BlockingSendMoveOrCopy(dispatcher, std::move(value), timeout);
1250 }
1251
1256 void Disconnect() {
1257 if (channel_ != nullptr) {
1258 channel_->remove_sender();
1259 channel_ = nullptr;
1260 }
1261 }
1262
1264 uint16_t remaining_capacity() const {
1265 return channel_ != nullptr ? channel_->remaining_capacity() : 0;
1266 }
1267
1269 uint16_t capacity() const {
1270 return channel_ != nullptr ? channel_->capacity() : 0;
1271 }
1272
1274 [[nodiscard]] bool is_open() const {
1275 return channel_ != nullptr && channel_->is_open();
1276 }
1277
1278 private:
1279 template <typename U>
1280 friend class internal::Channel;
1281
1282 template <typename U>
1283 friend std::optional<std::tuple<SpmcChannelHandle<U>, Sender<U>>>
1284 CreateSpmcChannel(Allocator&, uint16_t);
1285
1286 template <typename U, uint16_t kCapacity>
1287 friend std::tuple<SpmcChannelHandle<U>, Sender<U>> CreateSpmcChannel(
1289
1290 template <typename U>
1291 friend std::optional<std::tuple<SpscChannelHandle<U>, Sender<U>, Receiver<U>>>
1292 CreateSpscChannel(Allocator&, uint16_t);
1293
1294 template <typename U, uint16_t kCapacity>
1295 friend std::tuple<SpscChannelHandle<U>, Sender<U>, Receiver<U>>
1297
1298 explicit Sender(internal::Channel<T>& channel)
1300 : channel_(&channel) {
1301 channel_->add_sender();
1302 }
1303
1304 template <typename U>
1305 Status BlockingSendMoveOrCopy(Dispatcher& dispatcher,
1306 U&& value,
1308 PW_LOCKS_EXCLUDED(*channel_) {
1309 if (channel_ == nullptr) {
1311 }
1312
1313 if (Status status = channel_->TrySend(std::forward<U>(value));
1314 status.ok() || status.IsFailedPrecondition()) {
1315 return status;
1316 }
1317
1318 return BlockingSendFuture(
1319 dispatcher, // NOLINTNEXTLINE(bugprone-use-after-move)
1320 SendFuture<T>(channel_, std::forward<U>(value)),
1321 timeout);
1322 }
1323
1324 Status BlockingSendFuture(Dispatcher& dispatcher,
1325 SendFuture<T>&& future,
1327 PW_LOCKS_EXCLUDED(*channel_) {
1328 Status status;
1329 sync::TimedThreadNotification notification;
1330
1331 CallbackTask task(
1332 [&status, &notification](bool result) {
1333 status = result ? OkStatus() : Status::FailedPrecondition();
1334 notification.release();
1335 },
1336 std::move(future));
1337 dispatcher.Post(task);
1338
1339 if (timeout == internal::Channel<T>::kWaitForever) {
1340 notification.acquire();
1341 return status;
1342 }
1343
1344 if (!notification.try_acquire_for(timeout)) {
1345 task.Deregister();
1346 return Status::DeadlineExceeded();
1347 }
1348 return status;
1349 }
1350
// Channel this sender is attached to. A null pointer means "not attached":
// remaining_capacity() and capacity() then return 0 and is_open() returns
// false.
1351 internal::Channel<T>* channel_;
1352};
1353
1367template <typename T>
1368std::optional<MpmcChannelHandle<T>> CreateMpmcChannel(Allocator& alloc,
1369 uint16_t capacity) {
1370 auto channel = internal::DynamicChannel<T>::Allocate(alloc, capacity);
1371 if (channel == nullptr) {
1372 return std::nullopt;
1373 }
1374 std::lock_guard lock(*channel);
1375 return MpmcChannelHandle<T>(*channel);
1376}
1377
1389template <typename T, uint16_t kCapacity>
1391 std::lock_guard lock(static_cast<internal::Channel<T>&>(storage));
1392 PW_DASSERT(!storage.active_locked());
1393 return MpmcChannelHandle<T>(storage);
1394}
1395
1409template <typename T>
1410std::optional<std::tuple<MpscChannelHandle<T>, Receiver<T>>> CreateMpscChannel(
1411 Allocator& alloc, uint16_t capacity) {
1412 auto channel = internal::DynamicChannel<T>::Allocate(alloc, capacity);
1413 if (channel == nullptr) {
1414 return std::nullopt;
1415 }
1416 std::lock_guard lock(*channel);
1417 return std::make_tuple(MpscChannelHandle<T>(*channel), Receiver<T>(*channel));
1418}
1419
1431template <typename T, uint16_t kCapacity>
1432std::tuple<MpscChannelHandle<T>, Receiver<T>> CreateMpscChannel(
1434 std::lock_guard lock(static_cast<internal::Channel<T>&>(storage));
1435 PW_DASSERT(!storage.active_locked());
1436 return std::make_tuple(MpscChannelHandle<T>(storage), Receiver<T>(storage));
1437}
1438
1452template <typename T>
1453std::optional<std::tuple<SpmcChannelHandle<T>, Sender<T>>> CreateSpmcChannel(
1454 Allocator& alloc, uint16_t capacity) {
1455 auto channel = internal::DynamicChannel<T>::Allocate(alloc, capacity);
1456 if (channel == nullptr) {
1457 return std::nullopt;
1458 }
1459 std::lock_guard lock(*channel);
1460 return std::make_tuple(SpmcChannelHandle<T>(*channel), Sender<T>(*channel));
1461}
1462
1474template <typename T, uint16_t kCapacity>
1475std::tuple<SpmcChannelHandle<T>, Sender<T>> CreateSpmcChannel(
1477 std::lock_guard lock(static_cast<internal::Channel<T>&>(storage));
1478 PW_DASSERT(!storage.active_locked());
1479 return std::make_tuple(SpmcChannelHandle<T>(storage), Sender<T>(storage));
1480}
1481
1495template <typename T>
1496std::optional<std::tuple<SpscChannelHandle<T>, Sender<T>, Receiver<T>>>
1497CreateSpscChannel(Allocator& alloc, uint16_t capacity) {
1498 auto channel = internal::DynamicChannel<T>::Allocate(alloc, capacity);
1499 if (channel == nullptr) {
1500 return std::nullopt;
1501 }
1502 std::lock_guard lock(*channel);
1503 return std::make_tuple(SpscChannelHandle<T>(*channel),
1504 Sender<T>(*channel),
1505 Receiver<T>(*channel));
1506}
1507
1519template <typename T, uint16_t kCapacity>
1520std::tuple<SpscChannelHandle<T>, Sender<T>, Receiver<T>> CreateSpscChannel(
1522 std::lock_guard lock(static_cast<internal::Channel<T>&>(storage));
1523 PW_DASSERT(!storage.active_locked());
1524 return std::make_tuple(
1525 SpscChannelHandle<T>(storage), Sender<T>(storage), Receiver<T>(storage));
1526}
1527
1529
1530namespace internal {
1531
// Ends this future's association with its channel: calls
// RemoveRefAndDestroyIfUnreferenced() on the channel, then clears channel_.
//
// The PW_UNLOCK_FUNCTION(*channel_) annotation declares that the channel lock
// is held on entry and released by this function.
// NOTE(review): judging by its name, the callee may destroy the channel when
// this was the last reference, so the channel must not be touched after the
// call -- confirm against its definition.
1532inline void BaseChannelFuture::Complete() PW_UNLOCK_FUNCTION(*channel_) {
1533 channel_->RemoveRefAndDestroyIfUnreferenced();
// Clear the pointer only after the call above has used it.
1534 channel_ = nullptr;
1535}
1536
1537inline void BaseChannel::PopAndWakeOne(
1538 IntrusiveForwardList<BaseChannelFuture>& futures) {
1539 BaseChannelFuture& future = futures.front();
1540 futures.pop_front();
1541 future.Wake();
1542}
1543
1544} // namespace internal
1545} // namespace pw::async2
Definition: allocator.h:43
std::enable_if_t<!std::is_array_v< T >, T * > New(Args &&... args)
Definition: allocator.h:64
Abstract interface for releasing memory.
Definition: deallocator.h:29
Definition: deque.h:207
Definition: intrusive_forward_list.h:99
Definition: result.h:143
Definition: status.h:120
static constexpr Status DeadlineExceeded()
Definition: status.h:177
static constexpr Status Unavailable()
Definition: status.h:304
static constexpr Status FailedPrecondition()
Definition: status.h:243
Definition: callback_task.h:44
Channel handle for a particular type T.
Definition: channel.h:531
Sender< T > CreateSender()
Definition: channel.h:548
Receiver< T > CreateReceiver()
Definition: channel.h:555
Definition: channel.h:736
bool active() const
Definition: channel.h:760
Definition: context.h:54
Definition: dispatcher.h:53
Definition: channel.h:691
Definition: channel.h:648
A handle to a multi-producer, multi-consumer channel.
Definition: channel.h:563
friend std::optional< MpmcChannelHandle< U > > CreateMpmcChannel(Allocator &, uint16_t)
Definition: channel.h:1368
A handle to a multi-producer, single-consumer channel.
Definition: channel.h:585
friend std::optional< std::tuple< MpscChannelHandle< U >, Receiver< U > > > CreateMpscChannel(Allocator &, uint16_t)
Definition: channel.h:1410
Definition: poll.h:60
Definition: channel.h:770
A receiver which reads values from an asynchronous channel.
Definition: channel.h:822
friend std::optional< std::tuple< MpscChannelHandle< U >, Receiver< U > > > CreateMpscChannel(Allocator &, uint16_t)
Definition: channel.h:1410
bool is_open() const
Returns true if the channel is open.
Definition: channel.h:937
Result< T > TryReceive()
Definition: channel.h:918
friend std::optional< std::tuple< SpscChannelHandle< U >, Sender< U >, Receiver< U > > > CreateSpscChannel(Allocator &, uint16_t)
Definition: channel.h:1497
Result< T > BlockingReceive(Dispatcher &dispatcher, chrono::SystemClock::duration timeout=internal::Channel< T >::kWaitForever)
Definition: channel.h:858
void Disconnect()
Definition: channel.h:929
ReceiveFuture< T > Receive()
Definition: channel.h:908
Definition: channel.h:1088
Definition: channel.h:972
Definition: channel.h:1035
void Cancel()
Releases the reservation, making the space available for other senders.
Definition: channel.h:1063
void Commit(Args &&... args)
Commits a value to a reserved slot.
Definition: channel.h:1056
A sender which writes values to an asynchronous channel.
Definition: channel.h:1140
SendFuture< T > Send(U &&value)
Definition: channel.h:1176
Result< SendReservation< T > > TryReserveSend()
Definition: channel.h:1196
friend std::optional< std::tuple< SpmcChannelHandle< U >, Sender< U > > > CreateSpmcChannel(Allocator &, uint16_t)
Definition: channel.h:1453
Status TrySend(const T &value)
Definition: channel.h:1212
friend std::optional< std::tuple< SpscChannelHandle< U >, Sender< U >, Receiver< U > > > CreateSpscChannel(Allocator &, uint16_t)
Definition: channel.h:1497
ReserveSendFuture< T > ReserveSend()
Definition: channel.h:1185
uint16_t capacity() const
Returns the maximum capacity of the channel.
Definition: channel.h:1269
Status TrySend(T &&value)
Definition: channel.h:1220
uint16_t remaining_capacity() const
Returns the remaining capacity of the channel.
Definition: channel.h:1264
void Disconnect()
Definition: channel.h:1256
bool is_open() const
Returns true if the channel is open.
Definition: channel.h:1274
Status BlockingSend(Dispatcher &dispatcher, const T &value, chrono::SystemClock::duration timeout=internal::Channel< T >::kWaitForever)
Definition: channel.h:1237
Status BlockingSend(Dispatcher &dispatcher, T &&value, chrono::SystemClock::duration timeout=internal::Channel< T >::kWaitForever)
Definition: channel.h:1245
A handle to a single-producer, multi-consumer channel.
Definition: channel.h:606
friend std::optional< std::tuple< SpmcChannelHandle< U >, Sender< U > > > CreateSpmcChannel(Allocator &, uint16_t)
Definition: channel.h:1453
A handle to a single-producer, single-consumer channel.
Definition: channel.h:627
friend std::optional< std::tuple< SpscChannelHandle< U >, Sender< U >, Receiver< U > > > CreateSpscChannel(Allocator &, uint16_t)
Definition: channel.h:1497
bool is_complete() const
True if the future has returned Ready().
Definition: channel.h:68
Definition: channel.h:157
Definition: channel.h:124
Definition: channel.h:316
Definition: channel.h:449
Definition: storage.h:86
Definition: storage.h:39
constexpr size_type capacity() const noexcept
Returns the maximum number of elements in the deque.
Definition: generic_deque.h:74
Definition: interrupt_spin_lock.h:50
Definition: timed_thread_notification.h:40
bool try_acquire_for(chrono::SystemClock::duration timeout)
void Deallocate(void *ptr)
Definition: deallocator.h:60
std::optional< std::tuple< SpmcChannelHandle< T >, Sender< T > > > CreateSpmcChannel(Allocator &alloc, uint16_t capacity)
Definition: channel.h:1453
std::optional< std::tuple< MpscChannelHandle< T >, Receiver< T > > > CreateMpscChannel(Allocator &alloc, uint16_t capacity)
Definition: channel.h:1410
std::optional< MpmcChannelHandle< T > > CreateMpmcChannel(Allocator &alloc, uint16_t capacity)
Definition: channel.h:1368
std::optional< std::tuple< SpscChannelHandle< T >, Sender< T >, Receiver< T > > > CreateSpscChannel(Allocator &alloc, uint16_t capacity)
Definition: channel.h:1497
constexpr bool IsReady() const noexcept
Returns whether or not this value is Ready.
Definition: poll.h:133
constexpr PendingType Pending()
Returns a value indicating that an operation was not yet able to complete.
Definition: poll.h:271
constexpr Poll Ready()
Returns a value indicating completion.
Definition: poll.h:255
std::chrono::duration< rep, period > duration
Alias for durations representable with this clock.
Definition: system_clock.h:90
constexpr bool CheckedIncrement(T &base, Inc inc)
Definition: checked_arithmetic.h:118
constexpr Status OkStatus()
Definition: status.h:450
#define PW_LOCKABLE(name)
Definition: lock_annotations.h:208
#define PW_GUARDED_BY(x)
Definition: lock_annotations.h:60
#define PW_NO_LOCK_SAFETY_ANALYSIS
Definition: lock_annotations.h:292
#define PW_EXCLUSIVE_LOCK_FUNCTION(...)
Definition: lock_annotations.h:230
#define PW_EXCLUSIVE_LOCKS_REQUIRED(...)
Definition: lock_annotations.h:146
#define PW_LOCK_RETURNED(x)
Definition: lock_annotations.h:197
#define PW_UNLOCK_FUNCTION(...)
Definition: lock_annotations.h:247
#define PW_LOCKS_EXCLUDED(...)
Definition: lock_annotations.h:176