C/C++ API Reference
Loading...
Searching...
No Matches
channel.h
1// Copyright 2025 The Pigweed Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may not
4// use this file except in compliance with the License. You may obtain a copy of
5// the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12// License for the specific language governing permissions and limitations under
13// the License.
14#pragma once
15
16#include <mutex>
17#include <optional>
18
19#include "lib/stdcompat/type_traits.h"
20#include "pw_allocator/allocator.h"
21#include "pw_async2/callback_task.h"
22#include "pw_async2/dispatcher.h"
23#include "pw_async2/future.h"
24#include "pw_containers/deque.h"
25#include "pw_numeric/checked_arithmetic.h"
26#include "pw_result/result.h"
27#include "pw_sync/interrupt_spin_lock.h"
28#include "pw_sync/lock_annotations.h"
29#include "pw_sync/timed_thread_notification.h"
30
31namespace pw::async2 {
32
33template <typename T>
34class Receiver;
35
36template <typename T>
37class ReceiveFuture;
38
39template <typename T>
40class Sender;
41
42template <typename T>
43class SendFuture;
44
45template <typename T>
46class ReserveSendFuture;
47
48template <typename T>
49class SendReservation;
50
51template <typename T, uint16_t kCapacity>
52class ChannelStorage;
53
54namespace internal {
55
56template <typename T>
57class Channel;
58
59class BaseChannel;
60
62 public:
63 constexpr BaseChannelFuture() : channel_(nullptr) {}
64
65 BaseChannelFuture(const BaseChannelFuture&) = delete;
66 BaseChannelFuture& operator=(const BaseChannelFuture&) = delete;
67
68 // Derived classes call MoveAssignFrom to move rather than use the operator.
69 BaseChannelFuture& operator=(BaseChannelFuture&&) = delete;
70
72 [[nodiscard]] bool is_pendable() const { return core_.is_pendable(); }
73
75 [[nodiscard]] bool is_complete() const { return core_.is_complete(); }
76
77 protected:
78 // Creates a new future, storing nullptr if `channel` is nullptr or if the
79 // channel is closed.
80 explicit BaseChannelFuture(BaseChannel* channel) PW_LOCKS_EXCLUDED(*channel);
81
82 enum AllowClosed { kAllowClosed };
83
84 // Creates a new future. Always increases the ref count, even if the channel
85 // is closed. This allows ReceiveFutures to read values from closed channels.
86 BaseChannelFuture(BaseChannel* channel, AllowClosed)
87 PW_LOCKS_EXCLUDED(*channel)
88 : core_(FutureState::kPending) {
89 StoreAndAddRefIfNonnull(channel);
90 }
91
92 BaseChannelFuture(BaseChannelFuture&& other)
93 PW_LOCKS_EXCLUDED(*channel_, *other.channel_)
94 : channel_(other.channel_) {
95 MoveFrom(other);
96 }
97
98 BaseChannelFuture& MoveAssignFrom(BaseChannelFuture& other)
99 PW_LOCKS_EXCLUDED(*channel_, *other.channel_);
100
101 // Unlists this future and removes a reference from the channel.
102 void RemoveFromChannel() PW_LOCKS_EXCLUDED(*channel_);
103
104 bool StoreWakerForReceiveIfOpen(Context& cx) PW_UNLOCK_FUNCTION(*channel_);
105
106 void StoreWakerForSend(Context& cx) PW_UNLOCK_FUNCTION(*channel_);
107
108 void StoreWakerForReserveSend(Context& cx) PW_UNLOCK_FUNCTION(*channel_);
109
110 void MarkCompleted() { core_.MarkComplete(); }
111
112 void Complete() PW_UNLOCK_FUNCTION(*channel_);
113
114 BaseChannel* base_channel() PW_LOCK_RETURNED(channel_) { return channel_; }
115
116 private:
117 void StoreAndAddRefIfNonnull(BaseChannel* channel)
118 PW_LOCKS_EXCLUDED(*channel);
119
120 void MoveFrom(BaseChannelFuture& other) PW_LOCKS_EXCLUDED(*other.channel_);
121
122 BaseChannel* channel_;
123 FutureCore core_;
124
125 public:
126 using List = FutureList<&BaseChannelFuture::core_>;
127};
128
129// Adds a Pend function to BaseChannelFuture.
130template <typename Derived, typename T, typename FutureValue>
132 public:
133 using value_type = FutureValue;
134
135 Poll<value_type> Pend(Context& cx) PW_LOCKS_EXCLUDED(*this->channel()) {
136 Poll<value_type> result = static_cast<Derived&>(*this).DoPend(cx);
137 if (result.IsReady()) {
138 // If Ready(), the future is no longer associated with the channel, so it
139 // can be marked complete without the lock held.
140 MarkCompleted();
141 }
142 return result;
143 }
144
145 protected:
146 constexpr ChannelFuture() = default;
147
148 explicit ChannelFuture(Channel<T>* channel) : BaseChannelFuture(channel) {}
149
150 ChannelFuture(Channel<T>* channel, AllowClosed)
151 : BaseChannelFuture(channel, kAllowClosed) {}
152
153 ChannelFuture(ChannelFuture&& other) : BaseChannelFuture(std::move(other)) {}
154
155 Channel<T>* channel() PW_LOCK_RETURNED(this->base_channel()) {
156 return static_cast<Channel<T>*>(base_channel());
157 }
158
159 private:
160 using BaseChannelFuture::base_channel;
161 using BaseChannelFuture::MarkCompleted;
162};
163
164// Internal generic channel type. BaseChannel is not exposed to users. Its
165// public interface is for internal consumption.
166class PW_LOCKABLE("pw::async2::internal::BaseChannel") BaseChannel {
167 public:
168 static constexpr chrono::SystemClock::duration kWaitForever =
169 chrono::SystemClock::duration::max();
170
171 // Acquires the channel's lock.
172 void lock() PW_EXCLUSIVE_LOCK_FUNCTION() { lock_.lock(); }
173
174 // Releases the channel's lock.
175 void unlock() PW_UNLOCK_FUNCTION() { lock_.unlock(); }
176
177 [[nodiscard]] bool is_open() PW_LOCKS_EXCLUDED(*this) {
178 std::lock_guard lock(*this);
179 return is_open_locked();
180 }
181
182 bool is_open_locked() const PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
183 return !closed_;
184 }
185
186 [[nodiscard]] bool active_locked() const PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
187 return ref_count_ != 0;
188 }
189
190 // Removes a reference to this channel and destroys the channel if needed.
191 void RemoveRefAndDestroyIfUnreferenced() PW_UNLOCK_FUNCTION();
192
193 void Close() PW_LOCKS_EXCLUDED(*this) {
194 std::lock_guard lock(*this);
195 CloseLocked();
196 }
197
198 // Adds a SendFuture or ReserveSendFuture to the list of pending futures.
199 void add_send_future(BaseChannelFuture& future)
201 send_futures_.Push(future);
202 }
203
204 // Adds a ReceiveFuture to the list of pending futures.
205 void add_receive_future(BaseChannelFuture& future)
207 receive_futures_.Push(future);
208 }
209
210 void DropReservationAndRemoveRef() PW_LOCKS_EXCLUDED(*this);
211
212 void add_receiver() PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
213 add_object(receiver_count_);
214 }
215
216 void add_sender() PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
217 add_object(sender_count_);
218 }
219
220 void add_handle() PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
221 add_object(handle_count_);
222 }
223
224 void add_reservation() PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
225 reservations_ += 1;
226 }
227
228 void remove_reservation() PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
229 PW_DASSERT(reservations_ > 0);
230 reservations_ -= 1;
231 }
232
233 void remove_sender() PW_LOCKS_EXCLUDED(*this) {
234 remove_object(&sender_count_);
235 }
236
237 void remove_receiver() PW_LOCKS_EXCLUDED(*this) {
238 remove_object(&receiver_count_);
239 }
240
241 void remove_handle() PW_LOCKS_EXCLUDED(*this) {
242 remove_object(&handle_count_);
243 }
244
245 void add_ref() PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
246 PW_ASSERT(CheckedIncrement(ref_count_, 1));
247 }
248
249 protected:
250 constexpr BaseChannel() = default;
251
252 ~BaseChannel();
253
254 void WakeOneReceiver() PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
255 receive_futures_.ResolveOneIfAvailable();
256 }
257
258 void WakeOneSender() PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
259 send_futures_.ResolveOneIfAvailable();
260 }
261
262 uint16_t reservations() const PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
263 return reservations_;
264 }
265
266 private:
267 void add_object(uint8_t& counter) PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
268 if (is_open_locked()) {
269 PW_ASSERT(CheckedIncrement(counter, 1));
270 }
271 add_ref();
272 }
273
274 // Takes a pointer since otherwise Clang's thread safety analysis complains
275 // about taking a reference without the lock held.
276 void remove_object(uint8_t* counter) PW_LOCKS_EXCLUDED(*this);
277
278 // Returns true if the channel should be closed following a reference
279 // decrement.
280 //
281 // Handles can create new senders and receivers, so as long as one exists, the
282 // channel should remain open. Without active handles, the channel closes when
283 // either end fully hangs up.
284 bool should_close() const PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
285 return handle_count_ == 0 && (sender_count_ == 0 || receiver_count_ == 0);
286 }
287
288 void CloseLocked() PW_EXCLUSIVE_LOCKS_REQUIRED(*this);
289
290 // Destroys the channel if it is dynamically allocated.
291 virtual void Destroy() {}
292
293 BaseChannelFuture::List send_futures_ PW_GUARDED_BY(*this);
294 BaseChannelFuture::List receive_futures_ PW_GUARDED_BY(*this);
295
296 uint16_t reservations_ PW_GUARDED_BY(*this) = 0;
297 bool closed_ PW_GUARDED_BY(*this) = false;
298 mutable sync::InterruptSpinLock lock_;
299
300 // Channels are reference counted in two ways:
301 //
302 // - Senders and receivers are tracked independently. Once either reaches
303 // zero, the channel is closed, but not destroyed. No new values can be
304 // sent, but any buffered values can still be read.
305 //
306 // - Overall object reference count, including senders, receivers, futures,
307 // and channel handles. Once this reaches zero, the channel is destroyed.
308 //
309 uint8_t sender_count_ PW_GUARDED_BY(*this) = 0;
310 uint8_t receiver_count_ PW_GUARDED_BY(*this) = 0;
311 uint8_t handle_count_ PW_GUARDED_BY(*this) = 0;
312 uint16_t ref_count_ PW_GUARDED_BY(*this) = 0;
313};
314
315template <typename T>
317 using size_type = uint16_t;
318
319 public:
320 [[nodiscard]] static ChannelDeque TryAllocate(Allocator& alloc,
321 uint16_t capacity) {
322 return ChannelDeque(FixedDeque<T>::TryAllocate(alloc, capacity));
323 }
324
325 template <size_t kAlignment, size_t kCapacity>
327 : dequeue_(storage) {}
328
329 // Exposes the minimal deque api needed by a data channel.
330 [[nodiscard]] size_type capacity() const { return dequeue_.capacity(); }
331 [[nodiscard]] size_type size() const { return dequeue_.size(); }
332 [[nodiscard]] bool empty() const { return dequeue_.empty(); }
333 [[nodiscard]] auto deallocator() const { return dequeue_.deallocator(); }
334 [[nodiscard]] T& front() { return dequeue_.front(); }
335 [[nodiscard]] const T& front() const { return dequeue_.front(); }
336
337 void pop_front() { dequeue_.pop_front(); }
338
339 template <typename U>
340 void push_back(U&& value) {
341 dequeue_.push_back(std::forward<U>(value));
342 }
343
344 template <typename... Args>
345 void emplace_back(Args&&... args) {
346 dequeue_.emplace_back(std::forward<Args>(args)...);
347 }
348
349 private:
350 explicit ChannelDeque(FixedDeque<T>&& other) : dequeue_(std::move(other)) {}
351
352 FixedDeque<T> dequeue_;
353};
354
355template <>
356class ChannelDeque<void> {
357 using size_type = uint16_t;
358
359 public:
360 ~ChannelDeque() = default;
361 ChannelDeque(const ChannelDeque& other) = delete;
362 ChannelDeque& operator=(const ChannelDeque& other) = delete;
363 ChannelDeque(ChannelDeque&& other) = default;
364 ChannelDeque& operator=(ChannelDeque&& other) = default;
365
366 explicit ChannelDeque(uint16_t capacity) : capacity_(capacity) {}
367
368 [[nodiscard]] static ChannelDeque TryAllocate(Allocator& alloc,
369 uint16_t capacity) {
370 return ChannelDeque(alloc, capacity);
371 }
372
373 // Exposes a minimal deque-like api needed by a notification channel.
374 [[nodiscard]] size_type capacity() const { return capacity_; }
375 [[nodiscard]] size_type size() const { return size_; }
376 [[nodiscard]] bool empty() const { return size_ == 0; }
377 [[nodiscard]] Deallocator* deallocator() const { return deallocator_; }
378
379 void push_back() {
380 PW_ASSERT(size_ < capacity_);
381 ++size_;
382 }
383
384 void pop_front() {
385 PW_ASSERT(size_ != 0);
386 --size_;
387 }
388
389 private:
390 ChannelDeque(Allocator& alloc, uint16_t capacity)
391 : deallocator_(&alloc), capacity_(capacity) {}
392
393 Deallocator* deallocator_ = nullptr;
394 uint16_t capacity_ = 0;
395 uint16_t size_ = 0;
396};
397
398// Like BaseChannel, Channel is an internal class that is not exposed to
399// users. Its public interface is for internal consumption.
400template <typename T>
401class Channel : public BaseChannel {
402 public:
403 Sender<T> CreateSender() PW_LOCKS_EXCLUDED(*this) {
404 {
405 std::lock_guard guard(*this);
406 if (is_open_locked()) {
407 return Sender<T>(*this);
408 }
409 }
410 return Sender<T>();
411 }
412
413 Receiver<T> CreateReceiver() PW_LOCKS_EXCLUDED(*this) {
414 {
415 std::lock_guard guard(*this);
416 if (is_open_locked()) {
417 return Receiver<T>(*this);
418 }
419 }
420 return Receiver<T>();
421 }
422
423 template <typename U,
424 int&... kExplicitGuard,
425 std::enable_if_t<std::is_same_v<::cpp20::remove_cvref_t<U>, T>,
426 bool> = true>
427 void PushAndWake(U&& value) PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
428 deque_.push_back(std::forward<U>(value));
429 WakeOneReceiver();
430 }
431
432 template <typename U,
433 int&... kExplicitGuard,
434 std::enable_if_t<!std::is_same_v<::cpp20::remove_cvref_t<U>, T> &&
435 std::is_constructible_v<T, U>,
436 bool> = true>
437 void PushAndWake(U&& value) PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
438 PushAndWake(T(value));
439 }
440
441 void PushAndWake() PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
442 deque_.push_back();
443 WakeOneReceiver();
444 }
445
446 template <typename... Args>
447 void EmplaceAndWake(Args&&... args) PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
448 deque_.emplace_back(std::forward<Args>(args)...);
449 WakeOneReceiver();
450 }
451
452 auto PopAndWake() PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
453 if constexpr (std::is_void_v<T>) {
454 deque_.pop_front();
455 WakeOneSender();
456 } else {
457 T value = std::move(deque_.front());
458 deque_.pop_front();
459
460 WakeOneSender();
461 return value;
462 }
463 }
464
465 bool full() const PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
466 return remaining_capacity_locked() == 0;
467 }
468
469 uint16_t remaining_capacity() PW_LOCKS_EXCLUDED(*this) {
470 std::lock_guard guard(*this);
471 return remaining_capacity_locked();
472 }
473
474 uint16_t remaining_capacity_locked() const
476 return deque_.capacity() - deque_.size() - reservations();
477 }
478
479 uint16_t capacity() const PW_NO_LOCK_SAFETY_ANALYSIS {
480 // SAFETY: The capacity of `deque_` cannot change.
481 return deque_.capacity();
482 }
483
484 [[nodiscard]] bool empty() PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
485 return deque_.empty();
486 }
487
488 template <typename U,
489 int&... kExplicitGuard,
490 std::enable_if_t<std::is_same_v<::cpp20::remove_cvref_t<U>, T>,
491 bool> = true>
492 Status TrySend(U&& value) PW_LOCKS_EXCLUDED(*this) {
493 std::lock_guard guard(*this);
494 if (!is_open_locked()) {
496 }
497 if (full()) {
498 return Status::Unavailable();
499 }
500 PushAndWake(std::forward<U>(value));
501 return OkStatus();
502 }
503
504 template <typename U,
505 int&... kExplicitGuard,
506 std::enable_if_t<!std::is_same_v<::cpp20::remove_cvref_t<U>, T> &&
507 std::is_constructible_v<T, U>,
508 bool> = true>
509 Status TrySend(U&& value) PW_LOCKS_EXCLUDED(*this) {
510 return TrySend(T(value));
511 }
512
513 Status TrySend() PW_LOCKS_EXCLUDED(*this) {
514 static_assert(
515 std::is_void_v<T>,
516 "TrySend() with no arguments is for notification channels only");
517
518 std::lock_guard guard(*this);
519 if (!is_open_locked()) {
521 }
522 if (full()) {
523 return Status::Unavailable();
524 }
525 PushAndWake();
526 return OkStatus();
527 }
528
529 std::conditional_t<std::is_void_v<T>, Status, Result<T>> TryReceive()
530 PW_LOCKS_EXCLUDED(*this) {
531 std::lock_guard guard(*this);
532 if (deque_.empty()) {
533 return is_open_locked() ? Status::Unavailable()
535 }
536 if constexpr (std::is_void_v<T>) {
537 PopAndWake();
538 return OkStatus();
539 } else {
540 return Result(PopAndWake());
541 }
542 }
543
544 Result<SendReservation<T>> TryReserveSend() PW_LOCKS_EXCLUDED(*this) {
545 std::lock_guard guard(*this);
546 if (!is_open_locked()) {
548 }
549 if (full()) {
550 return Status::Unavailable();
551 }
552 add_reservation();
553 return SendReservation<T>(*this);
554 }
555
556 template <typename... Args>
557 void CommitReservationAndRemoveRef(Args&&... args) PW_LOCKS_EXCLUDED(*this) {
558 lock();
559 remove_reservation();
560 if (is_open_locked()) {
561 EmplaceAndWake(std::forward<Args>(args)...);
562 }
563 RemoveRefAndDestroyIfUnreferenced();
564 }
565
566 void CommitNotificationReservationAndRemoveRef() PW_LOCKS_EXCLUDED(*this) {
567 lock();
568 remove_reservation();
569 if (is_open_locked()) {
570 PushAndWake();
571 }
572 RemoveRefAndDestroyIfUnreferenced();
573 }
574
575 protected:
576 constexpr explicit Channel(ChannelDeque<T>&& deque)
577 : deque_(std::move(deque)) {}
578
579 template <size_t kAlignment, size_t kCapacity>
581 : deque_(storage) {}
582
583 ~Channel() = default;
584
585 Deallocator* deallocator() const PW_NO_LOCK_SAFETY_ANALYSIS {
586 // SAFETY: deque_.deallocator() cannot change.
587 return deque_.deallocator();
588 }
589
590 private:
591 ChannelDeque<T> deque_ PW_GUARDED_BY(*this);
592};
593
594template <typename T>
595class DynamicChannel final : public Channel<T> {
596 public:
597 static Channel<T>* Allocate(Allocator& alloc, uint16_t capacity) {
598 ChannelDeque<T> deque = ChannelDeque<T>::TryAllocate(alloc, capacity);
599 if (deque.capacity() == 0) {
600 return nullptr;
601 }
602 return alloc.New<DynamicChannel<T>>(std::move(deque));
603 }
604
605 explicit DynamicChannel(ChannelDeque<T>&& deque)
606 : Channel<T>(std::move(deque)) {}
607
608 private:
609 ~DynamicChannel() = default;
610
611 void Destroy() final PW_LOCKS_EXCLUDED(*this) {
612 Deallocator* const deallocator = this->deallocator();
613 this->~DynamicChannel();
614 deallocator->Deallocate(this);
615 }
616};
617
618template <typename T, uint16_t kCapacity>
620 public Channel<T> {
621 public:
622 BaseChannelStorage() : Channel<T>(this->storage()) {}
623
624 protected:
625 ~BaseChannelStorage() = default;
626};
627
628template <uint16_t kCapacity>
629class BaseChannelStorage<void, kCapacity> : public Channel<void> {
630 public:
632
633 protected:
634 ~BaseChannelStorage() = default;
635};
636
643 public:
644 constexpr BaseChannelHandle() : channel_(nullptr) {}
645
647
649
650 [[nodiscard]] bool is_open() const PW_LOCKS_EXCLUDED(channel_) {
651 return channel_ != nullptr && channel_->is_open();
652 }
653
656 void Close() PW_LOCKS_EXCLUDED(channel_);
657
664 void Release() PW_LOCKS_EXCLUDED(channel_);
665
666 protected:
667 explicit BaseChannelHandle(BaseChannel& channel)
669 : channel_(&channel) {
670 channel_->add_handle();
671 }
672
673 BaseChannelHandle& operator=(const BaseChannelHandle& other)
674 PW_LOCKS_EXCLUDED(channel_);
675
676 BaseChannelHandle(BaseChannelHandle&& other) noexcept
677 : channel_(std::exchange(other.channel_, nullptr)) {}
678
679 BaseChannelHandle& operator=(BaseChannelHandle&& other) noexcept
680 PW_LOCKS_EXCLUDED(channel_);
681
682 constexpr BaseChannel* channel() const PW_LOCK_RETURNED(channel_) {
683 return channel_;
684 }
685
686 private:
687 BaseChannel* channel_;
688};
689
690} // namespace internal
691
693
695template <typename T>
697 public:
698 constexpr ChannelHandle() = default;
699
700 ChannelHandle(const ChannelHandle&) = default;
701 ChannelHandle& operator=(const ChannelHandle&) = default;
702
703 ChannelHandle(ChannelHandle&&) = default;
704 ChannelHandle& operator=(ChannelHandle&&) = default;
705
706 protected:
707 explicit ChannelHandle(internal::Channel<T>& channel)
709 : internal::BaseChannelHandle(channel) {}
710
714 PW_ASSERT(channel() != nullptr);
715 return static_cast<internal::Channel<T>&>(*channel()).CreateSender();
716 }
717
721 PW_ASSERT(channel() != nullptr);
722 return static_cast<internal::Channel<T>&>(*channel()).CreateReceiver();
723 }
724};
725
727template <typename T>
728class MpmcChannelHandle final : public ChannelHandle<T> {
729 public:
730 constexpr MpmcChannelHandle() = default;
731
734
735 private:
736 explicit MpmcChannelHandle(internal::Channel<T>& channel)
737 : ChannelHandle<T>(channel) {}
738
739 template <typename U>
740 friend std::optional<MpmcChannelHandle<U>> CreateMpmcChannel(Allocator&,
741 uint16_t);
742
743 template <typename U, uint16_t kCapacity>
746};
747
749template <typename T>
750class MpscChannelHandle final : public ChannelHandle<T> {
751 public:
752 constexpr MpscChannelHandle() = default;
753
755
756 private:
757 explicit MpscChannelHandle(internal::Channel<T>& channel)
758 : ChannelHandle<T>(channel) {}
759
760 template <typename U>
761 friend std::optional<std::tuple<MpscChannelHandle<U>, Receiver<U>>>
762 CreateMpscChannel(Allocator&, uint16_t);
763
764 template <typename U, uint16_t kCapacity>
765 friend std::tuple<MpscChannelHandle<U>, Receiver<U>> CreateMpscChannel(
767};
768
770template <typename T>
771class SpmcChannelHandle final : public ChannelHandle<T> {
772 public:
773 constexpr SpmcChannelHandle() = default;
774
776
777 private:
778 explicit SpmcChannelHandle(internal::Channel<T>& channel)
779 : ChannelHandle<T>(channel) {}
780
781 template <typename U>
782 friend std::optional<std::tuple<SpmcChannelHandle<U>, Sender<U>>>
783 CreateSpmcChannel(Allocator&, uint16_t);
784
785 template <typename U, uint16_t kCapacity>
786 friend std::tuple<SpmcChannelHandle<U>, Sender<U>> CreateSpmcChannel(
788};
789
791template <typename T>
792class SpscChannelHandle final : public ChannelHandle<T> {
793 public:
794 constexpr SpscChannelHandle() = default;
795
796 private:
797 explicit SpscChannelHandle(internal::Channel<T>& channel)
798 : ChannelHandle<T>(channel) {}
799
800 template <typename U>
801 friend std::optional<std::tuple<SpscChannelHandle<U>, Sender<U>, Receiver<U>>>
802 CreateSpscChannel(Allocator&, uint16_t);
803
804 template <typename U, uint16_t kCapacity>
805 friend std::tuple<SpscChannelHandle<U>, Sender<U>, Receiver<U>>
807};
808
812template <typename T>
813class MpChannelHandle final : public ChannelHandle<T> {
814 public:
815 constexpr MpChannelHandle() = default;
816
818 : ChannelHandle<T>(other) {}
819
820 MpChannelHandle& operator=(const MpmcChannelHandle<T>& other) {
822 return *this;
823 }
824
826 : ChannelHandle<T>(std::move(other)) {}
827
828 MpChannelHandle& operator=(MpmcChannelHandle<T>&& other) {
829 ChannelHandle<T>::operator=(std::move(other));
830 return *this;
831 }
832
834 : ChannelHandle<T>(other) {}
835
836 MpChannelHandle& operator=(const MpscChannelHandle<T>& other) {
838 return *this;
839 }
840
842 : ChannelHandle<T>(std::move(other)) {}
843
844 MpChannelHandle& operator=(MpscChannelHandle<T>&& other) {
845 ChannelHandle<T>::operator=(std::move(other));
846 return *this;
847 }
848
850};
851
855template <typename T>
856class McChannelHandle final : public ChannelHandle<T> {
857 public:
858 constexpr McChannelHandle() = default;
859
861 : ChannelHandle<T>(other) {}
862
863 McChannelHandle& operator=(const MpmcChannelHandle<T>& other) {
865 return *this;
866 }
867
869 : ChannelHandle<T>(std::move(other)) {}
870
871 McChannelHandle& operator=(MpmcChannelHandle<T>&& other) {
872 ChannelHandle<T>::operator=(std::move(other));
873 return *this;
874 }
875
877 : ChannelHandle<T>(other) {}
878
879 McChannelHandle& operator=(const SpmcChannelHandle<T>& other) {
881 return *this;
882 }
883
885 : ChannelHandle<T>(std::move(other)) {}
886
887 McChannelHandle& operator=(SpmcChannelHandle<T>&& other) {
888 ChannelHandle<T>::operator=(std::move(other));
889 return *this;
890 }
891
893};
894
899template <typename T, uint16_t kCapacity>
900class ChannelStorage final
901 : private internal::BaseChannelStorage<T, kCapacity> {
902 public:
903 template <typename U, uint16_t kCap>
904 friend std::tuple<SpscChannelHandle<U>, Sender<U>, Receiver<U>>
905 CreateSpscChannel(ChannelStorage<U, kCap>& storage);
906
907 template <typename U, uint16_t kCap>
908 friend std::tuple<MpscChannelHandle<U>, Receiver<U>> CreateMpscChannel(
909 ChannelStorage<U, kCap>& storage);
910
911 template <typename U, uint16_t kCap>
912 friend std::tuple<SpmcChannelHandle<U>, Sender<U>> CreateSpmcChannel(
913 ChannelStorage<U, kCap>& storage);
914
915 template <typename U, uint16_t kCap>
916 friend MpmcChannelHandle<U> CreateMpmcChannel(
917 ChannelStorage<U, kCap>& storage);
918
919 ~ChannelStorage() = default;
920
923 [[nodiscard]] bool active() const PW_LOCKS_EXCLUDED(*this) {
924 std::lock_guard lock(*this);
925 return this->active_locked();
926 }
927
928 constexpr uint16_t capacity() const { return kCapacity; }
929};
930
931template <typename T>
932class [[nodiscard]] ReceiveFuture final
933 : public internal::ChannelFuture<ReceiveFuture<T>, T, std::optional<T>> {
934 private:
936
937 public:
938 constexpr ReceiveFuture() = default;
939
941 PW_LOCKS_EXCLUDED(*this->channel(), *other.channel())
942 : Base(std::move(other)) {}
943
944 ReceiveFuture& operator=(ReceiveFuture&& other)
945 PW_LOCKS_EXCLUDED(*this->channel(), *other.channel()) {
946 // NOLINTNEXTLINE(misc-unconventional-assign-operator)
947 return static_cast<ReceiveFuture&>(this->MoveAssignFrom(other));
948 }
949
950 ~ReceiveFuture() PW_LOCKS_EXCLUDED(*this->channel()) {
951 this->RemoveFromChannel();
952 }
953
954 private:
955 friend Base;
956 // Suppress `no uniquely matching class member found` Doxygen error
960 friend Receiver<T>;
961 template <typename, typename>
962 friend class CallbackTask;
963
964 explicit ReceiveFuture(internal::Channel<T>* channel)
965 PW_LOCKS_EXCLUDED(*channel)
966 : Base(channel, this->kAllowClosed) {}
967
968 PollOptional<T> DoPend(Context& cx) PW_LOCKS_EXCLUDED(*this->channel()) {
969 if (this->channel() == nullptr) {
970 PW_DASSERT(this->is_pendable());
971 return Ready<std::optional<T>>(std::nullopt);
972 }
973
974 this->channel()->lock();
975 PW_DASSERT(this->is_pendable());
976 if (this->channel()->empty()) {
977 return this->StoreWakerForReceiveIfOpen(cx)
978 ? Pending()
979 : Ready<std::optional<T>>(std::nullopt);
980 }
981
982 auto result = Ready(this->channel()->PopAndWake());
983 this->Complete();
984 return result;
985 }
986};
987
988template <>
989class [[nodiscard]] ReceiveFuture<void> final
990 : public internal::ChannelFuture<ReceiveFuture<void>, void, bool> {
991 private:
993
994 public:
995 constexpr ReceiveFuture() = default;
996
998 PW_LOCKS_EXCLUDED(*this->channel(), *other.channel())
999 : Base(std::move(other)) {}
1000
1001 ReceiveFuture& operator=(ReceiveFuture&& other)
1002 PW_LOCKS_EXCLUDED(*this->channel(), *other.channel()) {
1003 // NOLINTNEXTLINE(misc-unconventional-assign-operator)
1004 return static_cast<ReceiveFuture&>(this->MoveAssignFrom(other));
1005 }
1006
1007 ~ReceiveFuture() PW_LOCKS_EXCLUDED(*this->channel()) {
1008 this->RemoveFromChannel();
1009 }
1010
1011 private:
1012 friend Base;
1013 // Suppress `no uniquely matching class member found` Doxygen error
1017 friend Receiver<void>;
1018 template <typename, typename>
1019 friend class CallbackTask;
1020
1021 explicit ReceiveFuture(internal::Channel<void>* channel)
1022 PW_LOCKS_EXCLUDED(*channel)
1023 : Base(channel, this->kAllowClosed) {}
1024
1025 Poll<bool> DoPend(Context& cx) PW_LOCKS_EXCLUDED(*this->channel()) {
1026 if (this->channel() == nullptr) {
1027 PW_DASSERT(this->is_pendable());
1028 return Ready(false);
1029 }
1030
1031 this->channel()->lock();
1032 PW_DASSERT(this->is_pendable());
1033 if (this->channel()->empty()) {
1034 return this->StoreWakerForReceiveIfOpen(cx) ? Pending() : Ready(false);
1035 }
1036
1037 this->channel()->PopAndWake();
1038 this->Complete();
1039 return Ready(true);
1040 }
1041};
1042
1044template <typename T>
1046 public:
1047 constexpr Receiver() : channel_(nullptr) {}
1048
1049 Receiver(const Receiver& other) = delete;
1050 Receiver& operator=(const Receiver& other) = delete;
1051
1052 Receiver(Receiver&& other) noexcept
1053 : channel_(std::exchange(other.channel_, nullptr)) {}
1054
1055 Receiver& operator=(Receiver&& other) noexcept {
1056 if (this == &other) {
1057 return *this;
1058 }
1059 if (channel_ != nullptr) {
1060 channel_->remove_receiver();
1061 }
1062 channel_ = std::exchange(other.channel_, nullptr);
1063 return *this;
1064 }
1065
1066 ~Receiver() {
1067 if (channel_ != nullptr) {
1068 channel_->remove_receiver();
1069 }
1070 }
1071
1081 std::conditional_t<std::is_void_v<T>, Status, Result<T>> BlockingReceive(
1082 Dispatcher& dispatcher,
1085 if (channel_ == nullptr) {
1087 }
1088
1089 // Return immediately if a value is available or the channel is closed.
1090 if constexpr (std::is_void_v<T>) {
1091 if (Status result = channel_->TryReceive();
1092 result.ok() || result.IsFailedPrecondition()) {
1093 return result;
1094 }
1095 } else {
1096 if (Result<T> result = channel_->TryReceive();
1097 result.ok() || result.status().IsFailedPrecondition()) {
1098 return result;
1099 }
1100 }
1101
1102 using ReceiveType =
1103 std::conditional_t<std::is_void_v<T>, bool, std::optional<T>>;
1104
1105 ReceiveType result;
1106 sync::TimedThreadNotification notification;
1107
1109 [&result, &notification](ReceiveType&& val) {
1110 result = std::move(val);
1111 notification.release();
1112 },
1113 channel_);
1114 dispatcher.Post(task);
1115
1116 if (timeout == internal::Channel<T>::kWaitForever) {
1117 notification.acquire();
1118 if constexpr (std::is_void_v<T>) {
1119 return result ? OkStatus() : Status::FailedPrecondition();
1120 } else {
1121 if (!result.has_value()) {
1123 }
1124 return Result<T>(std::move(*result));
1125 }
1126 }
1127
1128 if (!notification.try_acquire_for(timeout)) {
1129 return Status::DeadlineExceeded();
1130 }
1131
1132 if constexpr (std::is_void_v<T>) {
1133 return result ? OkStatus() : Status::FailedPrecondition();
1134 } else {
1135 if (!result.has_value()) {
1137 }
1138 return Result<T>(std::move(*result));
1139 }
1140 }
1141
1150 return ReceiveFuture<T>(channel_);
1151 }
1152
1159 std::conditional_t<std::is_void_v<T>, Status, Result<T>> TryReceive() {
1160 if (channel_ == nullptr) {
1162 }
1163
1164 return channel_->TryReceive();
1165 }
1166
1171 void Disconnect() {
1172 if (channel_ != nullptr) {
1173 channel_->remove_receiver();
1174 channel_ = nullptr;
1175 }
1176 }
1177
1179 [[nodiscard]] bool is_open() const {
1180 return channel_ != nullptr && channel_->is_open();
1181 }
1182
1183 private:
1184 template <typename U>
1185 friend class internal::Channel;
1186
1187 template <typename U>
1188 friend std::optional<std::tuple<MpscChannelHandle<U>, Receiver<U>>>
1189 CreateMpscChannel(Allocator&, uint16_t);
1190
1191 template <typename U, uint16_t kCapacity>
1192 friend std::tuple<MpscChannelHandle<U>, Receiver<U>> CreateMpscChannel(
1194
1195 template <typename U>
1196 friend std::optional<std::tuple<SpscChannelHandle<U>, Sender<U>, Receiver<U>>>
1197 CreateSpscChannel(Allocator&, uint16_t);
1198
1199 template <typename U, uint16_t kCapacity>
1200 friend std::tuple<SpscChannelHandle<U>, Sender<U>, Receiver<U>>
1202
1203 explicit Receiver(internal::Channel<T>& channel)
1205 : channel_(&channel) {
1206 channel_->add_receiver();
1207 }
1208
1209 internal::Channel<T>* channel_;
1210};
1211
1212template <typename T>
1213class [[nodiscard]] SendFuture final
1214 : public internal::ChannelFuture<SendFuture<T>, T, bool> {
1215 private:
1217
1218 public:
1219 constexpr SendFuture() = default;
1220
1221 SendFuture(SendFuture&& other) PW_LOCKS_EXCLUDED(*other.channel())
1222 : Base(static_cast<Base&&>(other)), value_(std::move(other.value_)) {}
1223
1224 SendFuture& operator=(SendFuture&& other)
1225 PW_LOCKS_EXCLUDED(*this->channel(), *other.channel()) {
1226 value_ = std::move(other.value_);
1227 // NOLINTNEXTLINE(misc-unconventional-assign-operator)
1228 return static_cast<SendFuture&>(this->MoveAssignFrom(other));
1229 }
1230
1231 ~SendFuture() PW_LOCKS_EXCLUDED(*this->channel()) {
1232 this->RemoveFromChannel();
1233 }
1234
1235 private:
1236 friend Base;
1237 // Suppress `no uniquely matching class member found` Doxygen error
1239 friend internal::Channel<T>;
1241 friend Sender<T>;
1242
1243 SendFuture(internal::Channel<T>* channel, const T& value)
1244 PW_LOCKS_EXCLUDED(*channel)
1245 : Base(channel), value_(value) {}
1246
1247 SendFuture(internal::Channel<T>* channel, T&& value)
1248 PW_LOCKS_EXCLUDED(*channel)
1249 : Base(channel), value_(std::move(value)) {}
1250
1251 Poll<bool> DoPend(Context& cx) PW_LOCKS_EXCLUDED(*this->channel()) {
1252 if (this->channel() == nullptr) {
1253 PW_DASSERT(this->is_pendable());
1254 return Ready(false);
1255 }
1256
1257 this->channel()->lock();
1258 PW_DASSERT(this->is_pendable());
1259 if (!this->channel()->is_open_locked()) {
1260 this->Complete();
1261 return Ready(false);
1262 }
1263
1264 if (this->channel()->full()) {
1265 this->StoreWakerForSend(cx);
1266 return Pending();
1267 }
1268
1269 this->channel()->PushAndWake(std::move(*value_));
1270 this->Complete();
1271 return Ready(true);
1272 }
1273
1274 std::optional<T> value_;
1275};
1276
1277template <>
1278class [[nodiscard]] SendFuture<void> final
1279 : public internal::ChannelFuture<SendFuture<void>, void, bool> {
1280 private:
1282
1283 public:
1284 constexpr SendFuture() = default;
1285
1286 SendFuture(SendFuture&& other) PW_LOCKS_EXCLUDED(*other.channel())
1287 : Base(static_cast<Base&&>(other)) {}
1288
1289 SendFuture& operator=(SendFuture&& other)
1290 PW_LOCKS_EXCLUDED(*this->channel(), *other.channel()) {
1291 // NOLINTNEXTLINE(misc-unconventional-assign-operator)
1292 return static_cast<SendFuture&>(this->MoveAssignFrom(other));
1293 }
1294
1295 ~SendFuture() PW_LOCKS_EXCLUDED(*this->channel()) {
1296 this->RemoveFromChannel();
1297 }
1298
1299 private:
1300 friend Base;
1301 // Suppress `no uniquely matching class member found` Doxygen error
1305 friend Sender<void>;
1306
1307 explicit SendFuture(internal::Channel<void>* channel)
1308 PW_LOCKS_EXCLUDED(*channel)
1309 : Base(channel) {}
1310
1311 Poll<bool> DoPend(Context& cx) PW_LOCKS_EXCLUDED(*this->channel()) {
1312 if (this->channel() == nullptr) {
1313 PW_DASSERT(this->is_pendable());
1314 return Ready(false);
1315 }
1316
1317 this->channel()->lock();
1318 PW_DASSERT(this->is_pendable());
1319 if (!this->channel()->is_open_locked()) {
1320 this->Complete();
1321 return Ready(false);
1322 }
1323
1324 if (this->channel()->full()) {
1325 this->StoreWakerForSend(cx);
1326 return Pending();
1327 }
1328
1329 this->channel()->PushAndWake();
1330 this->Complete();
1331 return Ready(true);
1332 }
1333};
1334
1341template <typename T>
1343 public:
1344 SendReservation(const SendReservation& other) = delete;
1345 SendReservation& operator=(const SendReservation& other) = delete;
1346
1348 : channel_(std::exchange(other.channel_, nullptr)) {}
1349
1350 SendReservation& operator=(SendReservation&& other) {
1351 if (this == &other) {
1352 return *this;
1353 }
1354 Cancel();
1355 channel_ = std::exchange(other.channel_, nullptr);
1356 return *this;
1357 }
1358
1359 ~SendReservation() { Cancel(); }
1360
1362 template <typename... Args>
1363 void Commit(Args&&... args) {
1364 PW_ASSERT(channel_ != nullptr);
1365 channel_->CommitReservationAndRemoveRef(std::forward<Args>(args)...);
1366 channel_ = nullptr;
1367 }
1368
1369 void CommitNotification() {
1370 static_assert(std::is_void_v<T>,
1371 "CommitNotification() is for notification channels only.");
1372 PW_ASSERT(channel_ != nullptr);
1373 channel_->CommitNotificationReservationAndRemoveRef();
1374 channel_ = nullptr;
1375 }
1376
1378 void Cancel() {
1379 if (channel_ != nullptr) {
1380 channel_->DropReservationAndRemoveRef();
1381 channel_ = nullptr;
1382 }
1383 }
1384
1385 private:
1386 // Suppress `no uniquely matching class member found` Doxygen error
1388 friend internal::Channel<T>;
1390 friend class ReserveSendFuture<T>;
1391 friend class Sender<T>;
1392
1393 explicit SendReservation(internal::Channel<T>& channel)
1395 : channel_(&channel) {
1396 channel_->add_ref();
1397 }
1398
1399 internal::Channel<T>* channel_;
1400};
1401
1402template <typename T>
1403class [[nodiscard]] ReserveSendFuture final
1404 : public internal::ChannelFuture<ReserveSendFuture<T>,
1405 T,
1406 std::optional<SendReservation<T>>> {
1407 private:
1408 using Base = internal::
1409 ChannelFuture<ReserveSendFuture, T, std::optional<SendReservation<T>>>;
1410
1411 public:
1412 constexpr ReserveSendFuture() = default;
1413
1414 ReserveSendFuture(ReserveSendFuture&& other) : Base(std::move(other)) {}
1415
1416 ReserveSendFuture& operator=(ReserveSendFuture&& other) {
1417 // NOLINTNEXTLINE(misc-unconventional-assign-operator)
1418 return static_cast<ReserveSendFuture&>(this->MoveAssignFrom(other));
1419 }
1420
1421 ~ReserveSendFuture() PW_LOCKS_EXCLUDED(*this->channel()) {
1422 this->RemoveFromChannel();
1423 }
1424
1425 private:
1426 friend Base;
1427 // Suppress `no uniquely matching class member found` Doxygen error
1429 friend internal::Channel<T>;
1431 friend Sender<T>;
1432
1433 explicit ReserveSendFuture(internal::Channel<T>* channel)
1434 PW_LOCKS_EXCLUDED(*channel)
1435 : Base(channel) {}
1436
1438 PW_LOCKS_EXCLUDED(*this->channel()) {
1439 if (this->channel() == nullptr) {
1440 PW_DASSERT(this->is_pendable());
1441 return Ready<std::optional<SendReservation<T>>>(std::nullopt);
1442 }
1443
1444 this->channel()->lock();
1445 PW_DASSERT(this->is_pendable());
1446 if (!this->channel()->is_open_locked()) {
1447 this->Complete();
1448 return Ready<std::optional<SendReservation<T>>>(std::nullopt);
1449 }
1450
1451 if (this->channel()->remaining_capacity_locked() == 0) {
1452 this->StoreWakerForReserveSend(cx);
1453 return Pending();
1454 }
1455
1456 this->channel()->add_reservation();
1457 SendReservation<T> reservation(*this->channel());
1458 this->Complete();
1459 return reservation;
1460 }
1461};
1462
1464template <typename T>
1465class Sender {
1466 public:
1467 constexpr Sender() : channel_(nullptr) {}
1468
1469 Sender(const Sender& other) = delete;
1470 Sender& operator=(const Sender& other) = delete;
1471
1472 Sender(Sender&& other) noexcept
1473 : channel_(std::exchange(other.channel_, nullptr)) {}
1474
1475 Sender& operator=(Sender&& other) noexcept {
1476 if (this == &other) {
1477 return *this;
1478 }
1479 if (channel_ != nullptr) {
1480 channel_->remove_sender();
1481 }
1482 channel_ = std::exchange(other.channel_, nullptr);
1483 return *this;
1484 }
1485
1486 ~Sender() {
1487 if (channel_ != nullptr) {
1488 channel_->remove_sender();
1489 }
1490 }
1491
1500 template <typename U>
1501 SendFuture<T> Send(U&& value) {
1502 return SendFuture<T>(channel_, std::forward<U>(value));
1503 }
1504
1505 SendFuture<T> Send() {
1506 static_assert(std::is_void_v<T>,
1507 "Send() with no arguments is for notification channels only");
1508 return SendFuture<T>(channel_);
1509 }
1510
1517
1528 if (channel_ == nullptr) {
1530 }
1531 return channel_->TryReserveSend();
1532 }
1533
1543 template <typename U,
1544 int&... kExplicitGuard,
1545 std::enable_if_t<std::is_same_v<::cpp20::remove_cvref_t<U>, T>,
1546 bool> = true>
1547 Status TrySend(U&& value) {
1548 if (channel_ == nullptr) {
1550 }
1551 return channel_->TrySend(std::forward<U>(value));
1552 }
1553
1555 template <typename U,
1556 int&... kExplicitGuard,
1557 std::enable_if_t<!std::is_same_v<::cpp20::remove_cvref_t<U>, T> &&
1558 std::is_constructible_v<T, U>,
1559 bool> = true>
1560 Status TrySend(U&& value) {
1561 return TrySend(T(value));
1562 }
1563
1575 static_assert(
1576 std::is_void_v<T>,
1577 "TrySend() with no arguments is for notification channels only");
1578 if (channel_ == nullptr) {
1580 }
1581 return channel_->TrySend();
1582 }
1583
1594 template <typename U,
1595 int&... kExplicitGuard,
1596 std::enable_if_t<std::is_same_v<::cpp20::remove_cvref_t<U>, T>,
1597 bool> = true>
1599 U&& value,
1602 return BlockingSendMoveOrCopy(dispatcher, std::forward<U>(value), timeout);
1603 }
1604
1606 template <typename U,
1607 int&... kExplicitGuard,
1608 std::enable_if_t<!std::is_same_v<::cpp20::remove_cvref_t<U>, T> &&
1609 std::is_constructible_v<T, U>,
1610 bool> = true>
1612 U&& value,
1615 return BlockingSendMoveOrCopy(dispatcher, T(value), timeout);
1616 }
1617
1631 return BlockingSendMoveOrCopy(dispatcher, timeout);
1632 }
1633
1638 void Disconnect() {
1639 if (channel_ != nullptr) {
1640 channel_->remove_sender();
1641 channel_ = nullptr;
1642 }
1643 }
1644
1646 uint16_t remaining_capacity() const {
1647 return channel_ != nullptr ? channel_->remaining_capacity() : 0;
1648 }
1649
1651 uint16_t capacity() const {
1652 return channel_ != nullptr ? channel_->capacity() : 0;
1653 }
1654
1656 [[nodiscard]] bool is_open() const {
1657 return channel_ != nullptr && channel_->is_open();
1658 }
1659
1660 private:
1661 template <typename U>
1662 friend class internal::Channel;
1663
1664 template <typename U>
1665 friend std::optional<std::tuple<SpmcChannelHandle<U>, Sender<U>>>
1666 CreateSpmcChannel(Allocator&, uint16_t);
1667
1668 template <typename U, uint16_t kCapacity>
1669 friend std::tuple<SpmcChannelHandle<U>, Sender<U>> CreateSpmcChannel(
1671
1672 template <typename U>
1673 friend std::optional<std::tuple<SpscChannelHandle<U>, Sender<U>, Receiver<U>>>
1674 CreateSpscChannel(Allocator&, uint16_t);
1675
1676 template <typename U, uint16_t kCapacity>
1677 friend std::tuple<SpscChannelHandle<U>, Sender<U>, Receiver<U>>
1679
1680 explicit Sender(internal::Channel<T>& channel)
1682 : channel_(&channel) {
1683 channel_->add_sender();
1684 }
1685
1686 template <typename U>
1687 Status BlockingSendMoveOrCopy(Dispatcher& dispatcher,
1688 U&& value,
1690 PW_LOCKS_EXCLUDED(*channel_) {
1691 if (channel_ == nullptr) {
1693 }
1694
1695 if (Status status = channel_->TrySend(std::forward<U>(value));
1696 status.ok() || status.IsFailedPrecondition()) {
1697 return status;
1698 }
1699
1700 return BlockingSendFuture(
1701 dispatcher, // NOLINTNEXTLINE(bugprone-use-after-move)
1702 SendFuture<T>(channel_, std::forward<U>(value)),
1703 timeout);
1704 }
1705
1706 Status BlockingSendMoveOrCopy(Dispatcher& dispatcher,
1708 PW_LOCKS_EXCLUDED(*channel_) {
1709 if (channel_ == nullptr) {
1711 }
1712
1713 if (Status status = channel_->TrySend();
1714 status.ok() || status.IsFailedPrecondition()) {
1715 return status;
1716 }
1717
1718 return BlockingSendFuture(dispatcher, SendFuture<T>(channel_), timeout);
1719 }
1720
1721 Status BlockingSendFuture(Dispatcher& dispatcher,
1722 SendFuture<T>&& future,
1724 PW_LOCKS_EXCLUDED(*channel_) {
1725 Status status;
1726 sync::TimedThreadNotification notification;
1727
1728 CallbackTask task(
1729 [&status, &notification](bool result) {
1730 status = result ? OkStatus() : Status::FailedPrecondition();
1731 notification.release();
1732 },
1733 std::move(future));
1734 dispatcher.Post(task);
1735
1736 if (timeout == internal::Channel<T>::kWaitForever) {
1737 notification.acquire();
1738 return status;
1739 }
1740
1741 if (!notification.try_acquire_for(timeout)) {
1742 task.Deregister();
1743 return Status::DeadlineExceeded();
1744 }
1745 return status;
1746 }
1747
1748 internal::Channel<T>* channel_;
1749};
1750
1764template <typename T>
1765std::optional<MpmcChannelHandle<T>> CreateMpmcChannel(Allocator& alloc,
1766 uint16_t capacity) {
1767 auto channel = internal::DynamicChannel<T>::Allocate(alloc, capacity);
1768 if (channel == nullptr) {
1769 return std::nullopt;
1770 }
1771 std::lock_guard lock(*channel);
1772 return MpmcChannelHandle<T>(*channel);
1773}
1774
1786template <typename T, uint16_t kCapacity>
1788 std::lock_guard lock(static_cast<internal::Channel<T>&>(storage));
1789 PW_DASSERT(!storage.active_locked());
1790 return MpmcChannelHandle<T>(storage);
1791}
1792
1806template <typename T>
1807std::optional<std::tuple<MpscChannelHandle<T>, Receiver<T>>> CreateMpscChannel(
1808 Allocator& alloc, uint16_t capacity) {
1809 auto channel = internal::DynamicChannel<T>::Allocate(alloc, capacity);
1810 if (channel == nullptr) {
1811 return std::nullopt;
1812 }
1813 std::lock_guard lock(*channel);
1814 return std::make_tuple(MpscChannelHandle<T>(*channel), Receiver<T>(*channel));
1815}
1816
1828template <typename T, uint16_t kCapacity>
1829std::tuple<MpscChannelHandle<T>, Receiver<T>> CreateMpscChannel(
1831 std::lock_guard lock(static_cast<internal::Channel<T>&>(storage));
1832 PW_DASSERT(!storage.active_locked());
1833 return std::make_tuple(MpscChannelHandle<T>(storage), Receiver<T>(storage));
1834}
1835
1849template <typename T>
1850std::optional<std::tuple<SpmcChannelHandle<T>, Sender<T>>> CreateSpmcChannel(
1851 Allocator& alloc, uint16_t capacity) {
1852 auto channel = internal::DynamicChannel<T>::Allocate(alloc, capacity);
1853 if (channel == nullptr) {
1854 return std::nullopt;
1855 }
1856 std::lock_guard lock(*channel);
1857 return std::make_tuple(SpmcChannelHandle<T>(*channel), Sender<T>(*channel));
1858}
1859
1871template <typename T, uint16_t kCapacity>
1872std::tuple<SpmcChannelHandle<T>, Sender<T>> CreateSpmcChannel(
1874 std::lock_guard lock(static_cast<internal::Channel<T>&>(storage));
1875 PW_DASSERT(!storage.active_locked());
1876 return std::make_tuple(SpmcChannelHandle<T>(storage), Sender<T>(storage));
1877}
1878
1892template <typename T>
1893std::optional<std::tuple<SpscChannelHandle<T>, Sender<T>, Receiver<T>>>
1894CreateSpscChannel(Allocator& alloc, uint16_t capacity) {
1895 auto channel = internal::DynamicChannel<T>::Allocate(alloc, capacity);
1896 if (channel == nullptr) {
1897 return std::nullopt;
1898 }
1899 std::lock_guard lock(*channel);
1900 return std::make_tuple(SpscChannelHandle<T>(*channel),
1901 Sender<T>(*channel),
1902 Receiver<T>(*channel));
1903}
1904
1916template <typename T, uint16_t kCapacity>
1917std::tuple<SpscChannelHandle<T>, Sender<T>, Receiver<T>> CreateSpscChannel(
1919 std::lock_guard lock(static_cast<internal::Channel<T>&>(storage));
1920 PW_DASSERT(!storage.active_locked());
1921 return std::make_tuple(
1922 SpscChannelHandle<T>(storage), Sender<T>(storage), Receiver<T>(storage));
1923}
1924
1926
1927namespace internal {
1928
1929inline void BaseChannelFuture::Complete() PW_UNLOCK_FUNCTION(*channel_) {
1930 channel_->RemoveRefAndDestroyIfUnreferenced();
1931 channel_ = nullptr;
1932}
1933
1934} // namespace internal
1935} // namespace pw::async2
Definition: allocator.h:42
T * New(Args &&... args)
Definition: allocator.h:66
Abstract interface for releasing memory.
Definition: deallocator.h:30
void Deallocate(void *ptr)
Definition: deallocator.h:316
Definition: deque.h:202
Definition: result.h:145
Definition: status.h:120
static constexpr Status DeadlineExceeded()
Definition: status.h:177
static constexpr Status Unavailable()
Definition: status.h:304
static constexpr Status FailedPrecondition()
Definition: status.h:243
Definition: callback_task.h:44
Channel handle for a particular type T.
Definition: channel.h:696
Sender< T > CreateSender()
Definition: channel.h:713
Receiver< T > CreateReceiver()
Definition: channel.h:720
Definition: channel.h:901
bool active() const
Definition: channel.h:923
Definition: task.h:47
Definition: future.h:408
Definition: dispatcher.h:74
constexpr bool is_pendable() const
Definition: future.h:255
void MarkComplete()
Definition: future.h:306
constexpr bool is_complete() const
Definition: future.h:260
Definition: future.h:114
Definition: channel.h:856
Definition: channel.h:813
A handle to a multi-producer, multi-consumer channel.
Definition: channel.h:728
friend std::optional< MpmcChannelHandle< U > > CreateMpmcChannel(Allocator &, uint16_t)
Definition: channel.h:1765
A handle to a multi-producer, single-consumer channel.
Definition: channel.h:750
friend std::optional< std::tuple< MpscChannelHandle< U >, Receiver< U > > > CreateMpscChannel(Allocator &, uint16_t)
Definition: channel.h:1807
Definition: poll.h:138
constexpr bool IsReady() const noexcept
Returns whether or not this value is Ready.
Definition: poll.h:211
Definition: channel.h:933
A receiver which reads values from an asynchronous channel.
Definition: channel.h:1045
friend std::optional< std::tuple< MpscChannelHandle< U >, Receiver< U > > > CreateMpscChannel(Allocator &, uint16_t)
Definition: channel.h:1807
bool is_open() const
Returns true if the channel is open.
Definition: channel.h:1179
std::conditional_t< std::is_void_v< T >, Status, Result< T > > TryReceive()
Definition: channel.h:1159
friend std::optional< std::tuple< SpscChannelHandle< U >, Sender< U >, Receiver< U > > > CreateSpscChannel(Allocator &, uint16_t)
Definition: channel.h:1894
void Disconnect()
Definition: channel.h:1171
std::conditional_t< std::is_void_v< T >, Status, Result< T > > BlockingReceive(Dispatcher &dispatcher, chrono::SystemClock::duration timeout=internal::Channel< T >::kWaitForever)
Definition: channel.h:1081
ReceiveFuture< T > Receive()
Definition: channel.h:1149
Definition: channel.h:1406
Definition: channel.h:1214
Definition: channel.h:1342
void Cancel()
Releases the reservation, making the space available for other senders.
Definition: channel.h:1378
void Commit(Args &&... args)
Commits a value to a reserved slot.
Definition: channel.h:1363
A sender which writes values to an asynchronous channel.
Definition: channel.h:1465
SendFuture< T > Send(U &&value)
Definition: channel.h:1501
Status TrySend(U &&value)
Definition: channel.h:1547
Result< SendReservation< T > > TryReserveSend()
Definition: channel.h:1527
friend std::optional< std::tuple< SpmcChannelHandle< U >, Sender< U > > > CreateSpmcChannel(Allocator &, uint16_t)
Definition: channel.h:1850
friend std::optional< std::tuple< SpscChannelHandle< U >, Sender< U >, Receiver< U > > > CreateSpscChannel(Allocator &, uint16_t)
Definition: channel.h:1894
ReserveSendFuture< T > ReserveSend()
Definition: channel.h:1516
uint16_t capacity() const
Returns the maximum capacity of the channel.
Definition: channel.h:1651
Status BlockingSend(Dispatcher &dispatcher, chrono::SystemClock::duration timeout=internal::Channel< T >::kWaitForever)
Definition: channel.h:1628
Status BlockingSend(Dispatcher &dispatcher, U &&value, chrono::SystemClock::duration timeout=internal::Channel< T >::kWaitForever)
Definition: channel.h:1598
uint16_t remaining_capacity() const
Returns the remaining capacity of the channel.
Definition: channel.h:1646
void Disconnect()
Definition: channel.h:1638
bool is_open() const
Returns true if the channel is open.
Definition: channel.h:1656
Status TrySend()
Definition: channel.h:1574
A handle to a single-producer, multi-consumer channel.
Definition: channel.h:771
friend std::optional< std::tuple< SpmcChannelHandle< U >, Sender< U > > > CreateSpmcChannel(Allocator &, uint16_t)
Definition: channel.h:1850
A handle to a single-producer, single-consumer channel.
Definition: channel.h:792
friend std::optional< std::tuple< SpscChannelHandle< U >, Sender< U >, Receiver< U > > > CreateSpscChannel(Allocator &, uint16_t)
Definition: channel.h:1894
bool is_complete() const
True if the future has returned Ready().
Definition: channel.h:75
bool is_pendable() const
True if the Pend can be called on the future.
Definition: channel.h:72
Definition: channel.h:166
Definition: channel.h:316
Definition: channel.h:131
Definition: channel.h:401
Definition: channel.h:595
Definition: storage.h:86
Definition: storage.h:39
Definition: interrupt_spin_lock.h:50
Definition: timed_thread_notification.h:40
bool try_acquire_for(chrono::SystemClock::duration timeout)
std::optional< std::tuple< SpmcChannelHandle< T >, Sender< T > > > CreateSpmcChannel(Allocator &alloc, uint16_t capacity)
Definition: channel.h:1850
std::optional< std::tuple< MpscChannelHandle< T >, Receiver< T > > > CreateMpscChannel(Allocator &alloc, uint16_t capacity)
Definition: channel.h:1807
std::optional< MpmcChannelHandle< T > > CreateMpmcChannel(Allocator &alloc, uint16_t capacity)
Definition: channel.h:1765
std::optional< std::tuple< SpscChannelHandle< T >, Sender< T >, Receiver< T > > > CreateSpscChannel(Allocator &alloc, uint16_t capacity)
Definition: channel.h:1894
std::conditional_t< std::is_void_v< typename T::value_type >, ReadyType, typename T::value_type > FutureValue
Definition: future.h:110
constexpr PendingType Pending()
Returns a value indicating that an operation was not yet able to complete.
Definition: poll.h:353
constexpr Poll Ready()
Returns a value indicating completion.
Definition: poll.h:337
std::chrono::duration< rep, period > duration
Alias for durations representable with this clock.
Definition: system_clock.h:91
constexpr bool CheckedIncrement(T &base, Inc inc)
Definition: checked_arithmetic.h:118
constexpr Status OkStatus()
Definition: status.h:450
#define PW_LOCKABLE(name)
Definition: lock_annotations.h:210
#define PW_GUARDED_BY(x)
Definition: lock_annotations.h:60
#define PW_NO_LOCK_SAFETY_ANALYSIS
Definition: lock_annotations.h:296
#define PW_EXCLUSIVE_LOCK_FUNCTION(...)
Definition: lock_annotations.h:232
#define PW_EXCLUSIVE_LOCKS_REQUIRED(...)
Definition: lock_annotations.h:147
#define PW_LOCK_RETURNED(x)
Definition: lock_annotations.h:199
#define PW_UNLOCK_FUNCTION(...)
Definition: lock_annotations.h:249
#define PW_LOCKS_EXCLUDED(...)
Definition: lock_annotations.h:178