C/C++ API Reference
Loading...
Searching...
No Matches
channel.h
1// Copyright 2025 The Pigweed Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may not
4// use this file except in compliance with the License. You may obtain a copy of
5// the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12// License for the specific language governing permissions and limitations under
13// the License.
14#pragma once
15
16#include <mutex>
17#include <optional>
18
19#include "lib/stdcompat/type_traits.h"
20#include "pw_allocator/allocator.h"
21#include "pw_async2/callback_task.h"
22#include "pw_async2/dispatcher.h"
23#include "pw_async2/future.h"
24#include "pw_containers/deque.h"
25#include "pw_numeric/checked_arithmetic.h"
26#include "pw_result/result.h"
27#include "pw_sync/interrupt_spin_lock.h"
28#include "pw_sync/lock_annotations.h"
29#include "pw_sync/timed_thread_notification.h"
30
31namespace pw::async2 {
32
33template <typename T>
34class Receiver;
35
36template <typename T>
37class ReceiveFuture;
38
39template <typename T>
40class Sender;
41
42template <typename T>
43class SendFuture;
44
45template <typename T>
46class ReserveSendFuture;
47
48template <typename T>
49class SendReservation;
50
51template <typename T, uint16_t kCapacity>
52class ChannelStorage;
53
54namespace internal {
55
56template <typename T>
57class Channel;
58
59class BaseChannel;
60
62 public:
63 constexpr BaseChannelFuture() : channel_(nullptr) {}
64
65 BaseChannelFuture(const BaseChannelFuture&) = delete;
66 BaseChannelFuture& operator=(const BaseChannelFuture&) = delete;
67
68 // Derived classes call MoveAssignFrom to move rather than use the operator.
69 BaseChannelFuture& operator=(BaseChannelFuture&&) = delete;
70
72 [[nodiscard]] bool is_pendable() const { return core_.is_pendable(); }
73
75 [[nodiscard]] bool is_complete() const { return core_.is_complete(); }
76
77 protected:
78 // Creates a new future, storing nullptr if `channel` is nullptr or if the
79 // channel is closed.
80 explicit BaseChannelFuture(BaseChannel* channel) PW_LOCKS_EXCLUDED(*channel);
81
82 enum AllowClosed { kAllowClosed };
83
84 // Creates a new future. Always increases the ref count, even if the channel
85 // is closed. This allows ReceiveFutures to read values from closed channels.
86 BaseChannelFuture(BaseChannel* channel, AllowClosed)
87 PW_LOCKS_EXCLUDED(*channel)
88 : core_(FutureState::kPending) {
89 StoreAndAddRefIfNonnull(channel);
90 }
91
92 BaseChannelFuture(BaseChannelFuture&& other)
93 PW_LOCKS_EXCLUDED(*channel_, *other.channel_)
94 : channel_(other.channel_) {
95 MoveFrom(other);
96 }
97
98 BaseChannelFuture& MoveAssignFrom(BaseChannelFuture& other)
99 PW_LOCKS_EXCLUDED(*channel_, *other.channel_);
100
101 // Unlists this future and removes a reference from the channel.
102 void RemoveFromChannel() PW_LOCKS_EXCLUDED(*channel_);
103
104 bool StoreWakerForReceiveIfOpen(Context& cx) PW_UNLOCK_FUNCTION(*channel_);
105
106 void StoreWakerForSend(Context& cx) PW_UNLOCK_FUNCTION(*channel_);
107
108 void StoreWakerForReserveSend(Context& cx) PW_UNLOCK_FUNCTION(*channel_);
109
110 void MarkCompleted() { core_.MarkComplete(); }
111
112 void Complete() PW_UNLOCK_FUNCTION(*channel_);
113
114 BaseChannel* base_channel() PW_LOCK_RETURNED(channel_) { return channel_; }
115
116 private:
117 void StoreAndAddRefIfNonnull(BaseChannel* channel)
118 PW_LOCKS_EXCLUDED(*channel);
119
120 void MoveFrom(BaseChannelFuture& other) PW_LOCKS_EXCLUDED(*other.channel_);
121
122 BaseChannel* channel_;
123 FutureCore core_;
124
125 public:
126 using List = FutureList<&BaseChannelFuture::core_>;
127};
128
129// Adds a Pend function to BaseChannelFuture.
130template <typename Derived, typename T, typename FutureValue>
132 public:
133 using value_type = FutureValue;
134
135 Poll<value_type> Pend(Context& cx) PW_LOCKS_EXCLUDED(*this->channel()) {
136 Poll<value_type> result = static_cast<Derived&>(*this).DoPend(cx);
137 if (result.IsReady()) {
138 // If Ready(), the future is no longer associated with the channel, so it
139 // can be marked complete without the lock held.
140 MarkCompleted();
141 }
142 return result;
143 }
144
145 protected:
146 constexpr ChannelFuture() = default;
147
148 explicit ChannelFuture(Channel<T>* channel) : BaseChannelFuture(channel) {}
149
150 ChannelFuture(Channel<T>* channel, AllowClosed)
151 : BaseChannelFuture(channel, kAllowClosed) {}
152
153 ChannelFuture(ChannelFuture&& other) : BaseChannelFuture(std::move(other)) {}
154
155 Channel<T>* channel() PW_LOCK_RETURNED(this->base_channel()) {
156 return static_cast<Channel<T>*>(base_channel());
157 }
158
159 private:
160 using BaseChannelFuture::base_channel;
161 using BaseChannelFuture::MarkCompleted;
162};
163
164// Internal generic channel type. BaseChannel is not exposed to users. Its
165// public interface is for internal consumption.
166class PW_LOCKABLE("pw::async2::internal::BaseChannel") BaseChannel {
167 public:
168 static constexpr chrono::SystemClock::duration kWaitForever =
169 chrono::SystemClock::duration::max();
170
171 // Acquires the channel's lock.
172 void lock() PW_EXCLUSIVE_LOCK_FUNCTION() { lock_.lock(); }
173
174 // Releases the channel's lock.
175 void unlock() PW_UNLOCK_FUNCTION() { lock_.unlock(); }
176
177 [[nodiscard]] bool is_open() PW_LOCKS_EXCLUDED(*this) {
178 std::lock_guard lock(*this);
179 return is_open_locked();
180 }
181
182 bool is_open_locked() const PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
183 return !closed_;
184 }
185
186 [[nodiscard]] bool active_locked() const PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
187 return ref_count_ != 0;
188 }
189
190 // Removes a reference to this channel and destroys the channel if needed.
191 void RemoveRefAndDestroyIfUnreferenced() PW_UNLOCK_FUNCTION();
192
193 void Close() PW_LOCKS_EXCLUDED(*this) {
194 std::lock_guard lock(*this);
195 CloseLocked();
196 }
197
198 // Adds a SendFuture or ReserveSendFuture to the list of pending futures.
199 void add_send_future(BaseChannelFuture& future)
201 send_futures_.Push(future);
202 }
203
204 // Adds a ReceiveFuture to the list of pending futures.
205 void add_receive_future(BaseChannelFuture& future)
207 receive_futures_.Push(future);
208 }
209
210 void DropReservationAndRemoveRef() PW_LOCKS_EXCLUDED(*this);
211
212 void add_receiver() PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
213 add_object(receiver_count_);
214 }
215
216 void add_sender() PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
217 add_object(sender_count_);
218 }
219
220 void add_handle() PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
221 add_object(handle_count_);
222 }
223
224 void add_reservation() PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
225 reservations_ += 1;
226 }
227
228 void remove_reservation() PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
229 PW_DASSERT(reservations_ > 0);
230 reservations_ -= 1;
231 }
232
233 void remove_sender() PW_LOCKS_EXCLUDED(*this) {
234 remove_object(&sender_count_);
235 }
236
237 void remove_receiver() PW_LOCKS_EXCLUDED(*this) {
238 remove_object(&receiver_count_);
239 }
240
241 void remove_handle() PW_LOCKS_EXCLUDED(*this) {
242 remove_object(&handle_count_);
243 }
244
245 void add_ref() PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
246 PW_ASSERT(CheckedIncrement(ref_count_, 1));
247 }
248
249 protected:
250 constexpr BaseChannel() = default;
251
252 ~BaseChannel();
253
254 void WakeOneReceiver() PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
255 receive_futures_.ResolveOneIfAvailable();
256 }
257
258 void WakeOneSender() PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
259 send_futures_.ResolveOneIfAvailable();
260 }
261
262 uint16_t reservations() const PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
263 return reservations_;
264 }
265
266 private:
267 void add_object(uint8_t& counter) PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
268 if (is_open_locked()) {
269 PW_ASSERT(CheckedIncrement(counter, 1));
270 }
271 add_ref();
272 }
273
274 // Takes a pointer since otherwise Clang's thread safety analysis complains
275 // about taking a reference without the lock held.
276 void remove_object(uint8_t* counter) PW_LOCKS_EXCLUDED(*this);
277
278 // Returns true if the channel should be closed following a reference
279 // decrement.
280 //
281 // Handles can create new senders and receivers, so as long as one exists, the
282 // channel should remain open. Without active handles, the channel closes when
283 // either end fully hangs up.
284 bool should_close() const PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
285 return handle_count_ == 0 && (sender_count_ == 0 || receiver_count_ == 0);
286 }
287
288 void CloseLocked() PW_EXCLUSIVE_LOCKS_REQUIRED(*this);
289
290 // Destroys the channel if it is dynamically allocated.
291 virtual void Destroy() {}
292
293 BaseChannelFuture::List send_futures_ PW_GUARDED_BY(*this);
294 BaseChannelFuture::List receive_futures_ PW_GUARDED_BY(*this);
295
296 uint16_t reservations_ PW_GUARDED_BY(*this) = 0;
297 bool closed_ PW_GUARDED_BY(*this) = false;
298 mutable sync::InterruptSpinLock lock_;
299
300 // Channels are reference counted in two ways:
301 //
302 // - Senders and receivers are tracked independently. Once either reaches
303 // zero, the channel is closed, but not destroyed. No new values can be
304 // sent, but any buffered values can still be read.
305 //
306 // - Overall object reference count, including senders, receivers, futures,
307 // and channel handles. Once this reaches zero, the channel is destroyed.
308 //
309 uint8_t sender_count_ PW_GUARDED_BY(*this) = 0;
310 uint8_t receiver_count_ PW_GUARDED_BY(*this) = 0;
311 uint8_t handle_count_ PW_GUARDED_BY(*this) = 0;
312 uint16_t ref_count_ PW_GUARDED_BY(*this) = 0;
313};
314
315template <typename T>
317 using size_type = uint16_t;
318
319 public:
320 [[nodiscard]] static ChannelDeque TryAllocate(Allocator& alloc,
321 uint16_t capacity) {
322 return ChannelDeque(FixedDeque<T>::TryAllocate(alloc, capacity));
323 }
324
325 template <size_t kAlignment, size_t kCapacity>
327 : dequeue_(storage) {}
328
329 // Exposes the minimal deque api needed by a data channel.
330 [[nodiscard]] size_type capacity() const { return dequeue_.capacity(); }
331 [[nodiscard]] size_type size() const { return dequeue_.size(); }
332 [[nodiscard]] bool empty() const { return dequeue_.empty(); }
333 [[nodiscard]] auto deallocator() const { return dequeue_.deallocator(); }
334 [[nodiscard]] T& front() { return dequeue_.front(); }
335 [[nodiscard]] const T& front() const { return dequeue_.front(); }
336
337 void pop_front() { dequeue_.pop_front(); }
338
339 template <typename U>
340 void push_back(U&& value) {
341 dequeue_.push_back(std::forward<U>(value));
342 }
343
344 template <typename... Args>
345 void emplace_back(Args&&... args) {
346 dequeue_.emplace_back(std::forward<Args>(args)...);
347 }
348
349 private:
350 explicit ChannelDeque(FixedDeque<T>&& other) : dequeue_(std::move(other)) {}
351
352 FixedDeque<T> dequeue_;
353};
354
355template <>
356class ChannelDeque<void> {
357 using size_type = uint16_t;
358
359 public:
360 ~ChannelDeque() = default;
361 ChannelDeque(const ChannelDeque& other) = delete;
362 ChannelDeque& operator=(const ChannelDeque& other) = delete;
363 ChannelDeque(ChannelDeque&& other) = default;
364 ChannelDeque& operator=(ChannelDeque&& other) = default;
365
366 explicit ChannelDeque(uint16_t capacity) : capacity_(capacity) {}
367
368 [[nodiscard]] static ChannelDeque TryAllocate(Allocator& alloc,
369 uint16_t capacity) {
370 return ChannelDeque(alloc, capacity);
371 }
372
373 // Exposes a minimal deque-like api needed by a notification channel.
374 [[nodiscard]] size_type capacity() const { return capacity_; }
375 [[nodiscard]] size_type size() const { return size_; }
376 [[nodiscard]] bool empty() const { return size_ == 0; }
377 [[nodiscard]] Deallocator* deallocator() const { return deallocator_; }
378
379 void push_back() {
380 PW_ASSERT(size_ < capacity_);
381 ++size_;
382 }
383
384 void pop_front() {
385 PW_ASSERT(size_ != 0);
386 --size_;
387 }
388
389 private:
390 ChannelDeque(Allocator& alloc, uint16_t capacity)
391 : deallocator_(&alloc), capacity_(capacity) {}
392
393 Deallocator* deallocator_ = nullptr;
394 uint16_t capacity_ = 0;
395 uint16_t size_ = 0;
396};
397
398// Like BaseChannel, Channel is an internal class that is not exposed to
399// users. Its public interface is for internal consumption.
400template <typename T>
401class Channel : public BaseChannel {
402 public:
403 Sender<T> CreateSender() PW_LOCKS_EXCLUDED(*this) {
404 {
405 std::lock_guard guard(*this);
406 if (is_open_locked()) {
407 return Sender<T>(*this);
408 }
409 }
410 return Sender<T>();
411 }
412
413 Receiver<T> CreateReceiver() PW_LOCKS_EXCLUDED(*this) {
414 {
415 std::lock_guard guard(*this);
416 if (is_open_locked()) {
417 return Receiver<T>(*this);
418 }
419 }
420 return Receiver<T>();
421 }
422
423 template <typename U,
424 int&... kExplicitGuard,
425 std::enable_if_t<std::is_same_v<::cpp20::remove_cvref_t<U>, T>,
426 bool> = true>
427 void PushAndWake(U&& value) PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
428 deque_.push_back(std::forward<U>(value));
429 WakeOneReceiver();
430 }
431
432 template <typename U,
433 int&... kExplicitGuard,
434 std::enable_if_t<!std::is_same_v<::cpp20::remove_cvref_t<U>, T> &&
435 std::is_constructible_v<T, U>,
436 bool> = true>
437 void PushAndWake(U&& value) PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
438 PushAndWake(T(value));
439 }
440
441 void PushAndWake() PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
442 deque_.push_back();
443 WakeOneReceiver();
444 }
445
446 template <typename... Args>
447 void EmplaceAndWake(Args&&... args) PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
448 deque_.emplace_back(std::forward<Args>(args)...);
449 WakeOneReceiver();
450 }
451
452 auto PopAndWake() PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
453 if constexpr (std::is_void_v<T>) {
454 deque_.pop_front();
455 WakeOneSender();
456 } else {
457 T value = std::move(deque_.front());
458 deque_.pop_front();
459
460 WakeOneSender();
461 return value;
462 }
463 }
464
465 bool full() const PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
466 return remaining_capacity_locked() == 0;
467 }
468
469 uint16_t remaining_capacity() PW_LOCKS_EXCLUDED(*this) {
470 std::lock_guard guard(*this);
471 return remaining_capacity_locked();
472 }
473
474 uint16_t remaining_capacity_locked() const
476 return deque_.capacity() - deque_.size() - reservations();
477 }
478
479 uint16_t capacity() const PW_NO_LOCK_SAFETY_ANALYSIS {
480 // SAFETY: The capacity of `deque_` cannot change.
481 return deque_.capacity();
482 }
483
484 [[nodiscard]] bool empty() PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
485 return deque_.empty();
486 }
487
488 template <typename U,
489 int&... kExplicitGuard,
490 std::enable_if_t<std::is_same_v<::cpp20::remove_cvref_t<U>, T>,
491 bool> = true>
492 Status TrySend(U&& value) PW_LOCKS_EXCLUDED(*this) {
493 std::lock_guard guard(*this);
494 if (!is_open_locked()) {
496 }
497 if (full()) {
498 return Status::Unavailable();
499 }
500 PushAndWake(std::forward<U>(value));
501 return OkStatus();
502 }
503
504 template <typename U,
505 int&... kExplicitGuard,
506 std::enable_if_t<!std::is_same_v<::cpp20::remove_cvref_t<U>, T> &&
507 std::is_constructible_v<T, U>,
508 bool> = true>
509 Status TrySend(U&& value) PW_LOCKS_EXCLUDED(*this) {
510 return TrySend(T(value));
511 }
512
513 Status TrySend() PW_LOCKS_EXCLUDED(*this) {
514 static_assert(
515 std::is_void_v<T>,
516 "TrySend() with no arguments is for notification channels only");
517
518 std::lock_guard guard(*this);
519 if (!is_open_locked()) {
521 }
522 if (full()) {
523 return Status::Unavailable();
524 }
525 PushAndWake();
526 return OkStatus();
527 }
528
529 std::conditional_t<std::is_void_v<T>, Status, Result<T>> TryReceive()
530 PW_LOCKS_EXCLUDED(*this) {
531 std::lock_guard guard(*this);
532 if (deque_.empty()) {
533 return is_open_locked() ? Status::Unavailable()
535 }
536 if constexpr (std::is_void_v<T>) {
537 PopAndWake();
538 return OkStatus();
539 } else {
540 return Result(PopAndWake());
541 }
542 }
543
544 Result<SendReservation<T>> TryReserveSend() PW_LOCKS_EXCLUDED(*this) {
545 std::lock_guard guard(*this);
546 if (!is_open_locked()) {
548 }
549 if (full()) {
550 return Status::Unavailable();
551 }
552 add_reservation();
553 return SendReservation<T>(*this);
554 }
555
556 template <typename... Args>
557 void CommitReservationAndRemoveRef(Args&&... args) PW_LOCKS_EXCLUDED(*this) {
558 lock();
559 remove_reservation();
560 if (is_open_locked()) {
561 EmplaceAndWake(std::forward<Args>(args)...);
562 }
563 RemoveRefAndDestroyIfUnreferenced();
564 }
565
566 void CommitNotificationReservationAndRemoveRef() PW_LOCKS_EXCLUDED(*this) {
567 lock();
568 remove_reservation();
569 if (is_open_locked()) {
570 PushAndWake();
571 }
572 RemoveRefAndDestroyIfUnreferenced();
573 }
574
575 protected:
576 constexpr explicit Channel(ChannelDeque<T>&& deque)
577 : deque_(std::move(deque)) {}
578
579 template <size_t kAlignment, size_t kCapacity>
581 : deque_(storage) {}
582
583 ~Channel() = default;
584
585 Deallocator* deallocator() const PW_NO_LOCK_SAFETY_ANALYSIS {
586 // SAFETY: deque_.deallocator() cannot change.
587 return deque_.deallocator();
588 }
589
590 private:
591 ChannelDeque<T> deque_ PW_GUARDED_BY(*this);
592};
593
594template <typename T>
595class DynamicChannel final : public Channel<T> {
596 public:
597 static Channel<T>* Allocate(Allocator& alloc, uint16_t capacity) {
598 ChannelDeque<T> deque = ChannelDeque<T>::TryAllocate(alloc, capacity);
599 if (deque.capacity() == 0) {
600 return nullptr;
601 }
602 return alloc.New<DynamicChannel<T>>(std::move(deque));
603 }
604
605 explicit DynamicChannel(ChannelDeque<T>&& deque)
606 : Channel<T>(std::move(deque)) {}
607
608 private:
609 ~DynamicChannel() = default;
610
611 void Destroy() final PW_LOCKS_EXCLUDED(*this) {
612 Deallocator* const deallocator = this->deallocator();
613 this->~DynamicChannel();
614 deallocator->Deallocate(this);
615 }
616};
617
618template <typename T, uint16_t kCapacity>
620 public Channel<T> {
621 public:
622 BaseChannelStorage() : Channel<T>(this->storage()) {}
623
624 protected:
625 ~BaseChannelStorage() = default;
626};
627
628template <uint16_t kCapacity>
629class BaseChannelStorage<void, kCapacity> : public Channel<void> {
630 public:
632
633 protected:
634 ~BaseChannelStorage() = default;
635};
636
643 public:
644 constexpr BaseChannelHandle() : channel_(nullptr) {}
645
647
649
650 [[nodiscard]] bool is_open() const PW_LOCKS_EXCLUDED(channel_) {
651 return channel_ != nullptr && channel_->is_open();
652 }
653
656 void Close() PW_LOCKS_EXCLUDED(channel_);
657
664 void Release() PW_LOCKS_EXCLUDED(channel_);
665
666 protected:
667 explicit BaseChannelHandle(BaseChannel& channel)
669 : channel_(&channel) {
670 channel_->add_handle();
671 }
672
673 BaseChannelHandle& operator=(const BaseChannelHandle& other)
674 PW_LOCKS_EXCLUDED(channel_);
675
676 BaseChannelHandle(BaseChannelHandle&& other) noexcept
677 : channel_(std::exchange(other.channel_, nullptr)) {}
678
679 BaseChannelHandle& operator=(BaseChannelHandle&& other) noexcept
680 PW_LOCKS_EXCLUDED(channel_);
681
682 constexpr BaseChannel* channel() const PW_LOCK_RETURNED(channel_) {
683 return channel_;
684 }
685
686 private:
687 BaseChannel* channel_;
688};
689
690} // namespace internal
691
693
695template <typename T>
697 public:
698 constexpr ChannelHandle() = default;
699
700 ChannelHandle(const ChannelHandle&) = default;
701 ChannelHandle& operator=(const ChannelHandle&) = default;
702
703 ChannelHandle(ChannelHandle&&) = default;
704 ChannelHandle& operator=(ChannelHandle&&) = default;
705
706 protected:
707 explicit ChannelHandle(internal::Channel<T>& channel)
709 : internal::BaseChannelHandle(channel) {}
710
714 PW_ASSERT(channel() != nullptr);
715 return static_cast<internal::Channel<T>&>(*channel()).CreateSender();
716 }
717
721 PW_ASSERT(channel() != nullptr);
722 return static_cast<internal::Channel<T>&>(*channel()).CreateReceiver();
723 }
724};
725
727template <typename T>
728class MpmcChannelHandle final : public ChannelHandle<T> {
729 public:
730 constexpr MpmcChannelHandle() = default;
731
734
735 private:
736 explicit MpmcChannelHandle(internal::Channel<T>& channel)
737 : ChannelHandle<T>(channel) {}
738
739 template <typename U>
740 friend std::optional<MpmcChannelHandle<U>> CreateMpmcChannel(Allocator&,
741 uint16_t);
742
743 template <typename U, uint16_t kCapacity>
746};
747
749template <typename T>
750class MpscChannelHandle final : public ChannelHandle<T> {
751 public:
752 constexpr MpscChannelHandle() = default;
753
755
756 private:
757 explicit MpscChannelHandle(internal::Channel<T>& channel)
758 : ChannelHandle<T>(channel) {}
759
760 template <typename U>
761 friend std::optional<std::tuple<MpscChannelHandle<U>, Receiver<U>>>
762 CreateMpscChannel(Allocator&, uint16_t);
763
764 template <typename U, uint16_t kCapacity>
765 friend std::tuple<MpscChannelHandle<U>, Receiver<U>> CreateMpscChannel(
767};
768
770template <typename T>
771class SpmcChannelHandle final : public ChannelHandle<T> {
772 public:
773 constexpr SpmcChannelHandle() = default;
774
776
777 private:
778 explicit SpmcChannelHandle(internal::Channel<T>& channel)
779 : ChannelHandle<T>(channel) {}
780
781 template <typename U>
782 friend std::optional<std::tuple<SpmcChannelHandle<U>, Sender<U>>>
783 CreateSpmcChannel(Allocator&, uint16_t);
784
785 template <typename U, uint16_t kCapacity>
786 friend std::tuple<SpmcChannelHandle<U>, Sender<U>> CreateSpmcChannel(
788};
789
791template <typename T>
792class SpscChannelHandle final : public ChannelHandle<T> {
793 public:
794 constexpr SpscChannelHandle() = default;
795
796 private:
797 explicit SpscChannelHandle(internal::Channel<T>& channel)
798 : ChannelHandle<T>(channel) {}
799
800 template <typename U>
801 friend std::optional<std::tuple<SpscChannelHandle<U>, Sender<U>, Receiver<U>>>
802 CreateSpscChannel(Allocator&, uint16_t);
803
804 template <typename U, uint16_t kCapacity>
805 friend std::tuple<SpscChannelHandle<U>, Sender<U>, Receiver<U>>
807};
808
812template <typename T>
813class MpChannelHandle final : public ChannelHandle<T> {
814 public:
815 constexpr MpChannelHandle() = default;
816
818 : ChannelHandle<T>(other) {}
819
820 MpChannelHandle& operator=(const MpmcChannelHandle<T>& other) {
822 return *this;
823 }
824
826 : ChannelHandle<T>(std::move(other)) {}
827
828 MpChannelHandle& operator=(MpmcChannelHandle<T>&& other) {
829 ChannelHandle<T>::operator=(std::move(other));
830 return *this;
831 }
832
834 : ChannelHandle<T>(other) {}
835
836 MpChannelHandle& operator=(const MpscChannelHandle<T>& other) {
838 return *this;
839 }
840
842 : ChannelHandle<T>(std::move(other)) {}
843
844 MpChannelHandle& operator=(MpscChannelHandle<T>&& other) {
845 ChannelHandle<T>::operator=(std::move(other));
846 return *this;
847 }
848
850};
851
855template <typename T>
856class McChannelHandle final : public ChannelHandle<T> {
857 public:
858 constexpr McChannelHandle() = default;
859
861 : ChannelHandle<T>(other) {}
862
863 McChannelHandle& operator=(const MpmcChannelHandle<T>& other) {
865 return *this;
866 }
867
869 : ChannelHandle<T>(std::move(other)) {}
870
871 McChannelHandle& operator=(MpmcChannelHandle<T>&& other) {
872 ChannelHandle<T>::operator=(std::move(other));
873 return *this;
874 }
875
877 : ChannelHandle<T>(other) {}
878
879 McChannelHandle& operator=(const SpmcChannelHandle<T>& other) {
881 return *this;
882 }
883
885 : ChannelHandle<T>(std::move(other)) {}
886
887 McChannelHandle& operator=(SpmcChannelHandle<T>&& other) {
888 ChannelHandle<T>::operator=(std::move(other));
889 return *this;
890 }
891
893};
894
899template <typename T, uint16_t kCapacity>
900class ChannelStorage final
901 : private internal::BaseChannelStorage<T, kCapacity> {
902 public:
903 template <typename U, uint16_t kCap>
904 friend std::tuple<SpscChannelHandle<U>, Sender<U>, Receiver<U>>
905 CreateSpscChannel(ChannelStorage<U, kCap>& storage);
906
907 template <typename U, uint16_t kCap>
908 friend std::tuple<MpscChannelHandle<U>, Receiver<U>> CreateMpscChannel(
909 ChannelStorage<U, kCap>& storage);
910
911 template <typename U, uint16_t kCap>
912 friend std::tuple<SpmcChannelHandle<U>, Sender<U>> CreateSpmcChannel(
913 ChannelStorage<U, kCap>& storage);
914
915 template <typename U, uint16_t kCap>
916 friend MpmcChannelHandle<U> CreateMpmcChannel(
917 ChannelStorage<U, kCap>& storage);
918
919 ~ChannelStorage() = default;
920
923 [[nodiscard]] bool active() const PW_LOCKS_EXCLUDED(*this) {
924 std::lock_guard lock(*this);
925 return this->active_locked();
926 }
927
928 constexpr uint16_t capacity() const { return kCapacity; }
929};
930
931template <typename T>
932class [[nodiscard]] ReceiveFuture final
933 : public internal::ChannelFuture<ReceiveFuture<T>, T, std::optional<T>> {
934 private:
936
937 public:
938 constexpr ReceiveFuture() = default;
939
941 PW_LOCKS_EXCLUDED(*this->channel(), *other.channel())
942 : Base(std::move(other)) {}
943
944 ReceiveFuture& operator=(ReceiveFuture&& other)
945 PW_LOCKS_EXCLUDED(*this->channel(), *other.channel()) {
946 // NOLINTNEXTLINE(misc-unconventional-assign-operator)
947 return static_cast<ReceiveFuture&>(this->MoveAssignFrom(other));
948 }
949
950 ~ReceiveFuture() PW_LOCKS_EXCLUDED(*this->channel()) {
951 this->RemoveFromChannel();
952 }
953
954 private:
955 friend Base;
957 friend Receiver<T>;
958 template <typename, typename>
959 friend class CallbackTask;
960
961 explicit ReceiveFuture(internal::Channel<T>* channel)
962 PW_LOCKS_EXCLUDED(*channel)
963 : Base(channel, this->kAllowClosed) {}
964
965 PollOptional<T> DoPend(Context& cx) PW_LOCKS_EXCLUDED(*this->channel()) {
966 if (this->channel() == nullptr) {
967 PW_DASSERT(this->is_pendable());
968 return Ready<std::optional<T>>(std::nullopt);
969 }
970
971 this->channel()->lock();
972 PW_DASSERT(this->is_pendable());
973 if (this->channel()->empty()) {
974 return this->StoreWakerForReceiveIfOpen(cx)
975 ? Pending()
976 : Ready<std::optional<T>>(std::nullopt);
977 }
978
979 auto result = Ready(this->channel()->PopAndWake());
980 this->Complete();
981 return result;
982 }
983};
984
985template <>
986class [[nodiscard]] ReceiveFuture<void> final
987 : public internal::ChannelFuture<ReceiveFuture<void>, void, bool> {
988 private:
990
991 public:
992 constexpr ReceiveFuture() = default;
993
995 PW_LOCKS_EXCLUDED(*this->channel(), *other.channel())
996 : Base(std::move(other)) {}
997
998 ReceiveFuture& operator=(ReceiveFuture&& other)
999 PW_LOCKS_EXCLUDED(*this->channel(), *other.channel()) {
1000 // NOLINTNEXTLINE(misc-unconventional-assign-operator)
1001 return static_cast<ReceiveFuture&>(this->MoveAssignFrom(other));
1002 }
1003
1004 ~ReceiveFuture() PW_LOCKS_EXCLUDED(*this->channel()) {
1005 this->RemoveFromChannel();
1006 }
1007
1008 private:
1009 friend Base;
1011 friend Receiver<void>;
1012 template <typename, typename>
1013 friend class CallbackTask;
1014
1015 explicit ReceiveFuture(internal::Channel<void>* channel)
1016 PW_LOCKS_EXCLUDED(*channel)
1017 : Base(channel, this->kAllowClosed) {}
1018
1019 Poll<bool> DoPend(Context& cx) PW_LOCKS_EXCLUDED(*this->channel()) {
1020 if (this->channel() == nullptr) {
1021 PW_DASSERT(this->is_pendable());
1022 return Ready(false);
1023 }
1024
1025 this->channel()->lock();
1026 PW_DASSERT(this->is_pendable());
1027 if (this->channel()->empty()) {
1028 return this->StoreWakerForReceiveIfOpen(cx) ? Pending() : Ready(false);
1029 }
1030
1031 this->channel()->PopAndWake();
1032 this->Complete();
1033 return Ready(true);
1034 }
1035};
1036
1038template <typename T>
1040 public:
1041 constexpr Receiver() : channel_(nullptr) {}
1042
1043 Receiver(const Receiver& other) = delete;
1044 Receiver& operator=(const Receiver& other) = delete;
1045
1046 Receiver(Receiver&& other) noexcept
1047 : channel_(std::exchange(other.channel_, nullptr)) {}
1048
1049 Receiver& operator=(Receiver&& other) noexcept {
1050 if (this == &other) {
1051 return *this;
1052 }
1053 if (channel_ != nullptr) {
1054 channel_->remove_receiver();
1055 }
1056 channel_ = std::exchange(other.channel_, nullptr);
1057 return *this;
1058 }
1059
1060 ~Receiver() {
1061 if (channel_ != nullptr) {
1062 channel_->remove_receiver();
1063 }
1064 }
1065
1075 std::conditional_t<std::is_void_v<T>, Status, Result<T>> BlockingReceive(
1076 Dispatcher& dispatcher,
1079 if (channel_ == nullptr) {
1081 }
1082
1083 // Return immediately if a value is available or the channel is closed.
1084 if constexpr (std::is_void_v<T>) {
1085 if (Status result = channel_->TryReceive();
1086 result.ok() || result.IsFailedPrecondition()) {
1087 return result;
1088 }
1089 } else {
1090 if (Result<T> result = channel_->TryReceive();
1091 result.ok() || result.status().IsFailedPrecondition()) {
1092 return result;
1093 }
1094 }
1095
1096 using ReceiveType =
1097 std::conditional_t<std::is_void_v<T>, bool, std::optional<T>>;
1098
1099 ReceiveType result;
1100 sync::TimedThreadNotification notification;
1101
1103 [&result, &notification](ReceiveType&& val) {
1104 result = std::move(val);
1105 notification.release();
1106 },
1107 channel_);
1108 dispatcher.Post(task);
1109
1110 if (timeout == internal::Channel<T>::kWaitForever) {
1111 notification.acquire();
1112 if constexpr (std::is_void_v<T>) {
1113 return result ? OkStatus() : Status::FailedPrecondition();
1114 } else {
1115 if (!result.has_value()) {
1117 }
1118 return Result<T>(std::move(*result));
1119 }
1120 }
1121
1122 if (!notification.try_acquire_for(timeout)) {
1123 return Status::DeadlineExceeded();
1124 }
1125
1126 if constexpr (std::is_void_v<T>) {
1127 return result ? OkStatus() : Status::FailedPrecondition();
1128 } else {
1129 if (!result.has_value()) {
1131 }
1132 return Result<T>(std::move(*result));
1133 }
1134 }
1135
1144 return ReceiveFuture<T>(channel_);
1145 }
1146
1153 std::conditional_t<std::is_void_v<T>, Status, Result<T>> TryReceive() {
1154 if (channel_ == nullptr) {
1156 }
1157
1158 return channel_->TryReceive();
1159 }
1160
1165 void Disconnect() {
1166 if (channel_ != nullptr) {
1167 channel_->remove_receiver();
1168 channel_ = nullptr;
1169 }
1170 }
1171
1173 [[nodiscard]] bool is_open() const {
1174 return channel_ != nullptr && channel_->is_open();
1175 }
1176
1177 private:
1178 template <typename U>
1179 friend class internal::Channel;
1180
1181 template <typename U>
1182 friend std::optional<std::tuple<MpscChannelHandle<U>, Receiver<U>>>
1183 CreateMpscChannel(Allocator&, uint16_t);
1184
1185 template <typename U, uint16_t kCapacity>
1186 friend std::tuple<MpscChannelHandle<U>, Receiver<U>> CreateMpscChannel(
1188
1189 template <typename U>
1190 friend std::optional<std::tuple<SpscChannelHandle<U>, Sender<U>, Receiver<U>>>
1191 CreateSpscChannel(Allocator&, uint16_t);
1192
1193 template <typename U, uint16_t kCapacity>
1194 friend std::tuple<SpscChannelHandle<U>, Sender<U>, Receiver<U>>
1196
1197 explicit Receiver(internal::Channel<T>& channel)
1199 : channel_(&channel) {
1200 channel_->add_receiver();
1201 }
1202
1203 internal::Channel<T>* channel_;
1204};
1205
1206template <typename T>
1207class [[nodiscard]] SendFuture final
1208 : public internal::ChannelFuture<SendFuture<T>, T, bool> {
1209 private:
1211
1212 public:
1213 constexpr SendFuture() = default;
1214
1215 SendFuture(SendFuture&& other) PW_LOCKS_EXCLUDED(*other.channel())
1216 : Base(static_cast<Base&&>(other)), value_(std::move(other.value_)) {}
1217
1218 SendFuture& operator=(SendFuture&& other)
1219 PW_LOCKS_EXCLUDED(*this->channel(), *other.channel()) {
1220 value_ = std::move(other.value_);
1221 // NOLINTNEXTLINE(misc-unconventional-assign-operator)
1222 return static_cast<SendFuture&>(this->MoveAssignFrom(other));
1223 }
1224
1225 ~SendFuture() PW_LOCKS_EXCLUDED(*this->channel()) {
1226 this->RemoveFromChannel();
1227 }
1228
1229 private:
1230 friend Base;
1231 friend internal::Channel<T>;
1232 friend Sender<T>;
1233
1234 SendFuture(internal::Channel<T>* channel, const T& value)
1235 PW_LOCKS_EXCLUDED(*channel)
1236 : Base(channel), value_(value) {}
1237
1238 SendFuture(internal::Channel<T>* channel, T&& value)
1239 PW_LOCKS_EXCLUDED(*channel)
1240 : Base(channel), value_(std::move(value)) {}
1241
1242 Poll<bool> DoPend(Context& cx) PW_LOCKS_EXCLUDED(*this->channel()) {
1243 if (this->channel() == nullptr) {
1244 PW_DASSERT(this->is_pendable());
1245 return Ready(false);
1246 }
1247
1248 this->channel()->lock();
1249 PW_DASSERT(this->is_pendable());
1250 if (!this->channel()->is_open_locked()) {
1251 this->Complete();
1252 return Ready(false);
1253 }
1254
1255 if (this->channel()->full()) {
1256 this->StoreWakerForSend(cx);
1257 return Pending();
1258 }
1259
1260 this->channel()->PushAndWake(std::move(*value_));
1261 this->Complete();
1262 return Ready(true);
1263 }
1264
1265 std::optional<T> value_;
1266};
1267
1268template <>
1269class [[nodiscard]] SendFuture<void> final
1270 : public internal::ChannelFuture<SendFuture<void>, void, bool> {
1271 private:
1273
1274 public:
1275 constexpr SendFuture() = default;
1276
1277 SendFuture(SendFuture&& other) PW_LOCKS_EXCLUDED(*other.channel())
1278 : Base(static_cast<Base&&>(other)) {}
1279
1280 SendFuture& operator=(SendFuture&& other)
1281 PW_LOCKS_EXCLUDED(*this->channel(), *other.channel()) {
1282 // NOLINTNEXTLINE(misc-unconventional-assign-operator)
1283 return static_cast<SendFuture&>(this->MoveAssignFrom(other));
1284 }
1285
1286 ~SendFuture() PW_LOCKS_EXCLUDED(*this->channel()) {
1287 this->RemoveFromChannel();
1288 }
1289
1290 private:
1291 friend Base;
1293 friend Sender<void>;
1294
1295 explicit SendFuture(internal::Channel<void>* channel)
1296 PW_LOCKS_EXCLUDED(*channel)
1297 : Base(channel) {}
1298
1299 Poll<bool> DoPend(Context& cx) PW_LOCKS_EXCLUDED(*this->channel()) {
1300 if (this->channel() == nullptr) {
1301 PW_DASSERT(this->is_pendable());
1302 return Ready(false);
1303 }
1304
1305 this->channel()->lock();
1306 PW_DASSERT(this->is_pendable());
1307 if (!this->channel()->is_open_locked()) {
1308 this->Complete();
1309 return Ready(false);
1310 }
1311
1312 if (this->channel()->full()) {
1313 this->StoreWakerForSend(cx);
1314 return Pending();
1315 }
1316
1317 this->channel()->PushAndWake();
1318 this->Complete();
1319 return Ready(true);
1320 }
1321};
1322
1329template <typename T>
1331 public:
1332 SendReservation(const SendReservation& other) = delete;
1333 SendReservation& operator=(const SendReservation& other) = delete;
1334
1336 : channel_(std::exchange(other.channel_, nullptr)) {}
1337
1338 SendReservation& operator=(SendReservation&& other) {
1339 if (this == &other) {
1340 return *this;
1341 }
1342 Cancel();
1343 channel_ = std::exchange(other.channel_, nullptr);
1344 return *this;
1345 }
1346
1347 ~SendReservation() { Cancel(); }
1348
1350 template <typename... Args>
1351 void Commit(Args&&... args) {
1352 PW_ASSERT(channel_ != nullptr);
1353 channel_->CommitReservationAndRemoveRef(std::forward<Args>(args)...);
1354 channel_ = nullptr;
1355 }
1356
1357 void CommitNotification() {
1358 static_assert(std::is_void_v<T>,
1359 "CommitNotification() is for notification channels only.");
1360 PW_ASSERT(channel_ != nullptr);
1361 channel_->CommitNotificationReservationAndRemoveRef();
1362 channel_ = nullptr;
1363 }
1364
1366 void Cancel() {
1367 if (channel_ != nullptr) {
1368 channel_->DropReservationAndRemoveRef();
1369 channel_ = nullptr;
1370 }
1371 }
1372
1373 private:
1374 friend internal::Channel<T>;
1375 friend class ReserveSendFuture<T>;
1376 friend class Sender<T>;
1377
1378 explicit SendReservation(internal::Channel<T>& channel)
1380 : channel_(&channel) {
1381 channel_->add_ref();
1382 }
1383
1384 internal::Channel<T>* channel_;
1385};
1386
1387template <typename T>
1388class [[nodiscard]] ReserveSendFuture final
1389 : public internal::ChannelFuture<ReserveSendFuture<T>,
1390 T,
1391 std::optional<SendReservation<T>>> {
1392 private:
1393 using Base = internal::
1394 ChannelFuture<ReserveSendFuture, T, std::optional<SendReservation<T>>>;
1395
1396 public:
1397 constexpr ReserveSendFuture() = default;
1398
1399 ReserveSendFuture(ReserveSendFuture&& other) : Base(std::move(other)) {}
1400
1401 ReserveSendFuture& operator=(ReserveSendFuture&& other) {
1402 // NOLINTNEXTLINE(misc-unconventional-assign-operator)
1403 return static_cast<ReserveSendFuture&>(this->MoveAssignFrom(other));
1404 }
1405
1406 ~ReserveSendFuture() PW_LOCKS_EXCLUDED(*this->channel()) {
1407 this->RemoveFromChannel();
1408 }
1409
1410 private:
1411 friend Base;
1412 friend internal::Channel<T>;
1413 friend Sender<T>;
1414
1415 explicit ReserveSendFuture(internal::Channel<T>* channel)
1416 PW_LOCKS_EXCLUDED(*channel)
1417 : Base(channel) {}
1418
1420 PW_LOCKS_EXCLUDED(*this->channel()) {
1421 if (this->channel() == nullptr) {
1422 PW_DASSERT(this->is_pendable());
1423 return Ready<std::optional<SendReservation<T>>>(std::nullopt);
1424 }
1425
1426 this->channel()->lock();
1427 PW_DASSERT(this->is_pendable());
1428 if (!this->channel()->is_open_locked()) {
1429 this->Complete();
1430 return Ready<std::optional<SendReservation<T>>>(std::nullopt);
1431 }
1432
1433 if (this->channel()->remaining_capacity_locked() == 0) {
1434 this->StoreWakerForReserveSend(cx);
1435 return Pending();
1436 }
1437
1438 this->channel()->add_reservation();
1439 SendReservation<T> reservation(*this->channel());
1440 this->Complete();
1441 return reservation;
1442 }
1443};
1444
1446template <typename T>
1447class Sender {
1448 public:
1449 constexpr Sender() : channel_(nullptr) {}
1450
1451 Sender(const Sender& other) = delete;
1452 Sender& operator=(const Sender& other) = delete;
1453
1454 Sender(Sender&& other) noexcept
1455 : channel_(std::exchange(other.channel_, nullptr)) {}
1456
1457 Sender& operator=(Sender&& other) noexcept {
1458 if (this == &other) {
1459 return *this;
1460 }
1461 if (channel_ != nullptr) {
1462 channel_->remove_sender();
1463 }
1464 channel_ = std::exchange(other.channel_, nullptr);
1465 return *this;
1466 }
1467
1468 ~Sender() {
1469 if (channel_ != nullptr) {
1470 channel_->remove_sender();
1471 }
1472 }
1473
1482 template <typename U>
1483 SendFuture<T> Send(U&& value) {
1484 return SendFuture<T>(channel_, std::forward<U>(value));
1485 }
1486
1487 SendFuture<T> Send() {
1488 static_assert(std::is_void_v<T>,
1489 "Send() with no arguments is for notification channels only");
1490 return SendFuture<T>(channel_);
1491 }
1492
1499
1510 if (channel_ == nullptr) {
1512 }
1513 return channel_->TryReserveSend();
1514 }
1515
1525 template <typename U,
1526 int&... kExplicitGuard,
1527 std::enable_if_t<std::is_same_v<::cpp20::remove_cvref_t<U>, T>,
1528 bool> = true>
1529 Status TrySend(U&& value) {
1530 if (channel_ == nullptr) {
1532 }
1533 return channel_->TrySend(std::forward<U>(value));
1534 }
1535
1537 template <typename U,
1538 int&... kExplicitGuard,
1539 std::enable_if_t<!std::is_same_v<::cpp20::remove_cvref_t<U>, T> &&
1540 std::is_constructible_v<T, U>,
1541 bool> = true>
1542 Status TrySend(U&& value) {
1543 return TrySend(T(value));
1544 }
1545
1557 static_assert(
1558 std::is_void_v<T>,
1559 "TrySend() with no arguments is for notification channels only");
1560 if (channel_ == nullptr) {
1562 }
1563 return channel_->TrySend();
1564 }
1565
1576 template <typename U,
1577 int&... kExplicitGuard,
1578 std::enable_if_t<std::is_same_v<::cpp20::remove_cvref_t<U>, T>,
1579 bool> = true>
1581 U&& value,
1584 return BlockingSendMoveOrCopy(dispatcher, std::forward<U>(value), timeout);
1585 }
1586
1588 template <typename U,
1589 int&... kExplicitGuard,
1590 std::enable_if_t<!std::is_same_v<::cpp20::remove_cvref_t<U>, T> &&
1591 std::is_constructible_v<T, U>,
1592 bool> = true>
1594 U&& value,
1597 return BlockingSendMoveOrCopy(dispatcher, T(value), timeout);
1598 }
1599
1613 return BlockingSendMoveOrCopy(dispatcher, timeout);
1614 }
1615
1620 void Disconnect() {
1621 if (channel_ != nullptr) {
1622 channel_->remove_sender();
1623 channel_ = nullptr;
1624 }
1625 }
1626
1628 uint16_t remaining_capacity() const {
1629 return channel_ != nullptr ? channel_->remaining_capacity() : 0;
1630 }
1631
1633 uint16_t capacity() const {
1634 return channel_ != nullptr ? channel_->capacity() : 0;
1635 }
1636
1638 [[nodiscard]] bool is_open() const {
1639 return channel_ != nullptr && channel_->is_open();
1640 }
1641
1642 private:
1643 template <typename U>
1644 friend class internal::Channel;
1645
1646 template <typename U>
1647 friend std::optional<std::tuple<SpmcChannelHandle<U>, Sender<U>>>
1648 CreateSpmcChannel(Allocator&, uint16_t);
1649
1650 template <typename U, uint16_t kCapacity>
1651 friend std::tuple<SpmcChannelHandle<U>, Sender<U>> CreateSpmcChannel(
1653
1654 template <typename U>
1655 friend std::optional<std::tuple<SpscChannelHandle<U>, Sender<U>, Receiver<U>>>
1656 CreateSpscChannel(Allocator&, uint16_t);
1657
1658 template <typename U, uint16_t kCapacity>
1659 friend std::tuple<SpscChannelHandle<U>, Sender<U>, Receiver<U>>
1661
1662 explicit Sender(internal::Channel<T>& channel)
1664 : channel_(&channel) {
1665 channel_->add_sender();
1666 }
1667
1668 template <typename U>
1669 Status BlockingSendMoveOrCopy(Dispatcher& dispatcher,
1670 U&& value,
1672 PW_LOCKS_EXCLUDED(*channel_) {
1673 if (channel_ == nullptr) {
1675 }
1676
1677 if (Status status = channel_->TrySend(std::forward<U>(value));
1678 status.ok() || status.IsFailedPrecondition()) {
1679 return status;
1680 }
1681
1682 return BlockingSendFuture(
1683 dispatcher, // NOLINTNEXTLINE(bugprone-use-after-move)
1684 SendFuture<T>(channel_, std::forward<U>(value)),
1685 timeout);
1686 }
1687
1688 Status BlockingSendMoveOrCopy(Dispatcher& dispatcher,
1690 PW_LOCKS_EXCLUDED(*channel_) {
1691 if (channel_ == nullptr) {
1693 }
1694
1695 if (Status status = channel_->TrySend();
1696 status.ok() || status.IsFailedPrecondition()) {
1697 return status;
1698 }
1699
1700 return BlockingSendFuture(dispatcher, SendFuture<T>(channel_), timeout);
1701 }
1702
1703 Status BlockingSendFuture(Dispatcher& dispatcher,
1704 SendFuture<T>&& future,
1706 PW_LOCKS_EXCLUDED(*channel_) {
1707 Status status;
1708 sync::TimedThreadNotification notification;
1709
1710 CallbackTask task(
1711 [&status, &notification](bool result) {
1712 status = result ? OkStatus() : Status::FailedPrecondition();
1713 notification.release();
1714 },
1715 std::move(future));
1716 dispatcher.Post(task);
1717
1718 if (timeout == internal::Channel<T>::kWaitForever) {
1719 notification.acquire();
1720 return status;
1721 }
1722
1723 if (!notification.try_acquire_for(timeout)) {
1724 task.Deregister();
1725 return Status::DeadlineExceeded();
1726 }
1727 return status;
1728 }
1729
1730 internal::Channel<T>* channel_;
1731};
1732
1746template <typename T>
1747std::optional<MpmcChannelHandle<T>> CreateMpmcChannel(Allocator& alloc,
1748 uint16_t capacity) {
1749 auto channel = internal::DynamicChannel<T>::Allocate(alloc, capacity);
1750 if (channel == nullptr) {
1751 return std::nullopt;
1752 }
1753 std::lock_guard lock(*channel);
1754 return MpmcChannelHandle<T>(*channel);
1755}
1756
1768template <typename T, uint16_t kCapacity>
1770 std::lock_guard lock(static_cast<internal::Channel<T>&>(storage));
1771 PW_DASSERT(!storage.active_locked());
1772 return MpmcChannelHandle<T>(storage);
1773}
1774
1788template <typename T>
1789std::optional<std::tuple<MpscChannelHandle<T>, Receiver<T>>> CreateMpscChannel(
1790 Allocator& alloc, uint16_t capacity) {
1791 auto channel = internal::DynamicChannel<T>::Allocate(alloc, capacity);
1792 if (channel == nullptr) {
1793 return std::nullopt;
1794 }
1795 std::lock_guard lock(*channel);
1796 return std::make_tuple(MpscChannelHandle<T>(*channel), Receiver<T>(*channel));
1797}
1798
1810template <typename T, uint16_t kCapacity>
1811std::tuple<MpscChannelHandle<T>, Receiver<T>> CreateMpscChannel(
1813 std::lock_guard lock(static_cast<internal::Channel<T>&>(storage));
1814 PW_DASSERT(!storage.active_locked());
1815 return std::make_tuple(MpscChannelHandle<T>(storage), Receiver<T>(storage));
1816}
1817
1831template <typename T>
1832std::optional<std::tuple<SpmcChannelHandle<T>, Sender<T>>> CreateSpmcChannel(
1833 Allocator& alloc, uint16_t capacity) {
1834 auto channel = internal::DynamicChannel<T>::Allocate(alloc, capacity);
1835 if (channel == nullptr) {
1836 return std::nullopt;
1837 }
1838 std::lock_guard lock(*channel);
1839 return std::make_tuple(SpmcChannelHandle<T>(*channel), Sender<T>(*channel));
1840}
1841
1853template <typename T, uint16_t kCapacity>
1854std::tuple<SpmcChannelHandle<T>, Sender<T>> CreateSpmcChannel(
1856 std::lock_guard lock(static_cast<internal::Channel<T>&>(storage));
1857 PW_DASSERT(!storage.active_locked());
1858 return std::make_tuple(SpmcChannelHandle<T>(storage), Sender<T>(storage));
1859}
1860
1874template <typename T>
1875std::optional<std::tuple<SpscChannelHandle<T>, Sender<T>, Receiver<T>>>
1876CreateSpscChannel(Allocator& alloc, uint16_t capacity) {
1877 auto channel = internal::DynamicChannel<T>::Allocate(alloc, capacity);
1878 if (channel == nullptr) {
1879 return std::nullopt;
1880 }
1881 std::lock_guard lock(*channel);
1882 return std::make_tuple(SpscChannelHandle<T>(*channel),
1883 Sender<T>(*channel),
1884 Receiver<T>(*channel));
1885}
1886
1898template <typename T, uint16_t kCapacity>
1899std::tuple<SpscChannelHandle<T>, Sender<T>, Receiver<T>> CreateSpscChannel(
1901 std::lock_guard lock(static_cast<internal::Channel<T>&>(storage));
1902 PW_DASSERT(!storage.active_locked());
1903 return std::make_tuple(
1904 SpscChannelHandle<T>(storage), Sender<T>(storage), Receiver<T>(storage));
1905}
1906
1908
1909namespace internal {
1910
1911inline void BaseChannelFuture::Complete() PW_UNLOCK_FUNCTION(*channel_) {
1912 channel_->RemoveRefAndDestroyIfUnreferenced();
1913 channel_ = nullptr;
1914}
1915
1916} // namespace internal
1917} // namespace pw::async2
Definition: allocator.h:42
T * New(Args &&... args)
Definition: allocator.h:66
Abstract interface for releasing memory.
Definition: deallocator.h:30
Definition: deque.h:202
Definition: result.h:145
Definition: status.h:120
static constexpr Status DeadlineExceeded()
Definition: status.h:177
static constexpr Status Unavailable()
Definition: status.h:304
static constexpr Status FailedPrecondition()
Definition: status.h:243
Definition: callback_task.h:44
Channel handle for a particular type T.
Definition: channel.h:696
Sender< T > CreateSender()
Definition: channel.h:713
Receiver< T > CreateReceiver()
Definition: channel.h:720
Definition: channel.h:901
bool active() const
Definition: channel.h:923
Definition: task.h:45
Definition: future.h:408
Definition: dispatcher.h:74
constexpr bool is_pendable() const
Definition: future.h:255
void MarkComplete()
Definition: future.h:306
constexpr bool is_complete() const
Definition: future.h:260
Definition: future.h:114
Definition: channel.h:856
Definition: channel.h:813
A handle to a multi-producer, multi-consumer channel.
Definition: channel.h:728
friend std::optional< MpmcChannelHandle< U > > CreateMpmcChannel(Allocator &, uint16_t)
Definition: channel.h:1747
A handle to a multi-producer, single-consumer channel.
Definition: channel.h:750
friend std::optional< std::tuple< MpscChannelHandle< U >, Receiver< U > > > CreateMpscChannel(Allocator &, uint16_t)
Definition: channel.h:1789
Definition: poll.h:138
Definition: channel.h:933
A receiver which reads values from an asynchronous channel.
Definition: channel.h:1039
friend std::optional< std::tuple< MpscChannelHandle< U >, Receiver< U > > > CreateMpscChannel(Allocator &, uint16_t)
Definition: channel.h:1789
bool is_open() const
Returns true if the channel is open.
Definition: channel.h:1173
std::conditional_t< std::is_void_v< T >, Status, Result< T > > TryReceive()
Definition: channel.h:1153
friend std::optional< std::tuple< SpscChannelHandle< U >, Sender< U >, Receiver< U > > > CreateSpscChannel(Allocator &, uint16_t)
Definition: channel.h:1876
void Disconnect()
Definition: channel.h:1165
std::conditional_t< std::is_void_v< T >, Status, Result< T > > BlockingReceive(Dispatcher &dispatcher, chrono::SystemClock::duration timeout=internal::Channel< T >::kWaitForever)
Definition: channel.h:1075
ReceiveFuture< T > Receive()
Definition: channel.h:1143
Definition: channel.h:1391
Definition: channel.h:1208
Definition: channel.h:1330
void Cancel()
Releases the reservation, making the space available for other senders.
Definition: channel.h:1366
void Commit(Args &&... args)
Commits a value to a reserved slot.
Definition: channel.h:1351
A sender which writes values to an asynchronous channel.
Definition: channel.h:1447
SendFuture< T > Send(U &&value)
Definition: channel.h:1483
Status TrySend(U &&value)
Definition: channel.h:1529
Result< SendReservation< T > > TryReserveSend()
Definition: channel.h:1509
friend std::optional< std::tuple< SpmcChannelHandle< U >, Sender< U > > > CreateSpmcChannel(Allocator &, uint16_t)
Definition: channel.h:1832
friend std::optional< std::tuple< SpscChannelHandle< U >, Sender< U >, Receiver< U > > > CreateSpscChannel(Allocator &, uint16_t)
Definition: channel.h:1876
ReserveSendFuture< T > ReserveSend()
Definition: channel.h:1498
uint16_t capacity() const
Returns the maximum capacity of the channel.
Definition: channel.h:1633
Status BlockingSend(Dispatcher &dispatcher, chrono::SystemClock::duration timeout=internal::Channel< T >::kWaitForever)
Definition: channel.h:1610
Status BlockingSend(Dispatcher &dispatcher, U &&value, chrono::SystemClock::duration timeout=internal::Channel< T >::kWaitForever)
Definition: channel.h:1580
uint16_t remaining_capacity() const
Returns the remaining capacity of the channel.
Definition: channel.h:1628
void Disconnect()
Definition: channel.h:1620
bool is_open() const
Returns true if the channel is open.
Definition: channel.h:1638
Status TrySend()
Definition: channel.h:1556
A handle to a single-producer, multi-consumer channel.
Definition: channel.h:771
friend std::optional< std::tuple< SpmcChannelHandle< U >, Sender< U > > > CreateSpmcChannel(Allocator &, uint16_t)
Definition: channel.h:1832
A handle to a single-producer, single-consumer channel.
Definition: channel.h:792
friend std::optional< std::tuple< SpscChannelHandle< U >, Sender< U >, Receiver< U > > > CreateSpscChannel(Allocator &, uint16_t)
Definition: channel.h:1876
bool is_complete() const
True if the future has returned Ready().
Definition: channel.h:75
bool is_pendable() const
True if the Pend can be called on the future.
Definition: channel.h:72
Definition: channel.h:166
Definition: channel.h:316
Definition: channel.h:131
Definition: channel.h:401
Definition: channel.h:595
Definition: storage.h:86
Definition: storage.h:39
Definition: interrupt_spin_lock.h:50
Definition: timed_thread_notification.h:40
bool try_acquire_for(chrono::SystemClock::duration timeout)
void Deallocate(void *ptr)
Definition: deallocator.h:314
std::optional< std::tuple< SpmcChannelHandle< T >, Sender< T > > > CreateSpmcChannel(Allocator &alloc, uint16_t capacity)
Definition: channel.h:1832
std::optional< std::tuple< MpscChannelHandle< T >, Receiver< T > > > CreateMpscChannel(Allocator &alloc, uint16_t capacity)
Definition: channel.h:1789
std::optional< MpmcChannelHandle< T > > CreateMpmcChannel(Allocator &alloc, uint16_t capacity)
Definition: channel.h:1747
std::optional< std::tuple< SpscChannelHandle< T >, Sender< T >, Receiver< T > > > CreateSpscChannel(Allocator &alloc, uint16_t capacity)
Definition: channel.h:1876
std::conditional_t< std::is_void_v< typename T::value_type >, ReadyType, typename T::value_type > FutureValue
Definition: future.h:110
constexpr bool IsReady() const noexcept
Returns whether or not this value is Ready.
Definition: poll.h:211
constexpr PendingType Pending()
Returns a value indicating that an operation was not yet able to complete.
Definition: poll.h:353
constexpr Poll Ready()
Returns a value indicating completion.
Definition: poll.h:337
std::chrono::duration< rep, period > duration
Alias for durations representable with this clock.
Definition: system_clock.h:91
constexpr bool CheckedIncrement(T &base, Inc inc)
Definition: checked_arithmetic.h:118
constexpr Status OkStatus()
Definition: status.h:450
#define PW_LOCKABLE(name)
Definition: lock_annotations.h:210
#define PW_GUARDED_BY(x)
Definition: lock_annotations.h:60
#define PW_NO_LOCK_SAFETY_ANALYSIS
Definition: lock_annotations.h:296
#define PW_EXCLUSIVE_LOCK_FUNCTION(...)
Definition: lock_annotations.h:232
#define PW_EXCLUSIVE_LOCKS_REQUIRED(...)
Definition: lock_annotations.h:147
#define PW_LOCK_RETURNED(x)
Definition: lock_annotations.h:199
#define PW_UNLOCK_FUNCTION(...)
Definition: lock_annotations.h:249
#define PW_LOCKS_EXCLUDED(...)
Definition: lock_annotations.h:178