C/C++ API Reference
Loading...
Searching...
No Matches
channel.h
1// Copyright 2025 The Pigweed Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may not
4// use this file except in compliance with the License. You may obtain a copy of
5// the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12// License for the specific language governing permissions and limitations under
13// the License.
14#pragma once
15
16#include <mutex>
17
18#include "pw_allocator/allocator.h"
19#include "pw_async2/callback_task.h"
20#include "pw_async2/dispatcher.h"
21#include "pw_async2/future.h"
22#include "pw_containers/deque.h"
23#include "pw_numeric/checked_arithmetic.h"
24#include "pw_result/result.h"
25#include "pw_sync/interrupt_spin_lock.h"
26#include "pw_sync/lock_annotations.h"
27#include "pw_sync/timed_thread_notification.h"
28
29namespace pw::async2 {
30
32
33template <typename T>
34class Receiver;
35
36template <typename T>
37class ReceiveFuture;
38
39template <typename T>
40class Sender;
41
42template <typename T>
43class SendFuture;
44
45template <typename T>
46class ReserveSendFuture;
47
48template <typename T>
49class SendReservation;
50
51template <typename T, uint16_t kCapacity>
52class ChannelStorage;
53
54namespace internal {
55
56template <typename T>
57class Channel;
58
59class BaseChannelFuture;
60
61// Internal generic channel type. BaseChannel is not exposed to users. Its
62// public interface is for internal consumption.
63class PW_LOCKABLE("pw::async2::internal::BaseChannel") BaseChannel {
64 public:
65 static constexpr chrono::SystemClock::duration kWaitForever =
66 chrono::SystemClock::duration::max();
67
68 // Acquires the channel's lock.
// Forwards to the underlying `lock_` member. Together with unlock(), this
// gives BaseChannel the lock()/unlock() pair that std::lock_guard needs, which
// is how other members of this class take the lock (`std::lock_guard
// lock(*this)`).
69 void lock() PW_EXCLUSIVE_LOCK_FUNCTION() { lock_.lock(); }
70
71 // Releases the channel's lock.
// Forwards to the underlying `lock_` member.
72 void unlock() PW_UNLOCK_FUNCTION() { lock_.unlock(); }
73
// Returns true if the channel has not been closed. Takes the channel's lock
// for the duration of the check, so the caller must not already hold it.
74 [[nodiscard]] bool is_open() PW_LOCKS_EXCLUDED(*this) {
75 std::lock_guard lock(*this);
76 return is_open_locked();
77 }
78
// Same check as is_open(), for callers that already hold the lock: the channel
// counts as open for as long as `closed_` is false.
79 bool is_open_locked() const PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
80 return !closed_;
81 }
82
// Returns true while at least one reference to the channel exists, i.e. while
// `ref_count_` is nonzero. The lock must be held.
83 [[nodiscard]] bool active_locked() const PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
84 return ref_count_ != 0;
85 }
86
87 // Removes a reference to this channel and destroys the channel if needed.
88 void RemoveRefAndDestroyIfUnreferenced() PW_UNLOCK_FUNCTION();
89
// Closes the channel: acquires the lock for the scope of the call and
// delegates the actual work to CloseLocked(). Must be called without the lock
// held.
90 void Close() PW_LOCKS_EXCLUDED(*this) {
91 std::lock_guard lock(*this);
92 CloseLocked();
93 }
94
95 // Adds a SendFuture or ReserveSendFuture to the list of pending futures.
96 void add_send_future(BaseChannelFuture& future)
98 containers::PushBackSlow(send_futures_, future);
99 }
100
101 // Adds a ReceiveFuture to the list of pending futures.
102 void add_receive_future(BaseChannelFuture& future)
104 containers::PushBackSlow(receive_futures_, future);
105 }
106
107 void DropReservationAndRemoveRef() PW_LOCKS_EXCLUDED(*this);
108
// Registers a new receiver through add_object(): `receiver_count_` is bumped
// only while the channel is open, and a channel reference is always added.
// The lock must be held.
109 void add_receiver() PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
110 add_object(receiver_count_);
111 }
112
// Registers a new sender through add_object(); see add_receiver().
113 void add_sender() PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
114 add_object(sender_count_);
115 }
116
// Registers a new channel handle through add_object(); see add_receiver().
117 void add_handle() PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
118 add_object(handle_count_);
119 }
120
// Records one more outstanding send reservation. The increment is unchecked;
// callers in this file only reserve after confirming the channel is not full.
121 void add_reservation() PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
122 reservations_ += 1;
123 }
124
// Releases one outstanding send reservation. Debug-asserts that there is a
// reservation to release before decrementing.
125 void remove_reservation() PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
126 PW_DASSERT(reservations_ > 0);
127 reservations_ -= 1;
128 }
129
// The remove_* functions hand the matching counter to remove_object(), which
// is declared below but defined outside this listing. Unlike the add_*
// functions they must be called WITHOUT the lock held.
130 void remove_sender() PW_LOCKS_EXCLUDED(*this) {
131 remove_object(&sender_count_);
132 }
133
134 void remove_receiver() PW_LOCKS_EXCLUDED(*this) {
135 remove_object(&receiver_count_);
136 }
137
138 void remove_handle() PW_LOCKS_EXCLUDED(*this) {
139 remove_object(&handle_count_);
140 }
141
// Adds one reference to the channel. The PW_ASSERT fires if CheckedIncrement
// reports failure (presumably when `ref_count_` would overflow -- confirm
// against pw_numeric/checked_arithmetic.h).
142 void add_ref() PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
143 PW_ASSERT(CheckedIncrement(ref_count_, 1));
144 }
145
146 protected:
147 constexpr BaseChannel() = default;
148
149 ~BaseChannel();
150
// Wakes one pending receive future, if any, by handing the `receive_futures_`
// list to PopAndWakeOneIfAvailable(). The lock must be held.
151 void WakeOneReceiver() PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
152 PopAndWakeOneIfAvailable(receive_futures_);
153 }
154
// Wakes one pending send (or reserve-send) future, if any, by handing the
// `send_futures_` list to PopAndWakeOneIfAvailable(). The lock must be held.
155 void WakeOneSender() PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
156 PopAndWakeOneIfAvailable(send_futures_);
157 }
158
// Returns the number of outstanding send reservations. The lock must be held.
159 uint16_t reservations() const PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
160 return reservations_;
161 }
162
163 private:
164 static void PopAndWakeAll(IntrusiveForwardList<BaseChannelFuture>& futures);
165
166 static void PopAndWakeOneIfAvailable(
168
169 static void PopAndWakeOne(IntrusiveForwardList<BaseChannelFuture>& futures);
170
// Shared implementation of add_receiver(), add_sender(), and add_handle().
//
// `counter` is incremented only while the channel is still open, but a
// channel reference is always added, even for a closed channel. The PW_ASSERT
// fires if CheckedIncrement reports failure (presumably on uint8_t overflow).
171 void add_object(uint8_t& counter) PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
172 if (is_open_locked()) {
173 PW_ASSERT(CheckedIncrement(counter, 1));
174 }
175 add_ref();
176 }
177
178 // Takes a pointer since otherwise Clang's thread safety analysis complains
179 // about taking a reference without the lock held.
180 void remove_object(uint8_t* counter) PW_LOCKS_EXCLUDED(*this);
181
182 // Returns true if the channel should be closed following a reference
183 // decrement.
184 //
185 // Handles can create new senders and receivers, so as long as one exists, the
186 // channel should remain open. Without active handles, the channel closes when
187 // either end fully hangs up.
// The lock must be held because all three counters are guarded by it.
188 bool should_close() const PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
// No handles remain AND at least one side (senders or receivers) is empty.
189 return handle_count_ == 0 && (sender_count_ == 0 || receiver_count_ == 0);
190 }
191
192 void CloseLocked() PW_EXCLUSIVE_LOCKS_REQUIRED(*this);
193
194 // Destroys the channel if it is dynamically allocated.
195 virtual void Destroy() {}
196
199
// Number of send slots currently promised to outstanding reservations.
200 uint16_t reservations_ PW_GUARDED_BY(*this) = 0;
// Set once the channel has been closed; is_open_locked() returns `!closed_`.
201 bool closed_ PW_GUARDED_BY(*this) = false;
// The lock behind lock()/unlock(). All other state is guarded through `*this`.
202 mutable sync::InterruptSpinLock lock_;
203
204 // Channels are reference counted in two ways:
205 //
206 // - Senders and receivers are tracked independently. Once either reaches
207 // zero, the channel is closed, but not destroyed. No new values can be
208 // sent, but any buffered values can still be read.
// NOTE: per should_close(), this only applies once `handle_count_` is also
// zero; while a handle exists the channel is kept open.
209 //
210 // - Overall object reference count, including senders, receivers, futures,
211 // and channel handles. Once this reaches zero, the channel is destroyed.
212 //
213 uint8_t sender_count_ PW_GUARDED_BY(*this) = 0;
214 uint8_t receiver_count_ PW_GUARDED_BY(*this) = 0;
215 uint8_t handle_count_ PW_GUARDED_BY(*this) = 0;
216 uint16_t ref_count_ PW_GUARDED_BY(*this) = 0;
217};
218
219class BaseChannelFuture : public IntrusiveForwardList<BaseChannelFuture>::Item {
220 public:
221 BaseChannelFuture(const BaseChannelFuture&) = delete;
222 BaseChannelFuture& operator=(const BaseChannelFuture&) = delete;
223
224 // Derived classes call MoveAssignFrom to move rather than use the operator.
225 BaseChannelFuture& operator=(BaseChannelFuture&&) = delete;
226
// Returns true once MarkCompleted() has been called on this future.
228 [[nodiscard]] bool is_complete() const { return completed_; }
229
230 // Internal API for the channel to wake the future.
// The stored waker is moved from as part of the call, so each stored waker
// is used for at most one wake-up.
231 void Wake() { std::move(waker_).Wake(); }
232
233 protected:
234 // Creates a new future, storing nullptr if `channel` is nullptr or if the
235 // channel is closed.
236 explicit BaseChannelFuture(BaseChannel* channel) PW_LOCKS_EXCLUDED(*channel);
237
// Tag type used only to select the constructor overload below.
238 enum AllowClosed { kAllowClosed };
239
240 // Creates a new future, but does NOT check if the channel is open.
// The channel pointer is handed to StoreAndAddRefIfNonnull(), which is
// declared below but defined outside this listing.
241 BaseChannelFuture(BaseChannel* channel, AllowClosed)
242 PW_LOCKS_EXCLUDED(*channel) {
243 StoreAndAddRefIfNonnull(channel);
244 }
245
// Move constructor: copies `other`'s channel pointer, then lets MoveFrom()
// (defined outside this listing) finish transferring `other`'s state.
246 BaseChannelFuture(BaseChannelFuture&& other)
247 PW_LOCKS_EXCLUDED(*channel_, *other.channel_)
248 : channel_(other.channel_) {
249 MoveFrom(other);
250 }
251
252 BaseChannelFuture& MoveAssignFrom(BaseChannelFuture& other)
253 PW_LOCKS_EXCLUDED(*channel_, *other.channel_);
254
255 // Unlists this future and removes a reference from the channel.
256 void RemoveFromChannel() PW_LOCKS_EXCLUDED(*channel_);
257
258 bool StoreWakerForReceiveIfOpen(Context& cx) PW_UNLOCK_FUNCTION(*channel_);
259
260 void StoreWakerForSend(Context& cx) PW_UNLOCK_FUNCTION(*channel_);
261
262 void StoreWakerForReserveSend(Context& cx) PW_UNLOCK_FUNCTION(*channel_);
263
// Records that the future has produced its result; is_complete() returns true
// from then on.
264 void MarkCompleted() { completed_ = true; }
265
// Drops this future's reference to the channel and forgets the channel.
//
// `channel_` is dereferenced unconditionally, so it must be non-null. The
// PW_UNLOCK_FUNCTION annotations here and on
// RemoveRefAndDestroyIfUnreferenced() indicate the channel lock is held on
// entry and released by that call.
266 void Complete() PW_UNLOCK_FUNCTION(*channel_) {
267 channel_->RemoveRefAndDestroyIfUnreferenced();
268 channel_ = nullptr;
269 }
270
// Returns the channel this future is attached to; may be nullptr.
271 BaseChannel* base_channel() PW_LOCK_RETURNED(channel_) { return channel_; }
272
273 private:
274 void StoreAndAddRefIfNonnull(BaseChannel* channel)
275 PW_LOCKS_EXCLUDED(*channel);
276
277 void MoveFrom(BaseChannelFuture& other) PW_LOCKS_EXCLUDED(*other.channel_);
278
279 BaseChannel* channel_;
280 Waker waker_;
281
282 bool completed_ = false;
283};
284
285// Adds Pend function and is_complete flag to BaseChannelFuture.
286template <typename Derived, typename T, typename FutureValue>
288 public:
289 using value_type = FutureValue;
290
// Polls the future by dispatching to the derived class's DoPend().
//
// Asserts that the future has not already completed. When DoPend() returns a
// ready result the future is marked completed, so polling it again trips the
// assertion. Returns whatever DoPend() returned.
291 Poll<value_type> Pend(Context& cx) PW_LOCKS_EXCLUDED(*this->channel()) {
292 PW_ASSERT(!is_complete());
// Static (CRTP-style) dispatch: `Derived` supplies DoPend().
293 Poll<value_type> result = static_cast<Derived&>(*this).DoPend(cx);
294 if (result.IsReady()) {
295 MarkCompleted();
296 }
297 return result;
298 }
299
300 protected:
// Creates a future for `channel` using the BaseChannelFuture constructor that
// checks whether the channel is open.
301 explicit ChannelFuture(Channel<T>* channel) : BaseChannelFuture(channel) {}
302
// Creates a future for `channel` without checking whether it is open.
303 ChannelFuture(Channel<T>* channel, AllowClosed)
304 : BaseChannelFuture(channel, kAllowClosed) {}
305
// Move constructor; forwards to the BaseChannelFuture move constructor.
306 ChannelFuture(ChannelFuture&& other) : BaseChannelFuture(std::move(other)) {}
307
// Returns the attached channel as its typed Channel<T>. May be nullptr.
308 Channel<T>* channel() PW_LOCK_RETURNED(this->base_channel()) {
309 return static_cast<Channel<T>*>(base_channel());
310 }
311
312 private:
313 using BaseChannelFuture::base_channel;
314 using BaseChannelFuture::MarkCompleted;
315 using BaseChannelFuture::Wake;
316};
317
318// Like BaseChannel, Channel is an internal class that is not exposed to users.
319// Its public interface is for internal consumption.
320template <typename T>
321class Channel : public BaseChannel {
322 public:
// Creates a Sender attached to this channel, or a default-constructed
// (detached) Sender if the channel is already closed.
//
// The attached Sender is constructed inside the locked scope because its
// constructor calls add_sender(), which requires the lock.
323 Sender<T> CreateSender() PW_LOCKS_EXCLUDED(*this) {
324 {
325 std::lock_guard guard(*this);
326 if (is_open_locked()) {
327 return Sender<T>(*this);
328 }
329 }
330 return Sender<T>();
331 }
332
// Creates a Receiver attached to this channel, or a default-constructed
// (detached) Receiver if the channel is already closed. Mirrors
// CreateSender().
333 Receiver<T> CreateReceiver() PW_LOCKS_EXCLUDED(*this) {
334 {
335 std::lock_guard guard(*this);
336 if (is_open_locked()) {
337 return Receiver<T>(*this);
338 }
339 }
340 return Receiver<T>();
341 }
342
// Appends `value` (moved) to the queue and wakes one waiting receiver.
// No capacity check is made here; callers in this file check full() first.
343 void PushAndWake(T&& value) PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
344 deque_.push_back(std::move(value));
345 WakeOneReceiver();
346 }
347
// Copying overload of PushAndWake().
348 void PushAndWake(const T& value) PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
349 deque_.push_back(value);
350 WakeOneReceiver();
351 }
352
// Constructs a value in place at the back of the queue from `args` and wakes
// one waiting receiver.
353 template <typename... Args>
354 void EmplaceAndWake(Args&&... args) PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
355 deque_.emplace_back(std::forward<Args>(args)...);
356 WakeOneReceiver();
357 }
358
// Removes and returns the oldest queued value, then wakes one waiting sender
// since a slot has been freed. The queue is read unconditionally; callers in
// this file check that it is non-empty first.
359 T PopAndWake() PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
360 T value = std::move(deque_.front());
361 deque_.pop_front();
362
363 WakeOneSender();
364 return value;
365 }
366
// Returns true if no further value can be queued or reserved, i.e. when
// remaining_capacity_locked() is zero. The lock must be held.
367 bool full() const PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
368 return remaining_capacity_locked() == 0;
369 }
370
// Locking wrapper around remaining_capacity_locked(). Must be called without
// the lock held.
371 uint16_t remaining_capacity() PW_LOCKS_EXCLUDED(*this) {
372 std::lock_guard guard(*this);
373 return remaining_capacity_locked();
374 }
375
376 uint16_t remaining_capacity_locked() const
378 return deque_.capacity() - deque_.size() - reservations();
379 }
380
// Returns the total number of slots in the queue. Read without the lock;
// thread-safety analysis is disabled for the reason given below.
381 uint16_t capacity() const PW_NO_LOCK_SAFETY_ANALYSIS {
382 // SAFETY: The capacity of `deque_` cannot change.
383 return deque_.capacity();
384 }
385
// Returns true if no values are queued. The lock must be held.
386 [[nodiscard]] bool empty() PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
387 return deque_.empty();
388 }
389
390 template <typename U>
391 Status TrySend(U&& value) PW_LOCKS_EXCLUDED(*this) {
392 std::lock_guard guard(*this);
393 if (!is_open_locked()) {
395 }
396 if (full()) {
397 return Status::Unavailable();
398 }
399 PushAndWake(std::forward<U>(value));
400 return OkStatus();
401 }
402
403 Result<T> TryReceive() PW_LOCKS_EXCLUDED(*this) {
404 std::lock_guard guard(*this);
405 if (deque_.empty()) {
406 return is_open_locked() ? Status::Unavailable()
408 }
409 return Result(PopAndWake());
410 }
411
412 Result<SendReservation<T>> TryReserveSend() PW_LOCKS_EXCLUDED(*this) {
413 std::lock_guard guard(*this);
414 if (!is_open_locked()) {
416 }
417 if (full()) {
418 return Status::Unavailable();
419 }
420 add_reservation();
421 return SendReservation<T>(*this);
422 }
423
// Converts a reservation into a queued value and drops the reservation's
// reference to the channel.
//
// The reservation is always released. The value is constructed from `args`
// only if the channel is still open; on a closed channel `args` are discarded.
// The lock is taken manually rather than with a guard because
// RemoveRefAndDestroyIfUnreferenced() is annotated as the function that
// releases it.
424 template <typename... Args>
425 void CommitReservationAndRemoveRef(Args&&... args) PW_LOCKS_EXCLUDED(*this) {
426 lock();
427 remove_reservation();
428 if (is_open_locked()) {
429 EmplaceAndWake(std::forward<Args>(args)...);
430 }
431 RemoveRefAndDestroyIfUnreferenced();
432 }
433
434 protected:
435 constexpr explicit Channel(FixedDeque<T>&& deque)
436 : deque_(std::move(deque)) {}
437
438 template <size_t kAlignment, size_t kCapacity>
440 : deque_(storage) {}
441
442 ~Channel() = default;
443
// Returns the deallocator reported by the underlying deque. Read without the
// lock; thread-safety analysis is disabled for the reason given below.
444 Deallocator* deallocator() const PW_NO_LOCK_SAFETY_ANALYSIS {
445 // SAFETY: deque_.deallocator() cannot change.
446 return deque_.deallocator();
447 }
448
449 private:
450 FixedDeque<T> deque_ PW_GUARDED_BY(*this);
451};
452
// Channel whose queue storage and channel object both come from an Allocator.
// Destroy() is overridden so the object frees itself.
453template <typename T>
454class DynamicChannel final : public Channel<T> {
455 public:
// Allocates queue storage for `capacity` elements and then the channel
// object itself from `alloc`.
//
// Returns nullptr if the deque ends up with zero capacity (treated as an
// allocation failure) or if `alloc.New` returns nullptr.
456 static Channel<T>* Allocate(Allocator& alloc, uint16_t capacity) {
457 FixedDeque<T> deque = FixedDeque<T>::TryAllocate(alloc, capacity);
458 if (deque.capacity() == 0) {
459 return nullptr;
460 }
461 return alloc.New<DynamicChannel<T>>(std::move(deque));
462 }
463
// Takes ownership of an already-allocated deque.
464 explicit DynamicChannel(FixedDeque<T>&& deque)
465 : Channel<T>(std::move(deque)) {}
466
467 private:
// Private: destruction goes through Destroy().
468 ~DynamicChannel() = default;
469
// Runs the destructor and returns the object's memory to its deallocator.
470 void Destroy() final PW_LOCKS_EXCLUDED(*this) {
// Fetch the deallocator first: it is read from the deque member, which no
// longer exists once the destructor has run.
471 Deallocator* const deallocator = this->deallocator();
472 this->~DynamicChannel();
473 deallocator->Deallocate(this);
474 }
475};
476
483 public:
484 constexpr BaseChannelHandle() : channel_(nullptr) {}
485
487
488 BaseChannelHandle& operator=(const BaseChannelHandle& other)
489 PW_LOCKS_EXCLUDED(channel_);
490
// Move constructor: takes over `other`'s channel pointer and leaves `other`
// detached (nullptr). No channel counters are touched.
491 BaseChannelHandle(BaseChannelHandle&& other) noexcept
492 : channel_(std::exchange(other.channel_, nullptr)) {}
493
494 BaseChannelHandle& operator=(BaseChannelHandle&& other) noexcept
495 PW_LOCKS_EXCLUDED(channel_);
496
498
// Returns true if this handle is attached to a channel and that channel is
// open. A detached handle (nullptr channel) reports false.
499 [[nodiscard]] bool is_open() const PW_LOCKS_EXCLUDED(channel_) {
500 return channel_ != nullptr && channel_->is_open();
501 }
502
505 void Close() PW_LOCKS_EXCLUDED(channel_);
506
513 void Release() PW_LOCKS_EXCLUDED(channel_);
514
// Returns the attached channel, or nullptr if the handle is detached.
515 constexpr BaseChannel* base_channel() const PW_LOCK_RETURNED(channel_) {
516 return channel_;
517 }
518
519 protected:
520 explicit BaseChannelHandle(BaseChannel& channel)
522 : channel_(&channel) {
523 channel_->add_handle();
524 }
525
526 private:
527 BaseChannel* channel_;
528};
529
531template <typename T>
533 public:
534 constexpr ChannelHandle() = default;
535
539 PW_ASSERT(base_channel() != nullptr);
540 return static_cast<Channel<T>*>(base_channel())->CreateSender();
541 }
542
546 PW_ASSERT(base_channel() != nullptr);
547 return static_cast<Channel<T>*>(base_channel())->CreateReceiver();
548 }
549
550 protected:
551 explicit ChannelHandle(internal::Channel<T>& channel)
553 : BaseChannelHandle(channel) {}
554};
555
556} // namespace internal
557
559template <typename T>
561 public:
562 constexpr MpmcChannelHandle() = default;
563
564 using internal::ChannelHandle<T>::is_open;
569
570 private:
571 explicit MpmcChannelHandle(internal::Channel<T>& channel)
572 : internal::ChannelHandle<T>(channel) {}
573
574 template <typename U>
575 friend std::optional<MpmcChannelHandle<U>> CreateMpmcChannel(Allocator&,
576 uint16_t);
577
578 template <typename U, uint16_t kCapacity>
581};
582
584template <typename T>
586 public:
587 constexpr MpscChannelHandle() = default;
588
589 using internal::ChannelHandle<T>::is_open;
593
594 private:
595 explicit MpscChannelHandle(internal::Channel<T>& channel)
596 : internal::ChannelHandle<T>(channel) {}
597
598 template <typename U>
599 friend std::optional<std::tuple<MpscChannelHandle<U>, Receiver<U>>>
600 CreateMpscChannel(Allocator&, uint16_t);
601
602 template <typename U, uint16_t kCapacity>
603 friend std::tuple<MpscChannelHandle<U>, Receiver<U>> CreateMpscChannel(
605};
606
608template <typename T>
610 public:
611 constexpr SpmcChannelHandle() = default;
612
613 using internal::ChannelHandle<T>::is_open;
617
618 private:
619 explicit SpmcChannelHandle(internal::Channel<T>& channel)
620 : internal::ChannelHandle<T>(channel) {}
621
622 template <typename U>
623 friend std::optional<std::tuple<SpmcChannelHandle<U>, Sender<U>>>
624 CreateSpmcChannel(Allocator&, uint16_t);
625
626 template <typename U, uint16_t kCapacity>
627 friend std::tuple<SpmcChannelHandle<U>, Sender<U>> CreateSpmcChannel(
629};
630
632template <typename T>
634 public:
635 constexpr SpscChannelHandle() = default;
636
637 using internal::ChannelHandle<T>::is_open;
640
641 private:
642 explicit SpscChannelHandle(internal::Channel<T>& channel)
643 : internal::ChannelHandle<T>(channel) {}
644
645 template <typename U>
646 friend std::optional<std::tuple<SpscChannelHandle<U>, Sender<U>, Receiver<U>>>
647 CreateSpscChannel(Allocator&, uint16_t);
648
649 template <typename U, uint16_t kCapacity>
650 friend std::tuple<SpscChannelHandle<U>, Sender<U>, Receiver<U>>
652};
653
658template <typename T, uint16_t kCapacity>
659class ChannelStorage final : private containers::StorageBaseFor<T, kCapacity>,
660 private internal::Channel<T> {
661 public:
662 template <typename U, uint16_t kCap>
663 friend std::tuple<SpscChannelHandle<U>, Sender<U>, Receiver<U>>
664 CreateSpscChannel(ChannelStorage<U, kCap>& storage);
665
666 template <typename U, uint16_t kCap>
667 friend std::tuple<MpscChannelHandle<U>, Receiver<U>> CreateMpscChannel(
668 ChannelStorage<U, kCap>& storage);
669
670 template <typename U, uint16_t kCap>
671 friend std::tuple<SpmcChannelHandle<U>, Sender<U>> CreateSpmcChannel(
672 ChannelStorage<U, kCap>& storage);
673
674 template <typename U, uint16_t kCap>
675 friend MpmcChannelHandle<U> CreateMpmcChannel(
676 ChannelStorage<U, kCap>& storage);
677
// Constructs the channel on top of the buffer returned by this object's own
// storage() (inherited from the StorageBaseFor base).
678 ChannelStorage() : internal::Channel<T>(this->storage()) {}
679
680 ~ChannelStorage() = default;
681
684 [[nodiscard]] bool active() const PW_LOCKS_EXCLUDED(*this) {
685 std::lock_guard lock(*this);
686 return this->active_locked();
687 }
688
// Returns the compile-time capacity `kCapacity` of this storage.
689 constexpr uint16_t capacity() const { return kCapacity; }
690};
691
692template <typename T>
693class [[nodiscard]] ReceiveFuture final
694 : public internal::ChannelFuture<ReceiveFuture<T>, T, std::optional<T>> {
695 private:
697
698 public:
699 constexpr ReceiveFuture() = default;
700
702 PW_LOCKS_EXCLUDED(*this->channel(), *other.channel())
703 : Base(std::move(other)) {}
704
// Move assignment. All of the work is done by the base class's
// MoveAssignFrom() (defined outside this listing); the result is cast back to
// the derived type.
705 ReceiveFuture& operator=(ReceiveFuture&& other)
706 PW_LOCKS_EXCLUDED(*this->channel(), *other.channel()) {
707 // NOLINTNEXTLINE(misc-unconventional-assign-operator)
708 return static_cast<ReceiveFuture&>(this->MoveAssignFrom(other));
709 }
710
// Detaches the future from its channel via RemoveFromChannel().
711 ~ReceiveFuture() PW_LOCKS_EXCLUDED(*this->channel()) {
712 this->RemoveFromChannel();
713 }
714
715 private:
716 friend Base;
718 friend Receiver<T>;
719 template <typename, typename>
720 friend class CallbackTask;
721
// Creates a receive future for `channel`. Uses the kAllowClosed base
// constructor, so the future is created without checking whether the channel
// is open.
722 explicit ReceiveFuture(internal::Channel<T>* channel)
723 PW_LOCKS_EXCLUDED(*channel)
724 : Base(channel, this->kAllowClosed) {}
725
// Poll implementation invoked by ChannelFuture::Pend().
//
// Returns:
//  - Ready(std::nullopt) if the future has no channel, or if the queue is
//    empty and StoreWakerForReceiveIfOpen() returns false.
//  - Pending if the queue is empty and StoreWakerForReceiveIfOpen() returns
//    true.
//  - Ready(value) with the oldest queued value otherwise.
726 PollOptional<T> DoPend(Context& cx) PW_LOCKS_EXCLUDED(*this->channel()) {
727 if (this->channel() == nullptr) {
728 return Ready<std::optional<T>>(std::nullopt);
729 }
730
// Locked manually: each path below ends in a call that is annotated as
// releasing the lock (StoreWakerForReceiveIfOpen or Complete).
731 this->channel()->lock();
732 if (this->channel()->empty()) {
733 return this->StoreWakerForReceiveIfOpen(cx)
734 ? Pending()
735 : Ready<std::optional<T>>(std::nullopt);
736 }
737
// Take the value before Complete(), which clears the channel pointer.
738 auto result = Ready(this->channel()->PopAndWake());
739 this->Complete();
740 return result;
741 }
742};
743
745template <typename T>
746class Receiver {
747 public:
// Creates a detached receiver (no channel).
748 constexpr Receiver() : channel_(nullptr) {}
749
// Receivers are move-only.
750 Receiver(const Receiver& other) = delete;
751 Receiver& operator=(const Receiver& other) = delete;
752
// Move constructor: takes over `other`'s channel and leaves `other` detached.
// The channel's receiver count is not touched.
753 Receiver(Receiver&& other) noexcept
754 : channel_(std::exchange(other.channel_, nullptr)) {}
755
// Move assignment: unregisters from the currently attached channel (if any),
// then takes over `other`'s channel. Self-assignment is a no-op.
756 Receiver& operator=(Receiver&& other) noexcept {
757 if (this == &other) {
758 return *this;
759 }
760 if (channel_ != nullptr) {
761 channel_->remove_receiver();
762 }
763 channel_ = std::exchange(other.channel_, nullptr);
764 return *this;
765 }
766
// Unregisters this receiver from its channel, if attached.
767 ~Receiver() {
768 if (channel_ != nullptr) {
769 channel_->remove_receiver();
770 }
771 }
772
785 PW_LOCKS_EXCLUDED(*channel_) {
786 if (channel_ == nullptr) {
788 }
789
790 // Return immediately if a value is available or the channel is closed.
791 if (Result<T> result = channel_->TryReceive();
792 result.ok() || result.status().IsFailedPrecondition()) {
793 return result;
794 }
795
796 std::optional<T> result;
798
800 [&result, &notification](std::optional<T>&& val) {
801 result = std::move(val);
802 notification.release();
803 },
804 channel_);
805 dispatcher.Post(task);
806
807 if (timeout == internal::Channel<T>::kWaitForever) {
808 notification.acquire();
809 if (!result.has_value()) {
811 }
812 return Result<T>(std::move(*result));
813 }
814
815 if (!notification.try_acquire_for(timeout)) {
817 }
818
819 if (!result.has_value()) {
821 }
822 return Result<T>(std::move(*result));
823 }
824
833 return ReceiveFuture<T>(channel_);
834 }
835
843 if (channel_ == nullptr) {
845 }
846 return channel_->TryReceive();
847 }
848
// Unregisters this receiver from its channel and detaches it. Does nothing if
// the receiver is already detached, so repeated calls are harmless.
853 void Disconnect() {
854 if (channel_ != nullptr) {
855 channel_->remove_receiver();
856 channel_ = nullptr;
857 }
858 }
859
// Returns true if the receiver is attached to a channel and that channel is
// open. A detached receiver reports false.
861 [[nodiscard]] bool is_open() const {
862 return channel_ != nullptr && channel_->is_open();
863 }
864
865 private:
866 template <typename U>
867 friend class internal::Channel;
868
869 template <typename U>
870 friend std::optional<std::tuple<MpscChannelHandle<U>, Receiver<U>>>
871 CreateMpscChannel(Allocator&, uint16_t);
872
873 template <typename U, uint16_t kCapacity>
874 friend std::tuple<MpscChannelHandle<U>, Receiver<U>> CreateMpscChannel(
876
877 template <typename U>
878 friend std::optional<std::tuple<SpscChannelHandle<U>, Sender<U>, Receiver<U>>>
879 CreateSpscChannel(Allocator&, uint16_t);
880
881 template <typename U, uint16_t kCapacity>
882 friend std::tuple<SpscChannelHandle<U>, Sender<U>, Receiver<U>>
884
885 explicit Receiver(internal::Channel<T>& channel)
887 : channel_(&channel) {
888 channel_->add_receiver();
889 }
890
891 internal::Channel<T>* channel_;
892};
893
894template <typename T>
895class [[nodiscard]] SendFuture final
896 : public internal::ChannelFuture<SendFuture<T>, T, bool> {
897 private:
899
900 public:
// Move constructor: moves the base-class future state first, then the pending
// value. `other` is cast to `Base&&` explicitly for the base construction.
901 SendFuture(SendFuture&& other) PW_LOCKS_EXCLUDED(*other.channel())
902 : Base(static_cast<Base&&>(other)), value_(std::move(other.value_)) {}
903
904 SendFuture& operator=(SendFuture&& other)
905 PW_LOCKS_EXCLUDED(*this->channel(), *other.channel()) {
906 value_ = std::move(other.value_);
907 // NOLINTNEXTLINE(misc-unconventional-assign-operator)
908 return static_cast<SendFuture&>(this->MoveAssignFrom(other));
909 }
910
// Detaches the future from its channel via RemoveFromChannel().
911 ~SendFuture() PW_LOCKS_EXCLUDED(*this->channel()) {
912 this->RemoveFromChannel();
913 }
914
915 private:
916 friend Base;
918 friend Sender<T>;
919
// Creates a send future holding a copy of `value`.
920 SendFuture(internal::Channel<T>* channel, const T& value)
921 PW_LOCKS_EXCLUDED(*channel)
922 : Base(channel), value_(value) {}
923
// Creates a send future that takes ownership of `value`.
924 SendFuture(internal::Channel<T>* channel, T&& value)
925 PW_LOCKS_EXCLUDED(*channel)
926 : Base(channel), value_(std::move(value)) {}
927
// Poll implementation invoked by ChannelFuture::Pend().
//
// Returns:
//  - Ready(false) if the future has no channel or the channel is closed.
//  - Pending if the channel is full; the waker is stored for a later wake-up.
//  - Ready(true) once the value has been pushed into the channel.
928 Poll<bool> DoPend(Context& cx) PW_LOCKS_EXCLUDED(*this->channel()) {
929 if (this->channel() == nullptr) {
930 return Ready(false);
931 }
932
// Locked manually: each path below ends in a call that is annotated as
// releasing the lock (Complete or StoreWakerForSend).
933 this->channel()->lock();
934 if (!this->channel()->is_open_locked()) {
935 this->Complete();
936 return Ready(false);
937 }
938
939 if (this->channel()->full()) {
940 this->StoreWakerForSend(cx);
941 return Pending();
942 }
943
// The value is moved out only on this success path, so a Pending result
// leaves `value_` intact for the next poll.
944 this->channel()->PushAndWake(std::move(value_));
945 this->Complete();
946 return Ready(true);
947 }
948
949 T value_;
950};
951
958template <typename T>
960 public:
961 SendReservation(const SendReservation& other) = delete;
962 SendReservation& operator=(const SendReservation& other) = delete;
963
965 : channel_(std::exchange(other.channel_, nullptr)) {}
966
// Move assignment: cancels the reservation currently held (if any), then
// takes over `other`'s reservation and leaves `other` empty. Self-assignment
// is a no-op.
967 SendReservation& operator=(SendReservation&& other) {
968 if (this == &other) {
969 return *this;
970 }
971 Cancel();
972 channel_ = std::exchange(other.channel_, nullptr);
973 return *this;
974 }
975
// An uncommitted reservation is cancelled on destruction.
976 ~SendReservation() { Cancel(); }
977
// Uses the reservation: forwards `args` to the channel's
// CommitReservationAndRemoveRef() and then empties this object.
//
// Asserts that the reservation is still held, so Commit() must not be called
// after an earlier Commit(), Cancel(), or a move from this object.
979 template <typename... Args>
980 void Commit(Args&&... args) {
981 PW_ASSERT(channel_ != nullptr);
982 channel_->CommitReservationAndRemoveRef(std::forward<Args>(args)...);
983 channel_ = nullptr;
984 }
985
// Gives the reservation back via the channel's DropReservationAndRemoveRef()
// and empties this object. Does nothing if no reservation is held, so it is
// safe to call more than once.
987 void Cancel() {
988 if (channel_ != nullptr) {
989 channel_->DropReservationAndRemoveRef();
990 channel_ = nullptr;
991 }
992 }
993
994 private:
996 friend class ReserveSendFuture<T>;
997 friend class Sender<T>;
998
999 explicit SendReservation(internal::Channel<T>& channel)
1001 : channel_(&channel) {
1002 channel_->add_ref();
1003 }
1004
1005 internal::Channel<T>* channel_;
1006};
1007
1008template <typename T>
1009class [[nodiscard]] ReserveSendFuture final
1010 : public internal::ChannelFuture<ReserveSendFuture<T>,
1011 T,
1012 std::optional<SendReservation<T>>> {
1013 private:
1014 using Base = internal::
1015 ChannelFuture<ReserveSendFuture, T, std::optional<SendReservation<T>>>;
1016
1017 public:
// Move constructor; forwards to the base-class move constructor.
1018 ReserveSendFuture(ReserveSendFuture&& other) : Base(std::move(other)) {}
1019
// Move assignment. All of the work is done by the base class's
// MoveAssignFrom() (defined outside this listing); the result is cast back to
// the derived type.
1020 ReserveSendFuture& operator=(ReserveSendFuture&& other) {
1021 // NOLINTNEXTLINE(misc-unconventional-assign-operator)
1022 return static_cast<ReserveSendFuture&>(this->MoveAssignFrom(other));
1023 }
1024
// Detaches the future from its channel via RemoveFromChannel().
1025 ~ReserveSendFuture() PW_LOCKS_EXCLUDED(*this->channel()) {
1026 this->RemoveFromChannel();
1027 }
1028
1029 private:
1030 friend Base;
1031 friend internal::Channel<T>;
1032 friend Sender<T>;
1033
1034 explicit ReserveSendFuture(internal::Channel<T>* channel)
1035 PW_LOCKS_EXCLUDED(*channel)
1036 : Base(channel) {}
1037
1039 PW_LOCKS_EXCLUDED(*this->channel()) {
1040 if (this->channel() == nullptr) {
1041 return Ready<std::optional<SendReservation<T>>>(std::nullopt);
1042 }
1043
1044 this->channel()->lock();
1045 if (!this->channel()->is_open_locked()) {
1046 this->Complete();
1047 return Ready<std::optional<SendReservation<T>>>(std::nullopt);
1048 }
1049
1050 if (this->channel()->remaining_capacity_locked() == 0) {
1051 this->StoreWakerForReserveSend(cx);
1052 return Pending();
1053 }
1054
1055 this->channel()->add_reservation();
1056 SendReservation<T> reservation(*this->channel());
1057 this->Complete();
1058 return reservation;
1059 }
1060};
1061
1063template <typename T>
1064class Sender {
1065 public:
// Creates a detached sender (no channel).
1066 constexpr Sender() : channel_(nullptr) {}
1067
// Senders are move-only.
1068 Sender(const Sender& other) = delete;
1069 Sender& operator=(const Sender& other) = delete;
1070
// Move constructor: takes over `other`'s channel and leaves `other` detached.
// The channel's sender count is not touched.
1071 Sender(Sender&& other) noexcept
1072 : channel_(std::exchange(other.channel_, nullptr)) {}
1073
// Move assignment: unregisters from the currently attached channel (if any),
// then takes over `other`'s channel. Self-assignment is a no-op.
1074 Sender& operator=(Sender&& other) noexcept {
1075 if (this == &other) {
1076 return *this;
1077 }
1078 if (channel_ != nullptr) {
1079 channel_->remove_sender();
1080 }
1081 channel_ = std::exchange(other.channel_, nullptr);
1082 return *this;
1083 }
1084
// Unregisters this sender from its channel, if attached.
1085 ~Sender() {
1086 if (channel_ != nullptr) {
1087 channel_->remove_sender();
1088 }
1089 }
1090
// Returns a future that sends `value` when polled. `value` is perfectly
// forwarded into the future, so lvalues are copied and rvalues are moved.
// `channel_` is passed as-is, including nullptr for a detached sender; the
// future's DoPend() then reports Ready(false).
1099 template <typename U>
1100 SendFuture<T> Send(U&& value) {
1101 return SendFuture<T>(channel_, std::forward<U>(value));
1102 }
1103
1110
1121 if (channel_ == nullptr) {
1123 }
1124 return channel_->TryReserveSend();
1125 }
1126
1136 Status TrySend(const T& value) {
1137 if (channel_ == nullptr) {
1139 }
1140 return channel_->TrySend(value);
1141 }
1142
1144 Status TrySend(T&& value) {
1145 if (channel_ == nullptr) {
1147 }
1148 return channel_->TrySend(std::move(value));
1149 }
1150
1162 const T& value,
1165 return BlockingSendMoveOrCopy(dispatcher, value, timeout);
1166 }
1167
1170 T&& value,
1173 return BlockingSendMoveOrCopy(dispatcher, std::move(value), timeout);
1174 }
1175
// Unregisters this sender from its channel and detaches it. Does nothing if
// the sender is already detached, so repeated calls are harmless.
1180 void Disconnect() {
1181 if (channel_ != nullptr) {
1182 channel_->remove_sender();
1183 channel_ = nullptr;
1184 }
1185 }
1186
// Returns the channel's remaining capacity, or 0 for a detached sender.
1188 uint16_t remaining_capacity() const {
1189 return channel_ != nullptr ? channel_->remaining_capacity() : 0;
1190 }
1191
// Returns the channel's total capacity, or 0 for a detached sender.
1193 uint16_t capacity() const {
1194 return channel_ != nullptr ? channel_->capacity() : 0;
1195 }
1196
// Returns true if the sender is attached to a channel and that channel is
// open. A detached sender reports false.
1198 [[nodiscard]] bool is_open() const {
1199 return channel_ != nullptr && channel_->is_open();
1200 }
1201
1202 private:
1203 template <typename U>
1204 friend class internal::Channel;
1205
1206 template <typename U>
1207 friend std::optional<std::tuple<SpmcChannelHandle<U>, Sender<U>>>
1208 CreateSpmcChannel(Allocator&, uint16_t);
1209
1210 template <typename U, uint16_t kCapacity>
1211 friend std::tuple<SpmcChannelHandle<U>, Sender<U>> CreateSpmcChannel(
1213
1214 template <typename U>
1215 friend std::optional<std::tuple<SpscChannelHandle<U>, Sender<U>, Receiver<U>>>
1216 CreateSpscChannel(Allocator&, uint16_t);
1217
1218 template <typename U, uint16_t kCapacity>
1219 friend std::tuple<SpscChannelHandle<U>, Sender<U>, Receiver<U>>
1221
1222 explicit Sender(internal::Channel<T>& channel)
1224 : channel_(&channel) {
1225 channel_->add_sender();
1226 }
1227
1228 template <typename U>
1229 Status BlockingSendMoveOrCopy(Dispatcher& dispatcher,
1230 U&& value,
1232 PW_LOCKS_EXCLUDED(*channel_) {
1233 if (channel_ == nullptr) {
1235 }
1236
1237 if (Status status = channel_->TrySend(std::forward<U>(value));
1238 status.ok() || status.IsFailedPrecondition()) {
1239 return status;
1240 }
1241
1242 return BlockingSendFuture(
1243 dispatcher, // NOLINTNEXTLINE(bugprone-use-after-move)
1244 SendFuture<T>(channel_, std::forward<U>(value)),
1245 timeout);
1246 }
1247
1248 Status BlockingSendFuture(Dispatcher& dispatcher,
1249 SendFuture<T>&& future,
1251 PW_LOCKS_EXCLUDED(*channel_) {
1252 Status status;
1253 sync::TimedThreadNotification notification;
1254
1255 CallbackTask task(
1256 [&status, &notification](bool result) {
1257 status = result ? OkStatus() : Status::FailedPrecondition();
1258 notification.release();
1259 },
1260 std::move(future));
1261 dispatcher.Post(task);
1262
1263 if (timeout == internal::Channel<T>::kWaitForever) {
1264 notification.acquire();
1265 return status;
1266 }
1267
1268 if (!notification.try_acquire_for(timeout)) {
1269 task.Deregister();
1270 return Status::DeadlineExceeded();
1271 }
1272 return status;
1273 }
1274
1275 internal::Channel<T>* channel_;
1276};
1277
// Creates a dynamically allocated multi-producer, multi-consumer channel.
//
// Allocates the channel from `alloc` with room for `capacity` values and
// returns a handle to it, or std::nullopt if
// DynamicChannel<T>::Allocate() returns nullptr.
1291template <typename T>
1292std::optional<MpmcChannelHandle<T>> CreateMpmcChannel(Allocator& alloc,
1293 uint16_t capacity) {
1294 auto channel = internal::DynamicChannel<T>::Allocate(alloc, capacity);
1295 if (channel == nullptr) {
1296 return std::nullopt;
1297 }
// Hold the channel lock while the handle is constructed; the handle
// constructor registers itself with the channel via add_handle().
1298 std::lock_guard lock(*channel);
1299 return MpmcChannelHandle<T>(*channel);
1300}
1301
1313template <typename T, uint16_t kCapacity>
1315 std::lock_guard lock(static_cast<internal::Channel<T>&>(storage));
1316 PW_DASSERT(!storage.active_locked());
1317 return MpmcChannelHandle<T>(storage);
1318}
1319
// Creates a dynamically allocated multi-producer, single-consumer channel.
//
// Allocates the channel from `alloc` with room for `capacity` values and
// returns a handle plus the channel's Receiver, or std::nullopt if
// DynamicChannel<T>::Allocate() returns nullptr.
1333template <typename T>
1334std::optional<std::tuple<MpscChannelHandle<T>, Receiver<T>>> CreateMpscChannel(
1335 Allocator& alloc, uint16_t capacity) {
1336 auto channel = internal::DynamicChannel<T>::Allocate(alloc, capacity);
1337 if (channel == nullptr) {
1338 return std::nullopt;
1339 }
// Hold the channel lock while the handle and receiver are constructed; their
// constructors register with the channel (add_handle() / add_receiver()).
1340 std::lock_guard lock(*channel);
1341 return std::make_tuple(MpscChannelHandle<T>(*channel), Receiver<T>(*channel));
1342}
1343
1355template <typename T, uint16_t kCapacity>
1356std::tuple<MpscChannelHandle<T>, Receiver<T>> CreateMpscChannel(
1358 std::lock_guard lock(static_cast<internal::Channel<T>&>(storage));
1359 PW_DASSERT(!storage.active_locked());
1360 return std::make_tuple(MpscChannelHandle<T>(storage), Receiver<T>(storage));
1361}
1362
// Creates a dynamically allocated single-producer, multi-consumer channel.
//
// Allocates the channel from `alloc` with room for `capacity` values and
// returns a handle plus the channel's Sender, or std::nullopt if
// DynamicChannel<T>::Allocate() returns nullptr.
1376template <typename T>
1377std::optional<std::tuple<SpmcChannelHandle<T>, Sender<T>>> CreateSpmcChannel(
1378 Allocator& alloc, uint16_t capacity) {
1379 auto channel = internal::DynamicChannel<T>::Allocate(alloc, capacity);
1380 if (channel == nullptr) {
1381 return std::nullopt;
1382 }
// Hold the channel lock while the handle and sender are constructed; their
// constructors register with the channel (add_handle() / add_sender()).
1383 std::lock_guard lock(*channel);
1384 return std::make_tuple(SpmcChannelHandle<T>(*channel), Sender<T>(*channel));
1385}
1386
1398template <typename T, uint16_t kCapacity>
1399std::tuple<SpmcChannelHandle<T>, Sender<T>> CreateSpmcChannel(
1401 std::lock_guard lock(static_cast<internal::Channel<T>&>(storage));
1402 PW_DASSERT(!storage.active_locked());
1403 return std::make_tuple(SpmcChannelHandle<T>(storage), Sender<T>(storage));
1404}
1405
// Creates a dynamically allocated single-producer, single-consumer channel.
//
// Allocates the channel from `alloc` with room for `capacity` values and
// returns a handle plus the channel's Sender and Receiver, or std::nullopt if
// DynamicChannel<T>::Allocate() returns nullptr.
1419template <typename T>
1420std::optional<std::tuple<SpscChannelHandle<T>, Sender<T>, Receiver<T>>>
1421CreateSpscChannel(Allocator& alloc, uint16_t capacity) {
1422 auto channel = internal::DynamicChannel<T>::Allocate(alloc, capacity);
1423 if (channel == nullptr) {
1424 return std::nullopt;
1425 }
// Hold the channel lock while the handle, sender, and receiver are
// constructed; their constructors register with the channel.
1426 std::lock_guard lock(*channel);
1427 return std::make_tuple(SpscChannelHandle<T>(*channel),
1428 Sender<T>(*channel),
1429 Receiver<T>(*channel));
1430}
1431
1443template <typename T, uint16_t kCapacity>
1444std::tuple<SpscChannelHandle<T>, Sender<T>, Receiver<T>> CreateSpscChannel(
1446 std::lock_guard lock(static_cast<internal::Channel<T>&>(storage));
1447 PW_DASSERT(!storage.active_locked());
1448 return std::make_tuple(
1449 SpscChannelHandle<T>(storage), Sender<T>(storage), Receiver<T>(storage));
1450}
1451
1452namespace internal {
1453
1454inline void BaseChannel::PopAndWakeOne(
1456 BaseChannelFuture& future = futures.front();
1457 futures.pop_front();
1458 future.Wake();
1459}
1460
1461} // namespace internal
1462} // namespace pw::async2
Definition: allocator.h:36
std::enable_if_t<!std::is_array_v< T >, T * > New(Args &&... args)
Definition: allocator.h:57
Abstract interface for releasing memory.
Definition: deallocator.h:29
Definition: deque.h:207
Definition: intrusive_forward_list.h:99
Definition: result.h:143
Definition: status.h:120
static constexpr Status DeadlineExceeded()
Definition: status.h:177
static constexpr Status Unavailable()
Definition: status.h:304
static constexpr Status FailedPrecondition()
Definition: status.h:243
Definition: callback_task.h:44
Definition: channel.h:660
Definition: context.h:54
Definition: dispatcher.h:53
A handle to a multi-producer, multi-consumer channel.
Definition: channel.h:560
A handle to a multi-producer, single-consumer channel.
Definition: channel.h:585
Definition: poll.h:60
Definition: channel.h:694
A receiver which reads values from an asynchronous channel.
Definition: channel.h:746
Definition: channel.h:1012
Definition: channel.h:896
Definition: channel.h:959
A sender which writes values to an asynchronous channel.
Definition: channel.h:1064
A handle to a single-producer, multi-consumer channel.
Definition: channel.h:609
A handle to a single-producer, single-consumer channel.
Definition: channel.h:633
Definition: channel.h:63
Definition: channel.h:287
Channel handle for a particular type T.
Definition: channel.h:532
Definition: channel.h:321
Definition: channel.h:454
Definition: storage.h:86
Definition: storage.h:39
constexpr size_type capacity() const noexcept
Returns the maximum number of elements in the deque.
Definition: generic_deque.h:74
Definition: interrupt_spin_lock.h:50
Definition: timed_thread_notification.h:40
bool try_acquire_for(chrono::SystemClock::duration timeout)
void Deallocate(void *ptr)
Definition: deallocator.h:60
friend std::optional< MpmcChannelHandle< U > > CreateMpmcChannel(Allocator &, uint16_t)
Definition: channel.h:1292
SendFuture< T > Send(U &&value)
Definition: channel.h:1100
friend std::optional< std::tuple< MpscChannelHandle< U >, Receiver< U > > > CreateMpscChannel(Allocator &, uint16_t)
Definition: channel.h:1334
void Cancel()
Releases the reservation, making the space available for other senders.
Definition: channel.h:987
Result< SendReservation< T > > TryReserveSend()
Definition: channel.h:1120
std::optional< std::tuple< SpmcChannelHandle< T >, Sender< T > > > CreateSpmcChannel(Allocator &alloc, uint16_t capacity)
Definition: channel.h:1377
std::optional< std::tuple< MpscChannelHandle< T >, Receiver< T > > > CreateMpscChannel(Allocator &alloc, uint16_t capacity)
Definition: channel.h:1334
bool is_open() const
Returns true if the channel is open.
Definition: channel.h:861
friend std::optional< std::tuple< SpmcChannelHandle< U >, Sender< U > > > CreateSpmcChannel(Allocator &, uint16_t)
Definition: channel.h:1377
Result< T > TryReceive()
Definition: channel.h:842
void Commit(Args &&... args)
Commits a value to a reserved slot.
Definition: channel.h:980
bool active() const
Definition: channel.h:684
Status TrySend(const T &value)
Definition: channel.h:1136
friend std::optional< std::tuple< SpscChannelHandle< U >, Sender< U >, Receiver< U > > > CreateSpscChannel(Allocator &, uint16_t)
Definition: channel.h:1421
Result< T > BlockingReceive(Dispatcher &dispatcher, chrono::SystemClock::duration timeout=internal::Channel< T >::kWaitForever)
Definition: channel.h:782
ReserveSendFuture< T > ReserveSend()
Definition: channel.h:1109
uint16_t capacity() const
Returns the maximum capacity of the channel.
Definition: channel.h:1193
Status TrySend(T &&value)
Definition: channel.h:1144
uint16_t remaining_capacity() const
Returns the remaining capacity of the channel.
Definition: channel.h:1188
void Disconnect()
Definition: channel.h:1180
bool is_open() const
Returns true if the channel is open.
Definition: channel.h:1198
bool is_complete() const
True if the future has returned Ready().
Definition: channel.h:228
void Disconnect()
Definition: channel.h:853
std::optional< MpmcChannelHandle< T > > CreateMpmcChannel(Allocator &alloc, uint16_t capacity)
Definition: channel.h:1292
std::optional< std::tuple< SpscChannelHandle< T >, Sender< T >, Receiver< T > > > CreateSpscChannel(Allocator &alloc, uint16_t capacity)
Definition: channel.h:1421
Status BlockingSend(Dispatcher &dispatcher, const T &value, chrono::SystemClock::duration timeout=internal::Channel< T >::kWaitForever)
Definition: channel.h:1161
Sender< T > CreateSender()
Definition: channel.h:538
Receiver< T > CreateReceiver()
Definition: channel.h:545
ReceiveFuture< T > Receive()
Definition: channel.h:832
Status BlockingSend(Dispatcher &dispatcher, T &&value, chrono::SystemClock::duration timeout=internal::Channel< T >::kWaitForever)
Definition: channel.h:1169
constexpr bool IsReady() const noexcept
Returns whether or not this value is Ready.
Definition: poll.h:133
constexpr PendingType Pending()
Returns a value indicating that an operation was not yet able to complete.
Definition: poll.h:271
constexpr Poll Ready()
Returns a value indicating completion.
Definition: poll.h:255
std::chrono::duration< rep, period > duration
Alias for durations representable with this clock.
Definition: system_clock.h:90
void pop_front()
Definition: intrusive_forward_list.h:245
reference front()
Reference to the first element in the list. Undefined behavior if empty().
Definition: intrusive_forward_list.h:175
constexpr bool CheckedIncrement(T &base, Inc inc)
Definition: checked_arithmetic.h:118
constexpr Status OkStatus()
Definition: status.h:450
#define PW_LOCKABLE(name)
Definition: lock_annotations.h:208
#define PW_GUARDED_BY(x)
Definition: lock_annotations.h:60
#define PW_NO_LOCK_SAFETY_ANALYSIS
Definition: lock_annotations.h:292
#define PW_EXCLUSIVE_LOCK_FUNCTION(...)
Definition: lock_annotations.h:230
#define PW_EXCLUSIVE_LOCKS_REQUIRED(...)
Definition: lock_annotations.h:146
#define PW_LOCK_RETURNED(x)
Definition: lock_annotations.h:197
#define PW_UNLOCK_FUNCTION(...)
Definition: lock_annotations.h:247
#define PW_LOCKS_EXCLUDED(...)
Definition: lock_annotations.h:176