C/C++ API Reference
Loading...
Searching...
No Matches
channel.h
1// Copyright 2025 The Pigweed Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may not
4// use this file except in compliance with the License. You may obtain a copy of
5// the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12// License for the specific language governing permissions and limitations under
13// the License.
14#pragma once
15
16#include <mutex>
17
18#include "pw_allocator/allocator.h"
19#include "pw_async2/callback_task.h"
20#include "pw_async2/dispatcher.h"
21#include "pw_async2/future.h"
22#include "pw_containers/deque.h"
23#include "pw_numeric/checked_arithmetic.h"
24#include "pw_result/result.h"
25#include "pw_sync/interrupt_spin_lock.h"
26#include "pw_sync/lock_annotations.h"
27#include "pw_sync/timed_thread_notification.h"
28
29namespace pw::async2 {
30
31template <typename T>
32class Receiver;
33
34template <typename T>
35class ReceiveFuture;
36
37template <typename T>
38class Sender;
39
40template <typename T>
41class SendFuture;
42
43template <typename T>
44class ReserveSendFuture;
45
46template <typename T>
47class SendReservation;
48
49template <typename T, uint16_t kCapacity>
50class ChannelStorage;
51
52namespace internal {
53
54template <typename T>
55class ChannelHandle;
56
57template <typename T>
58class Channel {
59 public:
60 static Channel* Allocated(Allocator& alloc, uint16_t capacity) {
61 FixedDeque<T> deque = FixedDeque<T>::TryAllocate(alloc, capacity);
62 if (deque.capacity() == 0) {
63 return nullptr;
64 }
65 return alloc.New<Channel<T>>(std::move(deque));
66 }
67
68 ~Channel() { PW_ASSERT(ref_count_ == 0); }
69
72 [[nodiscard]] bool closed() const PW_LOCKS_EXCLUDED(lock_) {
73 std::lock_guard lock(lock_);
74 return closed_;
75 }
76
77 protected:
78 explicit Channel(FixedDeque<T>&& deque) : deque_(std::move(deque)) {}
79
80 template <size_t kAlignment, size_t kCapacity>
81 explicit Channel(containers::Storage<kAlignment, kCapacity>& storage)
82 : deque_(storage) {}
83
84 uint16_t ref_count() const PW_LOCKS_EXCLUDED(lock_) {
85 std::lock_guard lock(lock_);
86 return ref_count_;
87 }
88
89 private:
90 friend Allocator;
91 friend ChannelHandle<T>;
92 friend SendFuture<T>;
93 friend ReserveSendFuture<T>;
94 friend ReceiveFuture<T>;
95 friend Sender<T>;
96 friend SendReservation<T>;
97 friend Receiver<T>;
98
// Destroys and frees this channel if its deque reports a deallocator.
//
// The deallocator pointer is read while holding lock_, and the lock is
// released before the object is destroyed (lock_ is a member of the object
// being destroyed). When `deque_.deallocator()` is null, nothing is destroyed
// or freed here.
99 void Destroy() PW_LOCKS_EXCLUDED(lock_) {
100 Deallocator* deallocator = nullptr;
101 {
102 std::lock_guard lock(lock_);
103 deallocator = deque_.deallocator();
104 }
105
106 if (deallocator != nullptr) {
// Run the destructor first, then hand the memory back.
107 std::destroy_at(this);
108 deallocator->Deallocate(this);
109 }
110 }
111
112 void Close() PW_LOCKS_EXCLUDED(lock_) {
113 std::lock_guard lock(lock_);
114 CloseLocked();
115 }
116
117 void CloseLocked() PW_EXCLUSIVE_LOCKS_REQUIRED(lock_) {
118 closed_ = true;
119 while (auto send_future = send_futures_.Pop()) {
120 send_future->get().Wake();
121 }
122 while (auto reserve_send_future = reserve_send_futures_.Pop()) {
123 reserve_send_future->get().Wake();
124 }
125 while (auto receive_future = receive_futures_.Pop()) {
126 receive_future->get().Wake();
127 }
128 }
129
131 Sender<T> CreateSender() PW_LOCKS_EXCLUDED(lock_) {
132 if (closed()) {
133 return Sender<T>(nullptr);
134 }
135 return Sender<T>(this);
136 }
137
139 Receiver<T> CreateReceiver() PW_LOCKS_EXCLUDED(lock_) {
140 if (closed()) {
141 return Receiver<T>(nullptr);
142 }
143 return Receiver<T>(this);
144 }
145
146 void PushAndWake(T&& value) PW_EXCLUSIVE_LOCKS_REQUIRED(lock_) {
147 deque_.push_back(std::move(value));
148 WakeOneReceiver();
149 }
150
151 void PushAndWake(const T& value) PW_EXCLUSIVE_LOCKS_REQUIRED(lock_) {
152 deque_.push_back(value);
153 WakeOneReceiver();
154 }
155
156 template <typename... Args>
157 void EmplaceAndWake(Args&&... args) PW_EXCLUSIVE_LOCKS_REQUIRED(lock_) {
158 deque_.emplace_back(std::forward<Args>(args)...);
159 WakeOneReceiver();
160 }
161
162 void WakeOneReceiver() PW_EXCLUSIVE_LOCKS_REQUIRED(lock_) {
163 if (auto future = receive_futures_.Pop()) {
164 future->get().Wake();
165 }
166 }
167
168 T PopAndWake() PW_EXCLUSIVE_LOCKS_REQUIRED(lock_) {
169 PW_ASSERT(!deque_.empty());
170
171 T value = std::move(deque_.front());
172 deque_.pop_front();
173
174 WakeOneSender();
175 return value;
176 }
177
178 void WakeOneSender() PW_EXCLUSIVE_LOCKS_REQUIRED(lock_) {
179 // TODO: b/456507134 - Store both future types in the same list.
180 if (prioritize_reserve_) {
181 if (auto reserve_future = reserve_send_futures_.Pop()) {
182 reserve_future->get().Wake();
183 } else if (auto send_future = send_futures_.Pop()) {
184 send_future->get().Wake();
185 }
186 } else {
187 if (auto send_future = send_futures_.Pop()) {
188 send_future->get().Wake();
189 } else if (auto reserve_future = reserve_send_futures_.Pop()) {
190 reserve_future->get().Wake();
191 }
192 }
193
194 prioritize_reserve_ = !prioritize_reserve_;
195 }
196
197 bool full() PW_LOCKS_EXCLUDED(lock_) { return remaining_capacity() == 0; }
198 bool full_locked() const PW_EXCLUSIVE_LOCKS_REQUIRED(lock_) {
199 return remaining_capacity_locked() == 0;
200 }
201
202 uint16_t remaining_capacity() PW_LOCKS_EXCLUDED(lock_) {
203 std::lock_guard lock(lock_);
204 return remaining_capacity_locked();
205 }
206 uint16_t remaining_capacity_locked() const
208 return deque_.capacity() - deque_.size() - reservations_;
209 }
210
211 uint16_t capacity() const PW_NO_LOCK_SAFETY_ANALYSIS {
212 // SAFETY: The capacity of `deque_` cannot change.
213 return deque_.capacity();
214 }
215
216 bool empty() PW_LOCKS_EXCLUDED(lock_) {
217 std::lock_guard lock(lock_);
218 return deque_.empty();
219 }
220
221 void Push(const T& value) PW_EXCLUSIVE_LOCKS_REQUIRED(lock_) {
222 PW_ASSERT(!closed_);
223 PushAndWake(value);
224 }
225
226 void Push(T&& value) PW_EXCLUSIVE_LOCKS_REQUIRED(lock_) {
227 PW_ASSERT(!closed_);
228 PushAndWake(std::move(value));
229 }
230
231 bool TryPush(const T& value) PW_LOCKS_EXCLUDED(lock_) {
232 std::lock_guard lock(lock_);
233 if (closed_ || remaining_capacity_locked() == 0) {
234 return false;
235 }
236 PushAndWake(value);
237 return true;
238 }
239
240 bool TryPush(T&& value) PW_LOCKS_EXCLUDED(lock_) {
241 std::lock_guard lock(lock_);
242 if (closed_ || remaining_capacity_locked() == 0) {
243 return false;
244 }
245 PushAndWake(std::move(value));
246 return true;
247 }
248
249 std::optional<T> TryPop() PW_LOCKS_EXCLUDED(lock_) {
250 std::lock_guard lock(lock_);
251 if (deque_.empty()) {
252 return std::nullopt;
253 }
254 return PopAndWake();
255 }
256
257 bool Reserve() PW_LOCKS_EXCLUDED(lock_) {
258 std::lock_guard lock(lock_);
259 if (closed_ || remaining_capacity_locked() == 0) {
260 return false;
261 }
262 reservations_++;
263 return true;
264 }
265
266 void DropReservation() PW_LOCKS_EXCLUDED(lock_) {
267 std::lock_guard lock(lock_);
268 PW_ASSERT(reservations_ > 0);
269 reservations_--;
270 if (!closed_) {
271 WakeOneSender();
272 }
273 }
274
// Consumes one outstanding reservation and, if the channel is still open,
// constructs a value in place from `args` and wakes one receiver.
//
// Asserts that at least one reservation is outstanding. If the channel has
// been closed in the meantime, the reservation is still released but no value
// is constructed (the arguments are discarded).
275 template <typename... Args>
276 void CommitReservation(Args&&... args) PW_LOCKS_EXCLUDED(lock_) {
277 std::lock_guard lock(lock_);
278 PW_ASSERT(reservations_ > 0);
279 reservations_--;
280 if (!closed_) {
281 EmplaceAndWake(std::forward<Args>(args)...);
282 }
283 }
284
285 void add_receiver() PW_LOCKS_EXCLUDED(lock_) {
286 std::lock_guard lock(lock_);
287 add_object(receiver_count_);
288 }
289
290 void add_sender() PW_LOCKS_EXCLUDED(lock_) {
291 std::lock_guard lock(lock_);
292 add_object(sender_count_);
293 }
294
295 void add_handle() PW_LOCKS_EXCLUDED(lock_) {
296 std::lock_guard lock(lock_);
297 add_object(handle_count_);
298 }
299
// Registers one more sender, receiver, or handle.
//
// `counter` is the per-kind count to bump; it is only incremented while the
// channel is open. The overall `ref_count_` is incremented unconditionally.
// Both increments assert on overflow via CheckedAdd.
300 void add_object(uint8_t& counter) PW_EXCLUSIVE_LOCKS_REQUIRED(lock_) {
301 if (!closed_) {
302 PW_ASSERT(CheckedAdd(counter, 1, counter));
303 }
304 PW_ASSERT(CheckedAdd(ref_count_, 1, ref_count_));
305 }
306
307 void remove_sender() PW_LOCKS_EXCLUDED(lock_) {
308 std::lock_guard lock(lock_);
309 remove_object(sender_count_);
310 }
311
312 void remove_receiver() PW_LOCKS_EXCLUDED(lock_) {
313 std::lock_guard lock(lock_);
314 remove_object(receiver_count_);
315 }
316
317 void remove_handle() PW_LOCKS_EXCLUDED(lock_) {
318 std::lock_guard lock(lock_);
319 remove_object(handle_count_);
320 }
321
// Unregisters one sender, receiver, or handle counted by `counter`.
//
// Lock contract: this function must be entered with lock_ held and it
// releases lock_ itself (see the explicit unlock below). Callers must acquire
// the lock in a way that does not release it again afterwards.
//
// While the channel is open, `counter` is decremented (asserting it was
// non-zero) and the channel is closed if should_close() reports true. The
// overall reference count is decremented regardless of the closed state.
322 void remove_object(uint8_t& counter) PW_UNLOCK_FUNCTION(lock_) {
323 if (!closed_) {
324 PW_ASSERT(counter > 0);
325 counter--;
326 if (should_close()) {
327 CloseLocked();
328 }
329 }
330 bool destroy = decrement_ref_locked();
331 lock_.unlock();
332
// Destroy() takes lock_ itself, so it must run after the unlock above.
333 if (destroy) {
334 Destroy();
335 }
336 }
337
338 void add_ref() PW_LOCKS_EXCLUDED(lock_) {
339 std::lock_guard lock(lock_);
340 ref_count_++;
341 }
342
343 void remove_ref() PW_LOCKS_EXCLUDED(lock_) {
344 bool destroy;
345 {
346 std::lock_guard lock(lock_);
347 destroy = decrement_ref_locked();
348 }
349 if (destroy) {
350 Destroy();
351 }
352 }
353
354 bool decrement_ref_locked() PW_EXCLUSIVE_LOCKS_REQUIRED(lock_) {
355 ref_count_--;
356 return ref_count_ == 0;
357 }
358
365 bool should_close() const PW_EXCLUSIVE_LOCKS_REQUIRED(lock_) {
366 if (handle_count_ > 0) {
367 return false;
368 }
369 return sender_count_ == 0 || receiver_count_ == 0;
370 }
371
372 // ListFutureProvider is internally synchronized.
373 ListFutureProvider<SendFuture<T>> send_futures_;
374 ListFutureProvider<ReserveSendFuture<T>> reserve_send_futures_;
375 ListFutureProvider<ReceiveFuture<T>> receive_futures_;
376
377 mutable sync::InterruptSpinLock lock_;
378 FixedDeque<T> deque_ PW_GUARDED_BY(lock_);
379 uint16_t reservations_ PW_GUARDED_BY(lock_) = 0;
380 bool closed_ PW_GUARDED_BY(lock_) = false;
381 bool prioritize_reserve_ PW_GUARDED_BY(lock_) = true;
382
383 // Channels are reference counted in two ways:
384 //
385 // - Senders and receivers are tracked independently. Once either reaches
386 // zero, the channel is closed, but not destroyed. No new values can be
387 // sent, but any buffered values can still be read.
388 //
389 // - Overall object reference count, including senders, receivers, futures,
390 // and channel handles. Once this reaches zero, the channel is destroyed.
391 //
392 uint8_t sender_count_ PW_GUARDED_BY(lock_) = 0;
393 uint8_t receiver_count_ PW_GUARDED_BY(lock_) = 0;
394 uint8_t handle_count_ PW_GUARDED_BY(lock_) = 0;
395 uint16_t ref_count_ PW_GUARDED_BY(lock_) = 0;
396};
397
403template <typename T>
405 public:
406 constexpr ChannelHandle() : channel_(nullptr) {}
407
408 ChannelHandle(const ChannelHandle& other) : channel_(other.channel_) {
409 if (channel_ != nullptr) {
410 channel_->add_handle();
411 }
412 }
413
414 ChannelHandle& operator=(const ChannelHandle& other) {
415 if (channel_ != nullptr) {
416 channel_->remove_handle();
417 }
418 channel_ = other.channel_;
419 if (channel_ != nullptr) {
420 channel_->add_handle();
421 }
422 return *this;
423 }
424
425 ChannelHandle(ChannelHandle&& other) noexcept
426 : channel_(std::exchange(other.channel_, nullptr)) {}
427
428 ChannelHandle& operator=(ChannelHandle&& other) noexcept {
429 if (this == &other) {
430 return *this;
431 }
432 if (channel_ != nullptr) {
433 channel_->remove_handle();
434 }
435 channel_ = std::exchange(other.channel_, nullptr);
436 return *this;
437 }
438
439 ~ChannelHandle() { Release(); }
440
441 [[nodiscard]] bool is_open() const {
442 return channel_ != nullptr && !channel_->closed();
443 }
444
448 PW_ASSERT(channel_ != nullptr);
449 return channel_->CreateSender();
450 }
451
456 PW_ASSERT(channel_ != nullptr);
457 return channel_->CreateReceiver();
458 }
459
462 void Close() {
463 if (channel_ != nullptr) {
464 channel_->Close();
465 }
466 }
467
474 void Release() {
475 if (channel_ != nullptr) {
476 channel_->remove_handle();
477 channel_ = nullptr;
478 }
479 }
480
481 protected:
482 explicit ChannelHandle(internal::Channel<T>* channel) : channel_(channel) {
483 if (channel_ != nullptr) {
484 channel_->add_handle();
485 }
486 }
487
488 private:
489 internal::Channel<T>* channel_;
490};
491
492} // namespace internal
493
495template <typename T>
497 public:
498 constexpr MpmcChannelHandle() = default;
499
500 using internal::ChannelHandle<T>::is_open;
505
506 private:
507 explicit MpmcChannelHandle(internal::Channel<T>* channel)
508 : internal::ChannelHandle<T>(channel) {}
509
510 template <typename U>
511 friend std::optional<MpmcChannelHandle<U>> CreateMpmcChannel(Allocator&,
512 uint16_t);
513
514 template <typename U, uint16_t kCapacity>
517};
518
520template <typename T>
522 public:
523 constexpr MpscChannelHandle() = default;
524
525 using internal::ChannelHandle<T>::is_open;
529
530 private:
531 explicit MpscChannelHandle(internal::Channel<T>* channel)
532 : internal::ChannelHandle<T>(channel) {}
533
534 template <typename U>
535 friend std::optional<std::tuple<MpscChannelHandle<U>, Receiver<U>>>
536 CreateMpscChannel(Allocator&, uint16_t);
537
538 template <typename U, uint16_t kCapacity>
539 friend std::tuple<MpscChannelHandle<U>, Receiver<U>> CreateMpscChannel(
541};
542
544template <typename T>
546 public:
547 constexpr SpmcChannelHandle() = default;
548
549 using internal::ChannelHandle<T>::is_open;
553
554 private:
555 explicit SpmcChannelHandle(internal::Channel<T>* channel)
556 : internal::ChannelHandle<T>(channel) {}
557
558 template <typename U>
559 friend std::optional<std::tuple<SpmcChannelHandle<U>, Sender<U>>>
560 CreateSpmcChannel(Allocator&, uint16_t);
561
562 template <typename U, uint16_t kCapacity>
563 friend std::tuple<SpmcChannelHandle<U>, Sender<U>> CreateSpmcChannel(
565};
566
568template <typename T>
570 public:
571 constexpr SpscChannelHandle() = default;
572
573 using internal::ChannelHandle<T>::is_open;
576
577 private:
578 explicit SpscChannelHandle(internal::Channel<T>* channel)
579 : internal::ChannelHandle<T>(channel) {}
580
581 template <typename U>
582 friend std::optional<std::tuple<SpscChannelHandle<U>, Sender<U>, Receiver<U>>>
583 CreateSpscChannel(Allocator&, uint16_t);
584
585 template <typename U, uint16_t kCapacity>
586 friend std::tuple<SpscChannelHandle<U>, Sender<U>, Receiver<U>>
588};
589
596template <typename T, uint16_t kCapacity>
598 public internal::Channel<T> {
599 public:
600 ChannelStorage() : internal::Channel<T>(this->storage_array) {}
601
604 [[nodiscard]] bool active() const { return this->ref_count() != 0; }
605
606 private:
607 using internal::Channel<T>::Allocated;
608};
609
610template <typename T>
611class [[nodiscard]] ReceiveFuture
612 : public ListableFutureWithWaker<ReceiveFuture<T>, std::optional<T>> {
613 public:
615 : Base(Base::kMovedFrom),
616 channel_(std::exchange(other.channel_, nullptr)) {
617 Base::MoveFrom(other);
618 }
619
620 ReceiveFuture& operator=(ReceiveFuture&& other) {
621 if (this == &other) {
622 return *this;
623 }
624 if (channel_ != nullptr) {
625 channel_->remove_ref();
626 }
627 channel_ = std::exchange(other.channel_, nullptr);
628 Base::MoveFrom(other);
629 return *this;
630 }
631
632 ~ReceiveFuture() { reset(); }
633
634 private:
635 using Base = ListableFutureWithWaker<ReceiveFuture<T>, std::optional<T>>;
636 friend Base;
638 friend Receiver<T>;
639
640 static constexpr const char kWaitReason[] = "Receiver::Receive";
641
642 explicit ReceiveFuture(internal::Channel<T>& channel)
643 : Base(channel.receive_futures_), channel_(&channel) {
644 channel_->add_ref();
645 }
646
647 ReceiveFuture() : Base(Base::kReadyForCompletion), channel_(nullptr) {}
648
650 if (channel_ == nullptr) {
651 return Ready<std::optional<T>>(std::nullopt);
652 }
653
654 std::optional<T> value = channel_->TryPop();
655 if (!value.has_value()) {
656 if (channel_->closed()) {
657 reset();
658 return Ready<std::optional<T>>(std::nullopt);
659 }
660 return Pending();
661 }
662
663 reset();
664 return Ready(std::move(value));
665 }
666
667 void reset() {
668 if (channel_ != nullptr) {
669 channel_->remove_ref();
670 channel_ = nullptr;
671 }
672 }
673
674 using Base::Wake;
675
676 internal::Channel<T>* channel_;
677};
678
680template <typename T>
681class Receiver {
682 public:
683 constexpr Receiver() : channel_(nullptr) {}
684
685 Receiver(const Receiver& other) = delete;
686 Receiver& operator=(const Receiver& other) = delete;
687
688 Receiver(Receiver&& other) noexcept
689 : channel_(std::exchange(other.channel_, nullptr)) {}
690
691 Receiver& operator=(Receiver&& other) noexcept {
692 if (this == &other) {
693 return *this;
694 }
695 if (channel_ != nullptr) {
696 channel_->remove_receiver();
697 }
698 channel_ = std::exchange(other.channel_, nullptr);
699 return *this;
700 }
701
702 ~Receiver() {
703 if (channel_ != nullptr) {
704 channel_->remove_receiver();
705 }
706 }
707
718 Dispatcher& dispatcher,
719 chrono::SystemClock::duration timeout = kWaitForever) {
720 if (channel_ == nullptr) {
722 }
723
724 Result<T> available = TryReceive();
725 if (available.ok()) {
726 return available;
727 }
728
729 std::optional<T> result;
731
732 FutureCallbackTask task(ReceiveFuture<T>(*channel_),
733 [&result, &notification](std::optional<T> value) {
734 result = std::move(value);
735 notification.release();
736 });
737 dispatcher.Post(task);
738
739 if (timeout == kWaitForever) {
740 notification.acquire();
741 if (!result.has_value()) {
743 }
744 return Result<T>(std::move(result.value()));
745 }
746
747 if (!notification.try_acquire_for(timeout)) {
748 task.Deregister();
750 }
751
752 if (!result.has_value()) {
754 }
755 return Result<T>(std::move(result.value()));
756 }
757
766 if (channel_ == nullptr) {
767 return ReceiveFuture<T>();
768 }
769 return ReceiveFuture<T>(*channel_);
770 }
771
779 if (channel_ == nullptr) {
781 }
782 std::optional<T> value = channel_->TryPop();
783 if (value.has_value()) {
784 return std::move(*value);
785 }
786 if (channel_->closed()) {
788 }
789 return Status::Unavailable();
790 }
791
796 void Disconnect() {
797 if (channel_ != nullptr) {
798 channel_->remove_receiver();
799 channel_ = nullptr;
800 }
801 }
802
804 [[nodiscard]] bool is_open() const {
805 return channel_ != nullptr && !channel_->closed();
806 }
807
808 private:
809 static constexpr chrono::SystemClock::duration kWaitForever =
810 chrono::SystemClock::duration::max();
811
812 template <typename U>
813 friend class internal::Channel;
814
815 template <typename U>
816 friend std::optional<std::tuple<MpscChannelHandle<U>, Receiver<U>>>
817 CreateMpscChannel(Allocator&, uint16_t);
818
819 template <typename U, uint16_t kCapacity>
820 friend std::tuple<MpscChannelHandle<U>, Receiver<U>> CreateMpscChannel(
822
823 template <typename U>
824 friend std::optional<std::tuple<SpscChannelHandle<U>, Sender<U>, Receiver<U>>>
825 CreateSpscChannel(Allocator&, uint16_t);
826
827 template <typename U, uint16_t kCapacity>
828 friend std::tuple<SpscChannelHandle<U>, Sender<U>, Receiver<U>>
830
831 explicit Receiver(internal::Channel<T>* channel) : channel_(channel) {
832 if (channel_ != nullptr) {
833 channel_->add_receiver();
834 }
835 }
836
837 internal::Channel<T>* channel_;
838};
839
840template <typename T>
841class [[nodiscard]] SendFuture
842 : public ListableFutureWithWaker<SendFuture<T>, bool> {
843 public:
844 SendFuture(SendFuture&& other)
845 : Base(Base::kMovedFrom),
846 channel_(std::exchange(other.channel_, nullptr)),
847 value_(std::move(other.value_)) {
848 Base::MoveFrom(other);
849 }
850
851 SendFuture& operator=(SendFuture&& other) {
852 if (this == &other) {
853 return *this;
854 }
855 if (channel_ != nullptr) {
856 channel_->remove_ref();
857 }
858 channel_ = std::exchange(other.channel_, nullptr);
859 value_ = std::move(other.value_);
860 Base::MoveFrom(other);
861 return *this;
862 }
863
864 ~SendFuture() { reset(); }
865
866 private:
868 friend Base;
870 friend Sender<T>;
871
872 static constexpr const char kWaitReason[] = "Sender::Send";
873
874 SendFuture(internal::Channel<T>& channel, const T& value)
875 : Base(channel.send_futures_), channel_(&channel), value_(value) {
876 channel_->add_ref();
877 }
878
879 SendFuture(internal::Channel<T>& channel, T&& value)
880 : Base(channel.send_futures_),
881 channel_(&channel),
882 value_(std::move(value)) {
883 channel_->add_ref();
884 }
885
886 enum ClosedState { kClosed };
887
888 SendFuture(ClosedState, const T& value)
889 : Base(Base::kReadyForCompletion), channel_(nullptr), value_(value) {}
890
891 SendFuture(ClosedState, T&& value)
892 : Base(Base::kReadyForCompletion),
893 channel_(nullptr),
894 value_(std::move(value)) {}
895
896 Poll<bool> DoPend(async2::Context&) {
897 if (channel_ == nullptr || channel_->closed()) {
898 reset();
899 return Ready(false);
900 }
901
902 {
903 std::lock_guard lock(channel_->lock_);
904 if (channel_->full_locked()) {
905 return Pending();
906 }
907
908 channel_->Push(std::move(value_));
909 }
910
911 reset();
912 return Ready(true);
913 }
914
915 void reset() {
916 if (channel_ != nullptr) {
917 channel_->remove_ref();
918 channel_ = nullptr;
919 }
920 }
921
922 using Base::Wake;
923
924 internal::Channel<T>* channel_;
925 T value_;
926};
927
934template <typename T>
936 public:
937 SendReservation(const SendReservation& other) = delete;
938 SendReservation& operator=(const SendReservation& other) = delete;
939
941 : channel_(std::exchange(other.channel_, nullptr)) {}
942
943 SendReservation& operator=(SendReservation&& other) {
944 if (this == &other) {
945 return *this;
946 }
947 Cancel();
948 channel_ = std::exchange(other.channel_, nullptr);
949 return *this;
950 }
951
952 ~SendReservation() { Cancel(); }
953
955 template <typename... Args>
956 void Commit(Args&&... args) {
957 PW_ASSERT(channel_ != nullptr);
958 channel_->CommitReservation(std::forward<Args>(args)...);
959 channel_->remove_ref();
960 channel_ = nullptr;
961 }
962
964 void Cancel() {
965 if (channel_ != nullptr) {
966 channel_->DropReservation();
967 channel_->remove_ref();
968 channel_ = nullptr;
969 }
970 }
971
972 private:
973 friend class ReserveSendFuture<T>;
974 friend class Sender<T>;
975
976 explicit SendReservation(internal::Channel<T>& channel) : channel_(&channel) {
977 channel_->add_ref();
978 }
979
980 internal::Channel<T>* channel_;
981};
982
983template <typename T>
984class [[nodiscard]] ReserveSendFuture
985 : public ListableFutureWithWaker<ReserveSendFuture<T>,
986 std::optional<SendReservation<T>>> {
987 public:
989 : Base(Base::kMovedFrom),
990 channel_(std::exchange(other.channel_, nullptr)) {
991 Base::MoveFrom(other);
992 }
993
994 ReserveSendFuture& operator=(ReserveSendFuture&& other) {
995 if (channel_ != nullptr) {
996 channel_->remove_ref();
997 }
998 channel_ = std::exchange(other.channel_, nullptr);
999 Base::MoveFrom(other);
1000 return *this;
1001 }
1002
1003 ~ReserveSendFuture() { reset(); }
1004
1005 private:
1007 std::optional<SendReservation<T>>>;
1008 friend Base;
1009 friend internal::Channel<T>;
1010 friend Sender<T>;
1011
1012 static constexpr const char kWaitReason[] = "Sender::ReserveSend";
1013
1014 explicit ReserveSendFuture(internal::Channel<T>* channel)
1015 : Base(channel->reserve_send_futures_), channel_(channel) {
1016 channel_->add_ref();
1017 }
1018
1019 enum ClosedState { kClosed };
1020
1021 explicit ReserveSendFuture(ClosedState)
1022 : Base(Base::kReadyForCompletion), channel_(nullptr) {}
1023
1025 if (channel_ == nullptr || channel_->closed()) {
1026 reset();
1027 return Ready<std::optional<SendReservation<T>>>(std::nullopt);
1028 }
1029
1030 if (!channel_->Reserve()) {
1031 return Pending();
1032 }
1033
1034 SendReservation<T> reservation(*channel_);
1035 reset();
1036 return reservation;
1037 }
1038
1039 void reset() {
1040 if (channel_ != nullptr) {
1041 channel_->remove_ref();
1042 channel_ = nullptr;
1043 }
1044 }
1045
1046 using Base::Wake;
1047
1048 internal::Channel<T>* channel_;
1049};
1050
1052template <typename T>
1053class Sender {
1054 public:
1055 constexpr Sender() : channel_(nullptr) {}
1056
1057 Sender(const Sender& other) = delete;
1058 Sender& operator=(const Sender& other) = delete;
1059
1060 Sender(Sender&& other) noexcept
1061 : channel_(std::exchange(other.channel_, nullptr)) {}
1062
1063 Sender& operator=(Sender&& other) noexcept {
1064 if (this == &other) {
1065 return *this;
1066 }
1067 if (channel_ != nullptr) {
1068 channel_->remove_sender();
1069 }
1070 channel_ = std::exchange(other.channel_, nullptr);
1071 return *this;
1072 }
1073
1074 ~Sender() {
1075 if (channel_ != nullptr) {
1076 channel_->remove_sender();
1077 }
1078 }
1079
1088 SendFuture<T> Send(const T& value) {
1089 if (channel_ == nullptr) {
1091 }
1092 return SendFuture<T>(*channel_, value);
1093 }
1094
1103 SendFuture<T> Send(T&& value) {
1104 if (channel_ == nullptr) {
1105 return SendFuture<T>(SendFuture<T>::kClosed, std::move(value));
1106 }
1107 return SendFuture<T>(*channel_, std::move(value));
1108 }
1109
1116 if (channel_ == nullptr) {
1118 }
1119 return ReserveSendFuture<T>(channel_);
1120 }
1121
1129 std::optional<SendReservation<T>> TryReserveSend() {
1130 if (channel_ == nullptr) {
1131 return std::nullopt;
1132 }
1133 if (!channel_->Reserve()) {
1134 return std::nullopt;
1135 }
1136 return SendReservation<T>(*channel_);
1137 }
1138
1143 bool TrySend(const T& value) {
1144 if (channel_ == nullptr) {
1145 return false;
1146 }
1147 return channel_->TryPush(value);
1148 }
1149
1154 bool TrySend(T&& value) {
1155 if (channel_ == nullptr) {
1156 return false;
1157 }
1158 return channel_->TryPush(std::move(value));
1159 }
1160
1172 const T& value,
1173 chrono::SystemClock::duration timeout = kWaitForever) {
1174 if (channel_ == nullptr) {
1176 }
1177 return BlockingSend(dispatcher, SendFuture<T>(*channel_, value), timeout);
1178 }
1179
1191 T&& value,
1192 chrono::SystemClock::duration timeout = kWaitForever) {
1193 if (channel_ == nullptr) {
1195 }
1196 return BlockingSend(
1197 dispatcher, SendFuture<T>(*channel_, std::move(value)), timeout);
1198 }
1199
1204 void Disconnect() {
1205 if (channel_ != nullptr) {
1206 channel_->remove_sender();
1207 channel_ = nullptr;
1208 }
1209 }
1210
1212 uint16_t remaining_capacity() const {
1213 return channel_ != nullptr ? channel_->remaining_capacity() : 0;
1214 }
1215
1217 uint16_t capacity() const {
1218 return channel_ != nullptr ? channel_->capacity() : 0;
1219 }
1220
1222 [[nodiscard]] bool is_open() const {
1223 return channel_ != nullptr && !channel_->closed();
1224 }
1225
1226 private:
1227 static constexpr chrono::SystemClock::duration kWaitForever =
1228 chrono::SystemClock::duration::max();
1229
1230 template <typename U>
1231 friend class internal::Channel;
1232
1233 template <typename U>
1234 friend std::optional<std::tuple<SpmcChannelHandle<U>, Sender<U>>>
1235 CreateSpmcChannel(Allocator&, uint16_t);
1236
1237 template <typename U, uint16_t kCapacity>
1238 friend std::tuple<SpmcChannelHandle<U>, Sender<U>> CreateSpmcChannel(
1240
1241 template <typename U>
1242 friend std::optional<std::tuple<SpscChannelHandle<U>, Sender<U>, Receiver<U>>>
1243 CreateSpscChannel(Allocator&, uint16_t);
1244
1245 template <typename U, uint16_t kCapacity>
1246 friend std::tuple<SpscChannelHandle<U>, Sender<U>, Receiver<U>>
1248
1249 explicit Sender(internal::Channel<T>* channel) : channel_(channel) {
1250 if (channel_ != nullptr) {
1251 channel_->add_sender();
1252 }
1253 }
1254
1255 Status BlockingSend(Dispatcher& dispatcher,
1256 SendFuture<T>&& future,
1258 Status status;
1259 sync::TimedThreadNotification notification;
1260
1261 FutureCallbackTask task(
1262 std::move(future), [&status, &notification](bool result) {
1263 status = result ? OkStatus() : Status::FailedPrecondition();
1264 notification.release();
1265 });
1266 dispatcher.Post(task);
1267
1268 if (timeout == kWaitForever) {
1269 notification.acquire();
1270 return status;
1271 }
1272
1273 if (!notification.try_acquire_for(timeout)) {
1274 task.Deregister();
1275 return Status::DeadlineExceeded();
1276 }
1277 return status;
1278 }
1279
1280 internal::Channel<T>* channel_;
1281};
1282
1296template <typename T>
1297std::optional<MpmcChannelHandle<T>> CreateMpmcChannel(Allocator& alloc,
1298 uint16_t capacity) {
1299 auto channel = internal::Channel<T>::Allocated(alloc, capacity);
1300 if (channel == nullptr) {
1301 return std::nullopt;
1302 }
1303 return MpmcChannelHandle<T>(channel);
1304}
1305
1317template <typename T, uint16_t kCapacity>
1318MpmcChannelHandle<T> CreateMpmcChannel(ChannelStorage<T, kCapacity>& storage) {
1319 PW_ASSERT(!storage.active());
1320 return MpmcChannelHandle<T>(&storage);
1321}
1322
1336template <typename T>
1337std::optional<std::tuple<MpscChannelHandle<T>, Receiver<T>>> CreateMpscChannel(
1338 Allocator& alloc, uint16_t capacity) {
1339 auto channel = internal::Channel<T>::Allocated(alloc, capacity);
1340 if (channel == nullptr) {
1341 return std::nullopt;
1342 }
1343 return std::make_tuple(MpscChannelHandle<T>(channel), Receiver<T>(channel));
1344}
1345
1357template <typename T, uint16_t kCapacity>
1358std::tuple<MpscChannelHandle<T>, Receiver<T>> CreateMpscChannel(
1360 PW_ASSERT(!storage.active());
1361 return std::make_tuple(MpscChannelHandle<T>(&storage), Receiver<T>(&storage));
1362}
1363
1377template <typename T>
1378std::optional<std::tuple<SpmcChannelHandle<T>, Sender<T>>> CreateSpmcChannel(
1379 Allocator& alloc, uint16_t capacity) {
1380 auto channel = internal::Channel<T>::Allocated(alloc, capacity);
1381 if (channel == nullptr) {
1382 return std::nullopt;
1383 }
1384 return std::make_tuple(SpmcChannelHandle<T>(channel), Sender<T>(channel));
1385}
1386
1398template <typename T, uint16_t kCapacity>
1399std::tuple<SpmcChannelHandle<T>, Sender<T>> CreateSpmcChannel(
1401 PW_ASSERT(!storage.active());
1402 return std::make_tuple(SpmcChannelHandle<T>(&storage), Sender<T>(&storage));
1403}
1404
1418template <typename T>
1419std::optional<std::tuple<SpscChannelHandle<T>, Sender<T>, Receiver<T>>>
1420CreateSpscChannel(Allocator& alloc, uint16_t capacity) {
1421 auto channel = internal::Channel<T>::Allocated(alloc, capacity);
1422 if (channel == nullptr) {
1423 return std::nullopt;
1424 }
1425 return std::make_tuple(
1426 SpscChannelHandle<T>(channel), Sender<T>(channel), Receiver<T>(channel));
1427}
1428
1440template <typename T, uint16_t kCapacity>
1441std::tuple<SpscChannelHandle<T>, Sender<T>, Receiver<T>> CreateSpscChannel(
1443 PW_ASSERT(!storage.active());
1444 return std::make_tuple(SpscChannelHandle<T>(&storage),
1445 Sender<T>(&storage),
1446 Receiver<T>(&storage));
1447}
1448
1449} // namespace pw::async2
Definition: allocator.h:36
std::enable_if_t<!std::is_array_v< T >, T * > New(Args &&... args)
Definition: allocator.h:57
Definition: deque.h:208
Definition: result.h:143
constexpr bool ok() const
Definition: result.h:447
Definition: status.h:120
static constexpr Status DeadlineExceeded()
Definition: status.h:177
static constexpr Status Unavailable()
Definition: status.h:304
static constexpr Status FailedPrecondition()
Definition: status.h:243
Definition: channel.h:598
bool active() const
Definition: channel.h:604
Definition: context.h:54
Definition: dispatcher.h:53
void Post(Task &task)
Definition: callback_task.h:44
Definition: future.h:283
A handle to a multi-producer, multi-consumer channel.
Definition: channel.h:496
friend std::optional< MpmcChannelHandle< U > > CreateMpmcChannel(Allocator &, uint16_t)
Definition: channel.h:1297
A handle to a multi-producer, single-consumer channel.
Definition: channel.h:521
friend std::optional< std::tuple< MpscChannelHandle< U >, Receiver< U > > > CreateMpscChannel(Allocator &, uint16_t)
Definition: channel.h:1337
Definition: poll.h:60
Definition: channel.h:612
A receiver which reads values from an asynchronous channel.
Definition: channel.h:681
friend std::optional< std::tuple< MpscChannelHandle< U >, Receiver< U > > > CreateMpscChannel(Allocator &, uint16_t)
Definition: channel.h:1337
bool is_open() const
Returns true if the channel is open.
Definition: channel.h:804
Result< T > TryReceive()
Definition: channel.h:778
Result< T > BlockingReceive(Dispatcher &dispatcher, chrono::SystemClock::duration timeout=kWaitForever)
Definition: channel.h:717
friend std::optional< std::tuple< SpscChannelHandle< U >, Sender< U >, Receiver< U > > > CreateSpscChannel(Allocator &, uint16_t)
Definition: channel.h:1420
void Disconnect()
Definition: channel.h:796
ReceiveFuture< T > Receive()
Definition: channel.h:765
Definition: channel.h:986
Definition: channel.h:842
Definition: channel.h:935
void Cancel()
Releases the reservation, making the space available for other senders.
Definition: channel.h:964
void Commit(Args &&... args)
Commits a value to a reserved slot.
Definition: channel.h:956
A sender which writes values to an asynchronous channel.
Definition: channel.h:1053
Status BlockingSend(Dispatcher &dispatcher, const T &value, chrono::SystemClock::duration timeout=kWaitForever)
Definition: channel.h:1171
bool TrySend(const T &value)
Definition: channel.h:1143
Status BlockingSend(Dispatcher &dispatcher, T &&value, chrono::SystemClock::duration timeout=kWaitForever)
Definition: channel.h:1190
friend std::optional< std::tuple< SpmcChannelHandle< U >, Sender< U > > > CreateSpmcChannel(Allocator &, uint16_t)
Definition: channel.h:1378
friend std::optional< std::tuple< SpscChannelHandle< U >, Sender< U >, Receiver< U > > > CreateSpscChannel(Allocator &, uint16_t)
Definition: channel.h:1420
ReserveSendFuture< T > ReserveSend()
Definition: channel.h:1115
uint16_t capacity() const
Returns the maximum capacity of the channel.
Definition: channel.h:1217
bool TrySend(T &&value)
Definition: channel.h:1154
uint16_t remaining_capacity() const
Returns the remaining capacity of the channel.
Definition: channel.h:1212
void Disconnect()
Definition: channel.h:1204
bool is_open() const
Returns true if the channel is open.
Definition: channel.h:1222
SendFuture< T > Send(T &&value)
Definition: channel.h:1103
std::optional< SendReservation< T > > TryReserveSend()
Definition: channel.h:1129
SendFuture< T > Send(const T &value)
Definition: channel.h:1088
A handle to a single-producer, multi-consumer channel.
Definition: channel.h:545
friend std::optional< std::tuple< SpmcChannelHandle< U >, Sender< U > > > CreateSpmcChannel(Allocator &, uint16_t)
Definition: channel.h:1378
A handle to a single-producer, single-consumer channel.
Definition: channel.h:569
friend std::optional< std::tuple< SpscChannelHandle< U >, Sender< U >, Receiver< U > > > CreateSpscChannel(Allocator &, uint16_t)
Definition: channel.h:1420
Definition: channel.h:404
void Release()
Definition: channel.h:474
void Close()
Definition: channel.h:462
Sender< T > CreateSender()
Definition: channel.h:447
Receiver< T > CreateReceiver()
Definition: channel.h:455
Definition: channel.h:58
bool closed() const
Definition: channel.h:72
Definition: storage.h:36
constexpr size_type capacity() const noexcept
Returns the maximum number of elements in the deque.
Definition: generic_deque.h:74
Definition: timed_thread_notification.h:40
bool try_acquire_for(chrono::SystemClock::duration timeout)
constexpr PendingType Pending()
Returns a value indicating that an operation was not yet able to complete.
Definition: poll.h:271
constexpr Poll Ready()
Returns a value indicating completion.
Definition: poll.h:255
std::chrono::duration< rep, period > duration
Alias for durations representable with this clock.
Definition: system_clock.h:90
constexpr bool CheckedAdd(A a, B b, T &result)
Definition: checked_arithmetic.h:70
constexpr Status OkStatus()
Definition: status.h:450
#define PW_GUARDED_BY(x)
Definition: lock_annotations.h:60
#define PW_NO_LOCK_SAFETY_ANALYSIS
Definition: lock_annotations.h:292
#define PW_EXCLUSIVE_LOCKS_REQUIRED(...)
Definition: lock_annotations.h:146
#define PW_UNLOCK_FUNCTION(...)
Definition: lock_annotations.h:247
#define PW_LOCKS_EXCLUDED(...)
Definition: lock_annotations.h:176