C/C++ API Reference
Loading...
Searching...
No Matches
channel.h
1// Copyright 2025 The Pigweed Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may not
4// use this file except in compliance with the License. You may obtain a copy of
5// the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12// License for the specific language governing permissions and limitations under
13// the License.
14#pragma once
15
16#include <mutex>
17#include <optional>
18
19#include "lib/stdcompat/type_traits.h"
20#include "pw_allocator/allocator.h"
21#include "pw_async2/callback_task.h"
22#include "pw_async2/dispatcher.h"
23#include "pw_async2/future.h"
24#include "pw_containers/deque.h"
25#include "pw_numeric/checked_arithmetic.h"
26#include "pw_result/result.h"
27#include "pw_sync/interrupt_spin_lock.h"
28#include "pw_sync/lock_annotations.h"
29#include "pw_sync/timed_thread_notification.h"
30
31namespace pw::async2 {
32
33template <typename T>
34class Receiver;
35
36template <typename T>
37class ReceiveFuture;
38
39template <typename T>
40class Sender;
41
42template <typename T>
43class SendFuture;
44
45template <typename T>
46class ReserveSendFuture;
47
48template <typename T>
49class SendReservation;
50
51template <typename T, uint16_t kCapacity>
52class ChannelStorage;
53
54namespace internal {
55
56template <typename T>
57class Channel;
58
59class BaseChannel;
60
// Members of BaseChannelFuture: the non-template part of every channel future.
// It pairs a pointer to the channel with a FutureCore that tracks the future's
// state. Copying and plain move assignment are disabled.
62 public:
// Creates a future that is not attached to any channel.
63 constexpr BaseChannelFuture() : channel_(nullptr) {}
64
65 BaseChannelFuture(const BaseChannelFuture&) = delete;
66 BaseChannelFuture& operator=(const BaseChannelFuture&) = delete;
67
68 // Derived classes call MoveAssignFrom to move rather than use the operator.
69 BaseChannelFuture& operator=(BaseChannelFuture&&) = delete;
70
// Forwards to FutureCore::is_pendable().
72 [[nodiscard]] bool is_pendable() const { return core_.is_pendable(); }
73
// Forwards to FutureCore::is_complete().
75 [[nodiscard]] bool is_complete() const { return core_.is_complete(); }
76
77 protected:
78 // Creates a new future, storing nullptr if `channel` is nullptr or if the
79 // channel is closed.
80 explicit BaseChannelFuture(BaseChannel* channel) PW_LOCKS_EXCLUDED(*channel);
81
// Tag type that selects the constructor below.
82 enum AllowClosed { kAllowClosed };
83
84 // Creates a new future. Always increases the ref count, even if the channel
85 // is closed. This allows ReceiveFutures to read values from closed channels.
86 BaseChannelFuture(BaseChannel* channel, AllowClosed)
87 PW_LOCKS_EXCLUDED(*channel)
88 : core_(FutureState::kPending) {
89 StoreAndAddRefIfNonnull(channel);
90 }
91
// Move construction: copies the channel pointer, then delegates the rest of
// the transfer to MoveFrom() (defined outside this listing).
92 BaseChannelFuture(BaseChannelFuture&& other)
93 PW_LOCKS_EXCLUDED(*channel_, *other.channel_)
94 : channel_(other.channel_) {
95 MoveFrom(other);
96 }
97
// Move-assignment replacement used by derived classes' operator=.
98 BaseChannelFuture& MoveAssignFrom(BaseChannelFuture& other)
99 PW_LOCKS_EXCLUDED(*channel_, *other.channel_);
100
101 // Unlists this future and removes a reference from the channel.
102 void RemoveFromChannel() PW_LOCKS_EXCLUDED(*channel_);
103
// The StoreWakerFor* functions and Complete() are annotated as releasing the
// channel lock, so callers are expected to hold it on entry. The DoPend
// implementations in this file call lock() first and rely on these to unlock.
104 bool StoreWakerForReceiveIfOpen(Context& cx) PW_UNLOCK_FUNCTION(*channel_);
105
106 void StoreWakerForSend(Context& cx) PW_UNLOCK_FUNCTION(*channel_);
107
108 void StoreWakerForReserveSend(Context& cx) PW_UNLOCK_FUNCTION(*channel_);
109
// Marks the underlying FutureCore complete; does not touch the channel.
110 void MarkCompleted() { core_.MarkComplete(); }
111
112 void Complete() PW_UNLOCK_FUNCTION(*channel_);
113
// Returns the stored channel pointer, which may be nullptr.
114 BaseChannel* base_channel() PW_LOCK_RETURNED(channel_) { return channel_; }
115
116 private:
117 void StoreAndAddRefIfNonnull(BaseChannel* channel)
118 PW_LOCKS_EXCLUDED(*channel);
119
120 void MoveFrom(BaseChannelFuture& other)
121 PW_LOCKS_EXCLUDED(*channel_, *other.channel_);
122
123 BaseChannel* channel_;
124 FutureCore core_;
125
// Declared in a trailing public section, presumably because the alias names
// the private `core_` member, which has to be declared first.
126 public:
127 using List = FutureList<&BaseChannelFuture::core_>;
128};
129
130// Adds a Pend function to BaseChannelFuture.
//
// Template parameters:
//   Derived     - the concrete future type; it must provide `DoPend(Context&)`,
//                 which Pend() calls through a static_cast to `Derived&`.
//   T           - the channel's element type; used to recover `Channel<T>*`
//                 from the stored base-channel pointer.
//   FutureValue - the type produced when the future resolves (`value_type`).
131template <typename Derived, typename T, typename FutureValue>
133 public:
134 using value_type = FutureValue;
135
// Polls the derived future once. When DoPend() reports Ready, the future's
// core is marked complete before the result is returned.
136 Poll<value_type> Pend(Context& cx) PW_LOCKS_EXCLUDED(*this->channel()) {
137 Poll<value_type> result = static_cast<Derived&>(*this).DoPend(cx);
138 if (result.IsReady()) {
139 // If Ready(), the future is no longer associated with the channel, so it
140 // can be marked complete without the lock held.
141 MarkCompleted();
142 }
143 return result;
144 }
145
146 protected:
147 constexpr ChannelFuture() = default;
148
// The constructors below forward directly to the matching BaseChannelFuture
// constructors.
149 explicit ChannelFuture(Channel<T>* channel) : BaseChannelFuture(channel) {}
150
151 ChannelFuture(Channel<T>* channel, AllowClosed)
152 : BaseChannelFuture(channel, kAllowClosed) {}
153
154 ChannelFuture(ChannelFuture&& other) : BaseChannelFuture(std::move(other)) {}
155
// Typed view of the stored channel pointer; nullptr when no channel is
// attached.
156 Channel<T>* channel() PW_LOCK_RETURNED(this->base_channel()) {
157 return static_cast<Channel<T>*>(base_channel());
158 }
159
// Re-declared as private so that classes deriving from ChannelFuture use
// channel() and Pend() instead of these base-class members directly.
160 private:
161 using BaseChannelFuture::base_channel;
162 using BaseChannelFuture::MarkCompleted;
163};
164
165// Internal generic channel type. BaseChannel is not exposed to users. Its
166// public interface is for internal consumption.
//
// The class is itself the lockable object: lock()/unlock() wrap the internal
// spin lock, so `std::lock_guard guard(*this)` works and the thread-safety
// annotations in this file refer to `*this`.
167class PW_LOCKABLE("pw::async2::internal::BaseChannel") BaseChannel {
168 public:
// Sentinel duration; Receiver::BlockingReceive compares its timeout against
// this value to choose an untimed wait.
169 static constexpr chrono::SystemClock::duration kWaitForever =
170 chrono::SystemClock::duration::max();
171
172 // Acquires the channel's lock.
173 void lock() PW_EXCLUSIVE_LOCK_FUNCTION() { lock_.lock(); }
174
175 // Releases the channel's lock.
176 void unlock() PW_UNLOCK_FUNCTION() { lock_.unlock(); }
177
// Locking wrapper around is_open_locked().
178 [[nodiscard]] bool is_open() PW_LOCKS_EXCLUDED(*this) {
179 std::lock_guard lock(*this);
180 return is_open_locked();
181 }
182
// True until the channel has been marked closed.
183 bool is_open_locked() const PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
184 return !closed_;
185 }
186
// True while at least one reference (see `ref_count_`) is outstanding.
187 [[nodiscard]] bool active_locked() const PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
188 return ref_count_ != 0;
189 }
190
191 // Removes a reference to this channel and destroys the channel if needed.
// Annotated as an unlock function: the caller holds the lock on entry and it
// is released by this call.
192 void RemoveRefAndDestroyIfUnreferenced() PW_UNLOCK_FUNCTION();
193
// Locking wrapper around CloseLocked().
194 void Close() PW_LOCKS_EXCLUDED(*this) {
195 std::lock_guard lock(*this);
196 CloseLocked();
197 }
198
199 // Adds a SendFuture or ReserveSendFuture to the list of pending futures.
200 void add_send_future(BaseChannelFuture& future)
202 send_futures_.Push(future);
203 }
204
205 // Adds a ReceiveFuture to the list of pending futures.
206 void add_receive_future(BaseChannelFuture& future)
208 receive_futures_.Push(future);
209 }
210
211 void DropReservationAndRemoveRef() PW_LOCKS_EXCLUDED(*this);
212
// add_receiver / add_sender / add_handle each register one object of the given
// kind through add_object(), which also takes an overall reference.
213 void add_receiver() PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
214 add_object(receiver_count_);
215 }
216
217 void add_sender() PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
218 add_object(sender_count_);
219 }
220
221 void add_handle() PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
222 add_object(handle_count_);
223 }
224
// Records one more outstanding send reservation. Reservations count against
// remaining capacity (see Channel<T>::remaining_capacity_locked).
225 void add_reservation() PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
226 reservations_ += 1;
227 }
228
// Releases one reservation; debug-asserts that one was outstanding.
229 void remove_reservation() PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
230 PW_DASSERT(reservations_ > 0);
231 reservations_ -= 1;
232 }
233
// remove_sender / remove_receiver / remove_handle take the lock themselves
// (via remove_object), unlike the add_* counterparts above.
234 void remove_sender() PW_LOCKS_EXCLUDED(*this) {
235 remove_object(&sender_count_);
236 }
237
238 void remove_receiver() PW_LOCKS_EXCLUDED(*this) {
239 remove_object(&receiver_count_);
240 }
241
242 void remove_handle() PW_LOCKS_EXCLUDED(*this) {
243 remove_object(&handle_count_);
244 }
245
// Takes one overall reference; asserts that CheckedIncrement succeeded.
246 void add_ref() PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
247 PW_ASSERT(CheckedIncrement(ref_count_, 1));
248 }
249
250 protected:
251 constexpr BaseChannel() = default;
252
253 ~BaseChannel();
254
// Resolves at most one pending receive future, if any is listed.
255 void WakeOneReceiver() PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
256 receive_futures_.ResolveOneIfAvailable();
257 }
258
// Resolves at most one pending send/reserve-send future, if any is listed.
259 void WakeOneSender() PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
260 send_futures_.ResolveOneIfAvailable();
261 }
262
// Number of outstanding send reservations.
263 uint16_t reservations() const PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
264 return reservations_;
265 }
266
267 private:
// Bumps the per-kind counter only while the channel is open, but always takes
// an overall reference.
268 void add_object(uint8_t& counter) PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
269 if (is_open_locked()) {
270 PW_ASSERT(CheckedIncrement(counter, 1));
271 }
272 add_ref();
273 }
274
275 // Takes a pointer since otherwise Clang's thread safety analysis complains
276 // about taking a reference without the lock held.
277 void remove_object(uint8_t* counter) PW_LOCKS_EXCLUDED(*this);
278
279 // Returns true if the channel should be closed following a reference
280 // decrement.
281 //
282 // Handles can create new senders and receivers, so as long as one exists, the
283 // channel should remain open. Without active handles, the channel closes when
284 // either end fully hangs up.
285 bool should_close() const PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
286 return handle_count_ == 0 && (sender_count_ == 0 || receiver_count_ == 0);
287 }
288
289 void CloseLocked() PW_EXCLUSIVE_LOCKS_REQUIRED(*this);
290
291 // Destroys the channel if it is dynamically allocated.
// The default implementation does nothing; DynamicChannel overrides it.
292 virtual void Destroy() {}
293
// Futures waiting on the channel, one list per direction.
294 BaseChannelFuture::List send_futures_ PW_GUARDED_BY(*this);
295 BaseChannelFuture::List receive_futures_ PW_GUARDED_BY(*this);
296
297 uint16_t reservations_ PW_GUARDED_BY(*this) = 0;
298 bool closed_ PW_GUARDED_BY(*this) = false;
// `mutable`, presumably so the lock can be used from const contexts
// - TODO confirm; lock()/unlock() above are non-const.
299 mutable sync::InterruptSpinLock lock_;
300
301 // Channels are reference counted in two ways:
302 //
303 // - Senders and receivers are tracked independently. Once either reaches
304 // zero, the channel is closed, but not destroyed. No new values can be
305 // sent, but any buffered values can still be read.
306 //
307 // - Overall object reference count, including senders, receivers, futures,
308 // and channel handles. Once this reaches zero, the channel is destroyed.
309 //
310 uint8_t sender_count_ PW_GUARDED_BY(*this) = 0;
311 uint8_t receiver_count_ PW_GUARDED_BY(*this) = 0;
312 uint8_t handle_count_ PW_GUARDED_BY(*this) = 0;
313 uint16_t ref_count_ PW_GUARDED_BY(*this) = 0;
314};
315
// Primary ChannelDeque template: a thin wrapper that exposes only the subset
// of FixedDeque<T> a data channel needs. Every accessor forwards to the
// wrapped `dequeue_` member.
316template <typename T>
318 using size_type = uint16_t;
319
320 public:
// Builds a ChannelDeque around FixedDeque<T>::TryAllocate(alloc, capacity).
// DynamicChannel::Allocate treats a resulting capacity() of 0 as failure.
321 [[nodiscard]] static ChannelDeque TryAllocate(Allocator& alloc,
322 uint16_t capacity) {
323 return ChannelDeque(FixedDeque<T>::TryAllocate(alloc, capacity));
324 }
325
// NOTE(review): the signature of this constructor is missing from this view;
// only its template header and initializer list are visible. It initializes
// `dequeue_` from a caller-provided `storage` argument.
326 template <size_t kAlignment, size_t kCapacity>
328 : dequeue_(storage) {}
329
330 // Exposes the minimal deque api needed by a data channel.
331 [[nodiscard]] size_type capacity() const { return dequeue_.capacity(); }
332 [[nodiscard]] size_type size() const { return dequeue_.size(); }
333 [[nodiscard]] bool empty() const { return dequeue_.empty(); }
334 [[nodiscard]] auto deallocator() const { return dequeue_.deallocator(); }
335 [[nodiscard]] T& front() { return dequeue_.front(); }
336 [[nodiscard]] const T& front() const { return dequeue_.front(); }
337
338 void pop_front() { dequeue_.pop_front(); }
339
// Perfect-forwards `value` into the underlying deque.
340 template <typename U>
341 void push_back(U&& value) {
342 dequeue_.push_back(std::forward<U>(value));
343 }
344
// Constructs an element in place from `args`.
345 template <typename... Args>
346 void emplace_back(Args&&... args) {
347 dequeue_.emplace_back(std::forward<Args>(args)...);
348 }
349
350 private:
// Used by TryAllocate() to adopt an already-allocated deque.
351 explicit ChannelDeque(FixedDeque<T>&& other) : dequeue_(std::move(other)) {}
352
353 FixedDeque<T> dequeue_;
354};
355
356template <>
357class ChannelDeque<void> {
358 using size_type = uint16_t;
359
360 public:
361 ~ChannelDeque() = default;
362 ChannelDeque(const ChannelDeque& other) = delete;
363 ChannelDeque& operator=(const ChannelDeque& other) = delete;
364 ChannelDeque(ChannelDeque&& other) = default;
365 ChannelDeque& operator=(ChannelDeque&& other) = default;
366
367 explicit ChannelDeque(uint16_t capacity) : capacity_(capacity) {}
368
369 [[nodiscard]] static ChannelDeque TryAllocate(Allocator& alloc,
370 uint16_t capacity) {
371 return ChannelDeque(alloc, capacity);
372 }
373
374 // Exposes a minimal deque-like api needed by a notification channel.
375 [[nodiscard]] size_type capacity() const { return capacity_; }
376 [[nodiscard]] size_type size() const { return size_; }
377 [[nodiscard]] bool empty() const { return size_ == 0; }
378 [[nodiscard]] Deallocator* deallocator() const { return deallocator_; }
379
380 void push_back() {
381 PW_ASSERT(size_ < capacity_);
382 ++size_;
383 }
384
385 void pop_front() {
386 PW_ASSERT(size_ != 0);
387 --size_;
388 }
389
390 private:
391 ChannelDeque(Allocator& alloc, uint16_t capacity)
392 : deallocator_(&alloc), capacity_(capacity) {}
393
394 Deallocator* deallocator_ = nullptr;
395 uint16_t capacity_ = 0;
396 uint16_t size_ = 0;
397};
398
399// Like BaseChannel, Channel is an internal class that is not exposed to
400// users. Its public interface is for internal consumption.
401template <typename T>
402class Channel : public BaseChannel {
403 public:
404 Sender<T> CreateSender() PW_LOCKS_EXCLUDED(*this) {
405 {
406 std::lock_guard guard(*this);
407 if (is_open_locked()) {
408 return Sender<T>(*this);
409 }
410 }
411 return Sender<T>();
412 }
413
414 Receiver<T> CreateReceiver() PW_LOCKS_EXCLUDED(*this) {
415 {
416 std::lock_guard guard(*this);
417 if (is_open_locked()) {
418 return Receiver<T>(*this);
419 }
420 }
421 return Receiver<T>();
422 }
423
424 template <typename U,
425 int&... kExplicitGuard,
426 std::enable_if_t<std::is_same_v<::cpp20::remove_cvref_t<U>, T>,
427 bool> = true>
428 void PushAndWake(U&& value) PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
429 deque_.push_back(std::forward<U>(value));
430 WakeOneReceiver();
431 }
432
433 template <typename U,
434 int&... kExplicitGuard,
435 std::enable_if_t<!std::is_same_v<::cpp20::remove_cvref_t<U>, T> &&
436 std::is_constructible_v<T, U>,
437 bool> = true>
438 void PushAndWake(U&& value) PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
439 PushAndWake(T(value));
440 }
441
442 void PushAndWake() PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
443 deque_.push_back();
444 WakeOneReceiver();
445 }
446
447 template <typename... Args>
448 void EmplaceAndWake(Args&&... args) PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
449 deque_.emplace_back(std::forward<Args>(args)...);
450 WakeOneReceiver();
451 }
452
453 auto PopAndWake() PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
454 if constexpr (std::is_void_v<T>) {
455 deque_.pop_front();
456 WakeOneSender();
457 } else {
458 T value = std::move(deque_.front());
459 deque_.pop_front();
460
461 WakeOneSender();
462 return value;
463 }
464 }
465
466 bool full() const PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
467 return remaining_capacity_locked() == 0;
468 }
469
470 uint16_t remaining_capacity() PW_LOCKS_EXCLUDED(*this) {
471 std::lock_guard guard(*this);
472 return remaining_capacity_locked();
473 }
474
475 uint16_t remaining_capacity_locked() const
477 return deque_.capacity() - deque_.size() - reservations();
478 }
479
480 uint16_t capacity() const PW_NO_LOCK_SAFETY_ANALYSIS {
481 // SAFETY: The capacity of `deque_` cannot change.
482 return deque_.capacity();
483 }
484
485 [[nodiscard]] bool empty() PW_EXCLUSIVE_LOCKS_REQUIRED(*this) {
486 return deque_.empty();
487 }
488
489 template <typename U,
490 int&... kExplicitGuard,
491 std::enable_if_t<std::is_same_v<::cpp20::remove_cvref_t<U>, T>,
492 bool> = true>
493 Status TrySend(U&& value) PW_LOCKS_EXCLUDED(*this) {
494 std::lock_guard guard(*this);
495 if (!is_open_locked()) {
497 }
498 if (full()) {
499 return Status::Unavailable();
500 }
501 PushAndWake(std::forward<U>(value));
502 return OkStatus();
503 }
504
505 template <typename U,
506 int&... kExplicitGuard,
507 std::enable_if_t<!std::is_same_v<::cpp20::remove_cvref_t<U>, T> &&
508 std::is_constructible_v<T, U>,
509 bool> = true>
510 Status TrySend(U&& value) PW_LOCKS_EXCLUDED(*this) {
511 return TrySend(T(value));
512 }
513
514 Status TrySend() PW_LOCKS_EXCLUDED(*this) {
515 static_assert(
516 std::is_void_v<T>,
517 "TrySend() with no arguments is for notification channels only");
518
519 std::lock_guard guard(*this);
520 if (!is_open_locked()) {
522 }
523 if (full()) {
524 return Status::Unavailable();
525 }
526 PushAndWake();
527 return OkStatus();
528 }
529
530 std::conditional_t<std::is_void_v<T>, Status, Result<T>> TryReceive()
531 PW_LOCKS_EXCLUDED(*this) {
532 std::lock_guard guard(*this);
533 if (deque_.empty()) {
534 return is_open_locked() ? Status::Unavailable()
536 }
537 if constexpr (std::is_void_v<T>) {
538 PopAndWake();
539 return OkStatus();
540 } else {
541 return Result(PopAndWake());
542 }
543 }
544
545 Result<SendReservation<T>> TryReserveSend() PW_LOCKS_EXCLUDED(*this) {
546 std::lock_guard guard(*this);
547 if (!is_open_locked()) {
549 }
550 if (full()) {
551 return Status::Unavailable();
552 }
553 add_reservation();
554 return SendReservation<T>(*this);
555 }
556
// Consumes a previously made send reservation by constructing a value in place
// from `args`, then drops a reference on the channel.
//
// The lock is taken manually rather than with a guard because
// RemoveRefAndDestroyIfUnreferenced() is annotated as the function that
// releases it, so the statement order below must be preserved.
557 template <typename... Args>
558 void CommitReservationAndRemoveRef(Args&&... args) PW_LOCKS_EXCLUDED(*this) {
559 lock();
// The reservation is released whether or not the value ends up enqueued.
560 remove_reservation();
// If the channel has been closed, the value is not constructed or enqueued.
561 if (is_open_locked()) {
562 EmplaceAndWake(std::forward<Args>(args)...);
563 }
// Releases the lock (PW_UNLOCK_FUNCTION) and may destroy the channel.
564 RemoveRefAndDestroyIfUnreferenced();
565 }
566
// Notification-channel counterpart of CommitReservationAndRemoveRef():
// consumes a reservation by queuing one notification, then drops a reference
// on the channel.
//
// The lock is taken manually because RemoveRefAndDestroyIfUnreferenced() is
// annotated as the function that releases it.
567 void CommitNotificationReservationAndRemoveRef() PW_LOCKS_EXCLUDED(*this) {
568 lock();
// The reservation is released whether or not the notification is queued.
569 remove_reservation();
// Nothing is queued if the channel has been closed in the meantime.
570 if (is_open_locked()) {
571 PushAndWake();
572 }
// Releases the lock (PW_UNLOCK_FUNCTION) and may destroy the channel.
573 RemoveRefAndDestroyIfUnreferenced();
574 }
575
576 protected:
577 constexpr explicit Channel(ChannelDeque<T>&& deque)
578 : deque_(std::move(deque)) {}
579
580 template <size_t kAlignment, size_t kCapacity>
582 : deque_(storage) {}
583
584 ~Channel() = default;
585
586 Deallocator* deallocator() const PW_NO_LOCK_SAFETY_ANALYSIS {
587 // SAFETY: deque_.deallocator() cannot change.
588 return deque_.deallocator();
589 }
590
591 private:
592 ChannelDeque<T> deque_ PW_GUARDED_BY(*this);
593};
594
595template <typename T>
596class DynamicChannel final : public Channel<T> {
597 public:
598 static Channel<T>* Allocate(Allocator& alloc, uint16_t capacity) {
599 ChannelDeque<T> deque = ChannelDeque<T>::TryAllocate(alloc, capacity);
600 if (deque.capacity() == 0) {
601 return nullptr;
602 }
603 return alloc.New<DynamicChannel<T>>(std::move(deque));
604 }
605
606 explicit DynamicChannel(ChannelDeque<T>&& deque)
607 : Channel<T>(std::move(deque)) {}
608
609 private:
610 ~DynamicChannel() = default;
611
612 void Destroy() final PW_LOCKS_EXCLUDED(*this) {
613 Deallocator* const deallocator = this->deallocator();
614 this->~DynamicChannel();
615 deallocator->Deallocate(this);
616 }
617};
618
619template <typename T, uint16_t kCapacity>
621 public Channel<T> {
622 public:
623 BaseChannelStorage() : Channel<T>(this->storage()) {}
624
625 protected:
626 ~BaseChannelStorage() = default;
627};
628
629template <uint16_t kCapacity>
630class BaseChannelStorage<void, kCapacity> : public Channel<void> {
631 public:
633
634 protected:
635 ~BaseChannelStorage() = default;
636};
637
644 public:
645 constexpr BaseChannelHandle() : channel_(nullptr) {}
646
648
650
651 [[nodiscard]] bool is_open() const PW_LOCKS_EXCLUDED(channel_) {
652 return channel_ != nullptr && channel_->is_open();
653 }
654
657 void Close() PW_LOCKS_EXCLUDED(channel_);
658
665 void Release() PW_LOCKS_EXCLUDED(channel_);
666
667 protected:
668 explicit BaseChannelHandle(BaseChannel& channel)
670 : channel_(&channel) {
671 channel_->add_handle();
672 }
673
674 BaseChannelHandle& operator=(const BaseChannelHandle& other)
675 PW_LOCKS_EXCLUDED(channel_);
676
677 BaseChannelHandle(BaseChannelHandle&& other) noexcept
678 : channel_(std::exchange(other.channel_, nullptr)) {}
679
680 BaseChannelHandle& operator=(BaseChannelHandle&& other) noexcept
681 PW_LOCKS_EXCLUDED(channel_);
682
683 constexpr BaseChannel* channel() const PW_LOCK_RETURNED(channel_) {
684 return channel_;
685 }
686
687 private:
688 BaseChannel* channel_;
689};
690
691} // namespace internal
692
694
696template <typename T>
698 public:
699 constexpr ChannelHandle() = default;
700
701 ChannelHandle(const ChannelHandle&) = default;
702 ChannelHandle& operator=(const ChannelHandle&) = default;
703
704 ChannelHandle(ChannelHandle&&) = default;
705 ChannelHandle& operator=(ChannelHandle&&) = default;
706
707 protected:
708 explicit ChannelHandle(internal::Channel<T>& channel)
710 : internal::BaseChannelHandle(channel) {}
711
715 PW_ASSERT(channel() != nullptr);
716 return static_cast<internal::Channel<T>&>(*channel()).CreateSender();
717 }
718
722 PW_ASSERT(channel() != nullptr);
723 return static_cast<internal::Channel<T>&>(*channel()).CreateReceiver();
724 }
725};
726
728template <typename T>
729class MpmcChannelHandle final : public ChannelHandle<T> {
730 public:
731 constexpr MpmcChannelHandle() = default;
732
735
736 private:
737 explicit MpmcChannelHandle(internal::Channel<T>& channel)
738 : ChannelHandle<T>(channel) {}
739
740 template <typename U>
741 friend std::optional<MpmcChannelHandle<U>> CreateMpmcChannel(Allocator&,
742 uint16_t);
743
744 template <typename U, uint16_t kCapacity>
747};
748
750template <typename T>
751class MpscChannelHandle final : public ChannelHandle<T> {
752 public:
753 constexpr MpscChannelHandle() = default;
754
756
757 private:
758 explicit MpscChannelHandle(internal::Channel<T>& channel)
759 : ChannelHandle<T>(channel) {}
760
761 template <typename U>
762 friend std::optional<std::tuple<MpscChannelHandle<U>, Receiver<U>>>
763 CreateMpscChannel(Allocator&, uint16_t);
764
765 template <typename U, uint16_t kCapacity>
766 friend std::tuple<MpscChannelHandle<U>, Receiver<U>> CreateMpscChannel(
768};
769
771template <typename T>
772class SpmcChannelHandle final : public ChannelHandle<T> {
773 public:
774 constexpr SpmcChannelHandle() = default;
775
777
778 private:
779 explicit SpmcChannelHandle(internal::Channel<T>& channel)
780 : ChannelHandle<T>(channel) {}
781
782 template <typename U>
783 friend std::optional<std::tuple<SpmcChannelHandle<U>, Sender<U>>>
784 CreateSpmcChannel(Allocator&, uint16_t);
785
786 template <typename U, uint16_t kCapacity>
787 friend std::tuple<SpmcChannelHandle<U>, Sender<U>> CreateSpmcChannel(
789};
790
792template <typename T>
793class SpscChannelHandle final : public ChannelHandle<T> {
794 public:
795 constexpr SpscChannelHandle() = default;
796
797 private:
798 explicit SpscChannelHandle(internal::Channel<T>& channel)
799 : ChannelHandle<T>(channel) {}
800
801 template <typename U>
802 friend std::optional<std::tuple<SpscChannelHandle<U>, Sender<U>, Receiver<U>>>
803 CreateSpscChannel(Allocator&, uint16_t);
804
805 template <typename U, uint16_t kCapacity>
806 friend std::tuple<SpscChannelHandle<U>, Sender<U>, Receiver<U>>
808};
809
813template <typename T>
814class MpChannelHandle final : public ChannelHandle<T> {
815 public:
816 constexpr MpChannelHandle() = default;
817
819 : ChannelHandle<T>(other) {}
820
821 MpChannelHandle& operator=(const MpmcChannelHandle<T>& other) {
823 return *this;
824 }
825
827 : ChannelHandle<T>(std::move(other)) {}
828
829 MpChannelHandle& operator=(MpmcChannelHandle<T>&& other) {
830 ChannelHandle<T>::operator=(std::move(other));
831 return *this;
832 }
833
835 : ChannelHandle<T>(other) {}
836
837 MpChannelHandle& operator=(const MpscChannelHandle<T>& other) {
839 return *this;
840 }
841
843 : ChannelHandle<T>(std::move(other)) {}
844
845 MpChannelHandle& operator=(MpscChannelHandle<T>&& other) {
846 ChannelHandle<T>::operator=(std::move(other));
847 return *this;
848 }
849
851};
852
856template <typename T>
857class McChannelHandle final : public ChannelHandle<T> {
858 public:
859 constexpr McChannelHandle() = default;
860
862 : ChannelHandle<T>(other) {}
863
864 McChannelHandle& operator=(const MpmcChannelHandle<T>& other) {
866 return *this;
867 }
868
870 : ChannelHandle<T>(std::move(other)) {}
871
872 McChannelHandle& operator=(MpmcChannelHandle<T>&& other) {
873 ChannelHandle<T>::operator=(std::move(other));
874 return *this;
875 }
876
878 : ChannelHandle<T>(other) {}
879
880 McChannelHandle& operator=(const SpmcChannelHandle<T>& other) {
882 return *this;
883 }
884
886 : ChannelHandle<T>(std::move(other)) {}
887
888 McChannelHandle& operator=(SpmcChannelHandle<T>&& other) {
889 ChannelHandle<T>::operator=(std::move(other));
890 return *this;
891 }
892
894};
895
900template <typename T, uint16_t kCapacity>
901class ChannelStorage final
902 : private internal::BaseChannelStorage<T, kCapacity> {
903 public:
904 template <typename U, uint16_t kCap>
905 friend std::tuple<SpscChannelHandle<U>, Sender<U>, Receiver<U>>
906 CreateSpscChannel(ChannelStorage<U, kCap>& storage);
907
908 template <typename U, uint16_t kCap>
909 friend std::tuple<MpscChannelHandle<U>, Receiver<U>> CreateMpscChannel(
910 ChannelStorage<U, kCap>& storage);
911
912 template <typename U, uint16_t kCap>
913 friend std::tuple<SpmcChannelHandle<U>, Sender<U>> CreateSpmcChannel(
914 ChannelStorage<U, kCap>& storage);
915
916 template <typename U, uint16_t kCap>
917 friend MpmcChannelHandle<U> CreateMpmcChannel(
918 ChannelStorage<U, kCap>& storage);
919
920 ~ChannelStorage() = default;
921
924 [[nodiscard]] bool active() const PW_LOCKS_EXCLUDED(*this) {
925 std::lock_guard lock(*this);
926 return this->active_locked();
927 }
928
929 constexpr uint16_t capacity() const { return kCapacity; }
930};
931
932template <typename T>
933class [[nodiscard]] ReceiveFuture final
934 : public internal::ChannelFuture<ReceiveFuture<T>, T, std::optional<T>> {
935 private:
937
938 public:
939 constexpr ReceiveFuture() = default;
940
942 PW_LOCKS_EXCLUDED(*this->channel(), *other.channel())
943 : Base(std::move(other)) {}
944
945 ReceiveFuture& operator=(ReceiveFuture&& other)
946 PW_LOCKS_EXCLUDED(*this->channel(), *other.channel()) {
947 // NOLINTNEXTLINE(misc-unconventional-assign-operator)
948 return static_cast<ReceiveFuture&>(this->MoveAssignFrom(other));
949 }
950
951 ~ReceiveFuture() PW_LOCKS_EXCLUDED(*this->channel()) {
952 this->RemoveFromChannel();
953 }
954
955 private:
956 friend Base;
957 // Suppress `no uniquely matching class member found` Doxygen error
961 friend Receiver<T>;
962 template <typename, typename>
963 friend class CallbackTask;
964
965 explicit ReceiveFuture(internal::Channel<T>* channel)
966 PW_LOCKS_EXCLUDED(*channel)
967 : Base(channel, this->kAllowClosed) {}
968
// Polls for one value. Called by ChannelFuture::Pend().
//
// Resolves to the received value, or to std::nullopt when no value can be
// produced (no channel attached, or the buffer is empty and
// StoreWakerForReceiveIfOpen() reports the channel is not open). Returns
// Pending() when the buffer is empty and the waker was stored.
969 PollOptional<T> DoPend(Context& cx) PW_LOCKS_EXCLUDED(*this->channel()) {
// No channel attached: resolve immediately with "no value".
970 if (this->channel() == nullptr) {
971 PW_DASSERT(this->is_pendable());
972 return Ready<std::optional<T>>(std::nullopt);
973 }
974
// Manual lock: every path below ends in a call annotated as releasing it
// (StoreWakerForReceiveIfOpen or Complete), so no guard object is used.
975 this->channel()->lock();
976 PW_DASSERT(this->is_pendable());
977 if (this->channel()->empty()) {
978 return this->StoreWakerForReceiveIfOpen(cx)
979 ? Pending()
980 : Ready<std::optional<T>>(std::nullopt);
981 }
982
// A value is buffered: take it (which also wakes one pending sender), then
// let Complete() release the lock.
983 auto result = Ready(this->channel()->PopAndWake());
984 this->Complete();
985 return result;
986 }
987};
988
989template <>
990class [[nodiscard]] ReceiveFuture<void> final
991 : public internal::ChannelFuture<ReceiveFuture<void>, void, bool> {
992 private:
994
995 public:
996 constexpr ReceiveFuture() = default;
997
999 PW_LOCKS_EXCLUDED(*this->channel(), *other.channel())
1000 : Base(std::move(other)) {}
1001
1002 ReceiveFuture& operator=(ReceiveFuture&& other)
1003 PW_LOCKS_EXCLUDED(*this->channel(), *other.channel()) {
1004 // NOLINTNEXTLINE(misc-unconventional-assign-operator)
1005 return static_cast<ReceiveFuture&>(this->MoveAssignFrom(other));
1006 }
1007
1008 ~ReceiveFuture() PW_LOCKS_EXCLUDED(*this->channel()) {
1009 this->RemoveFromChannel();
1010 }
1011
1012 private:
1013 friend Base;
1014 // Suppress `no uniquely matching class member found` Doxygen error
1018 friend Receiver<void>;
1019 template <typename, typename>
1020 friend class CallbackTask;
1021
1022 explicit ReceiveFuture(internal::Channel<void>* channel)
1023 PW_LOCKS_EXCLUDED(*channel)
1024 : Base(channel, this->kAllowClosed) {}
1025
// Polls for one notification. Called by ChannelFuture::Pend().
//
// Resolves to true when a notification was consumed, and to false when none
// can be produced (no channel attached, or the buffer is empty and
// StoreWakerForReceiveIfOpen() reports the channel is not open). Returns
// Pending() when the buffer is empty and the waker was stored.
1026 Poll<bool> DoPend(Context& cx) PW_LOCKS_EXCLUDED(*this->channel()) {
// No channel attached: resolve immediately with "nothing received".
1027 if (this->channel() == nullptr) {
1028 PW_DASSERT(this->is_pendable());
1029 return Ready(false);
1030 }
1031
// Manual lock: every path below ends in a call annotated as releasing it
// (StoreWakerForReceiveIfOpen or Complete).
1032 this->channel()->lock();
1033 PW_DASSERT(this->is_pendable());
1034 if (this->channel()->empty()) {
1035 return this->StoreWakerForReceiveIfOpen(cx) ? Pending() : Ready(false);
1036 }
1037
// Consume one notification (also wakes one pending sender), then let
// Complete() release the lock.
1038 this->channel()->PopAndWake();
1039 this->Complete();
1040 return Ready(true);
1041 }
1042};
1043
1045template <typename T>
1047 public:
1048 constexpr Receiver() : channel_(nullptr) {}
1049
1050 Receiver(const Receiver& other) = delete;
1051 Receiver& operator=(const Receiver& other) = delete;
1052
1053 Receiver(Receiver&& other) noexcept
1054 : channel_(std::exchange(other.channel_, nullptr)) {}
1055
1056 Receiver& operator=(Receiver&& other) noexcept {
1057 if (this == &other) {
1058 return *this;
1059 }
1060 if (channel_ != nullptr) {
1061 channel_->remove_receiver();
1062 }
1063 channel_ = std::exchange(other.channel_, nullptr);
1064 return *this;
1065 }
1066
1067 ~Receiver() {
1068 if (channel_ != nullptr) {
1069 channel_->remove_receiver();
1070 }
1071 }
1072
// Receives one value (or one notification for `T = void`), blocking the
// calling thread until it arrives, the channel can produce no more, or the
// timeout expires.
//
// NOTE(review): several lines of this function are missing from this view (the
// remaining parameters, the early return for a null channel, the task
// declaration, and the returns taken when `result` holds no value). The
// comments below describe only what is visible.
1082 std::conditional_t<std::is_void_v<T>, Status, Result<T>> BlockingReceive(
1083 Dispatcher& dispatcher,
1086 if (channel_ == nullptr) {
1088 }
1089
1090 // Return immediately if a value is available or the channel is closed.
1091 if constexpr (std::is_void_v<T>) {
1092 if (Status result = channel_->TryReceive();
1093 result.ok() || result.IsFailedPrecondition()) {
1094 return result;
1095 }
1096 } else {
1097 if (Result<T> result = channel_->TryReceive();
1098 result.ok() || result.status().IsFailedPrecondition()) {
1099 return result;
1100 }
1101 }
1102
// Slow path: the outcome is delivered through a callback that runs on the
// dispatcher. `bool` stands in for the value on notification channels.
1103 using ReceiveType =
1104 std::conditional_t<std::is_void_v<T>, bool, std::optional<T>>;
1105
1106 ReceiveType result;
1107 sync::TimedThreadNotification notification;
1108
// The callback captures `result` and `notification` by reference, stores the
// received value, and releases the notification to unblock this thread.
1110 [&result, &notification](ReceiveType&& val) {
1111 result = std::move(val);
1112 notification.release();
1113 },
1114 channel_);
1115 dispatcher.Post(task);
1116
// kWaitForever selects an untimed wait on the notification.
1117 if (timeout == internal::Channel<T>::kWaitForever) {
1118 notification.acquire();
1119 if constexpr (std::is_void_v<T>) {
// A false result maps to FailedPrecondition.
1120 return result ? OkStatus() : Status::FailedPrecondition();
1121 } else {
1122 if (!result.has_value()) {
1124 }
1125 return Result<T>(std::move(*result));
1126 }
1127 }
1128
// Timed wait: report DeadlineExceeded if the callback has not released the
// notification within `timeout`.
1129 if (!notification.try_acquire_for(timeout)) {
1130 return Status::DeadlineExceeded();
1131 }
1132
1133 if constexpr (std::is_void_v<T>) {
1134 return result ? OkStatus() : Status::FailedPrecondition();
1135 } else {
1136 if (!result.has_value()) {
1138 }
1139 return Result<T>(std::move(*result));
1140 }
1141 }
1142
1151 return ReceiveFuture<T>(channel_);
1152 }
1153
1160 std::conditional_t<std::is_void_v<T>, Status, Result<T>> TryReceive() {
1161 if (channel_ == nullptr) {
1163 }
1164
1165 return channel_->TryReceive();
1166 }
1167
1172 void Disconnect() {
1173 if (channel_ != nullptr) {
1174 channel_->remove_receiver();
1175 channel_ = nullptr;
1176 }
1177 }
1178
1180 [[nodiscard]] bool is_open() const {
1181 return channel_ != nullptr && channel_->is_open();
1182 }
1183
1184 private:
1185 template <typename U>
1186 friend class internal::Channel;
1187
1188 template <typename U>
1189 friend std::optional<std::tuple<MpscChannelHandle<U>, Receiver<U>>>
1190 CreateMpscChannel(Allocator&, uint16_t);
1191
1192 template <typename U, uint16_t kCapacity>
1193 friend std::tuple<MpscChannelHandle<U>, Receiver<U>> CreateMpscChannel(
1195
1196 template <typename U>
1197 friend std::optional<std::tuple<SpscChannelHandle<U>, Sender<U>, Receiver<U>>>
1198 CreateSpscChannel(Allocator&, uint16_t);
1199
1200 template <typename U, uint16_t kCapacity>
1201 friend std::tuple<SpscChannelHandle<U>, Sender<U>, Receiver<U>>
1203
1204 explicit Receiver(internal::Channel<T>& channel)
1206 : channel_(&channel) {
1207 channel_->add_receiver();
1208 }
1209
1210 internal::Channel<T>* channel_;
1211};
1212
1213template <typename T>
1214class [[nodiscard]] SendFuture final
1215 : public internal::ChannelFuture<SendFuture<T>, T, bool> {
1216 private:
1218
1219 public:
1220 constexpr SendFuture() = default;
1221
1222 SendFuture(SendFuture&& other) PW_LOCKS_EXCLUDED(*other.channel())
1223 : Base(static_cast<Base&&>(other)), value_(std::move(other.value_)) {}
1224
1225 SendFuture& operator=(SendFuture&& other)
1226 PW_LOCKS_EXCLUDED(*this->channel(), *other.channel()) {
1227 value_ = std::move(other.value_);
1228 // NOLINTNEXTLINE(misc-unconventional-assign-operator)
1229 return static_cast<SendFuture&>(this->MoveAssignFrom(other));
1230 }
1231
1232 ~SendFuture() PW_LOCKS_EXCLUDED(*this->channel()) {
1233 this->RemoveFromChannel();
1234 }
1235
1236 private:
1237 friend Base;
1238 // Suppress `no uniquely matching class member found` Doxygen error
1240 friend internal::Channel<T>;
1242 friend Sender<T>;
1243
1244 SendFuture(internal::Channel<T>* channel, const T& value)
1245 PW_LOCKS_EXCLUDED(*channel)
1246 : Base(channel), value_(value) {}
1247
1248 SendFuture(internal::Channel<T>* channel, T&& value)
1249 PW_LOCKS_EXCLUDED(*channel)
1250 : Base(channel), value_(std::move(value)) {}
1251
1252 Poll<bool> DoPend(Context& cx) PW_LOCKS_EXCLUDED(*this->channel()) {
1253 if (this->channel() == nullptr) {
1254 PW_DASSERT(this->is_pendable());
1255 return Ready(false);
1256 }
1257
1258 this->channel()->lock();
1259 PW_DASSERT(this->is_pendable());
1260 if (!this->channel()->is_open_locked()) {
1261 this->Complete();
1262 return Ready(false);
1263 }
1264
1265 if (this->channel()->full()) {
1266 this->StoreWakerForSend(cx);
1267 return Pending();
1268 }
1269
1270 this->channel()->PushAndWake(std::move(*value_));
1271 this->Complete();
1272 return Ready(true);
1273 }
1274
1275 std::optional<T> value_;
1276};
1277
1278template <>
1279class [[nodiscard]] SendFuture<void> final
1280 : public internal::ChannelFuture<SendFuture<void>, void, bool> {
1281 private:
1283
1284 public:
1285 constexpr SendFuture() = default;
1286
1287 SendFuture(SendFuture&& other) PW_LOCKS_EXCLUDED(*other.channel())
1288 : Base(static_cast<Base&&>(other)) {}
1289
1290 SendFuture& operator=(SendFuture&& other)
1291 PW_LOCKS_EXCLUDED(*this->channel(), *other.channel()) {
1292 // NOLINTNEXTLINE(misc-unconventional-assign-operator)
1293 return static_cast<SendFuture&>(this->MoveAssignFrom(other));
1294 }
1295
1296 ~SendFuture() PW_LOCKS_EXCLUDED(*this->channel()) {
1297 this->RemoveFromChannel();
1298 }
1299
1300 private:
1301 friend Base;
1302 // Suppress `no uniquely matching class member found` Doxygen error
1306 friend Sender<void>;
1307
1308 explicit SendFuture(internal::Channel<void>* channel)
1309 PW_LOCKS_EXCLUDED(*channel)
1310 : Base(channel) {}
1311
1312 Poll<bool> DoPend(Context& cx) PW_LOCKS_EXCLUDED(*this->channel()) {
1313 if (this->channel() == nullptr) {
1314 PW_DASSERT(this->is_pendable());
1315 return Ready(false);
1316 }
1317
1318 this->channel()->lock();
1319 PW_DASSERT(this->is_pendable());
1320 if (!this->channel()->is_open_locked()) {
1321 this->Complete();
1322 return Ready(false);
1323 }
1324
1325 if (this->channel()->full()) {
1326 this->StoreWakerForSend(cx);
1327 return Pending();
1328 }
1329
1330 this->channel()->PushAndWake();
1331 this->Complete();
1332 return Ready(true);
1333 }
1334};
1335
1342template <typename T>
1344 public:
1345 SendReservation(const SendReservation& other) = delete;
1346 SendReservation& operator=(const SendReservation& other) = delete;
1347
1349 : channel_(std::exchange(other.channel_, nullptr)) {}
1350
1351 SendReservation& operator=(SendReservation&& other) {
1352 if (this == &other) {
1353 return *this;
1354 }
1355 Cancel();
1356 channel_ = std::exchange(other.channel_, nullptr);
1357 return *this;
1358 }
1359
1360 ~SendReservation() { Cancel(); }
1361
1363 template <typename... Args>
1364 void Commit(Args&&... args) {
1365 PW_ASSERT(channel_ != nullptr);
1366 channel_->CommitReservationAndRemoveRef(std::forward<Args>(args)...);
1367 channel_ = nullptr;
1368 }
1369
1370 void CommitNotification() {
1371 static_assert(std::is_void_v<T>,
1372 "CommitNotification() is for notification channels only.");
1373 PW_ASSERT(channel_ != nullptr);
1374 channel_->CommitNotificationReservationAndRemoveRef();
1375 channel_ = nullptr;
1376 }
1377
1379 void Cancel() {
1380 if (channel_ != nullptr) {
1381 channel_->DropReservationAndRemoveRef();
1382 channel_ = nullptr;
1383 }
1384 }
1385
1386 private:
1387 // Suppress `no uniquely matching class member found` Doxygen error
1389 friend internal::Channel<T>;
1391 friend class ReserveSendFuture<T>;
1392 friend class Sender<T>;
1393
1394 explicit SendReservation(internal::Channel<T>& channel)
1396 : channel_(&channel) {
1397 channel_->add_ref();
1398 }
1399
1400 internal::Channel<T>* channel_;
1401};
1402
1403template <typename T>
1404class [[nodiscard]] ReserveSendFuture final
1405 : public internal::ChannelFuture<ReserveSendFuture<T>,
1406 T,
1407 std::optional<SendReservation<T>>> {
1408 private:
1409 using Base = internal::
1410 ChannelFuture<ReserveSendFuture, T, std::optional<SendReservation<T>>>;
1411
1412 public:
1413 constexpr ReserveSendFuture() = default;
1414
1415 ReserveSendFuture(ReserveSendFuture&& other) : Base(std::move(other)) {}
1416
1417 ReserveSendFuture& operator=(ReserveSendFuture&& other) {
1418 // NOLINTNEXTLINE(misc-unconventional-assign-operator)
1419 return static_cast<ReserveSendFuture&>(this->MoveAssignFrom(other));
1420 }
1421
1422 ~ReserveSendFuture() PW_LOCKS_EXCLUDED(*this->channel()) {
1423 this->RemoveFromChannel();
1424 }
1425
1426 private:
1427 friend Base;
1428 // Suppress `no uniquely matching class member found` Doxygen error
1430 friend internal::Channel<T>;
1432 friend Sender<T>;
1433
1434 explicit ReserveSendFuture(internal::Channel<T>* channel)
1435 PW_LOCKS_EXCLUDED(*channel)
1436 : Base(channel) {}
1437
1439 PW_LOCKS_EXCLUDED(*this->channel()) {
1440 if (this->channel() == nullptr) {
1441 PW_DASSERT(this->is_pendable());
1442 return Ready<std::optional<SendReservation<T>>>(std::nullopt);
1443 }
1444
1445 this->channel()->lock();
1446 PW_DASSERT(this->is_pendable());
1447 if (!this->channel()->is_open_locked()) {
1448 this->Complete();
1449 return Ready<std::optional<SendReservation<T>>>(std::nullopt);
1450 }
1451
1452 if (this->channel()->remaining_capacity_locked() == 0) {
1453 this->StoreWakerForReserveSend(cx);
1454 return Pending();
1455 }
1456
1457 this->channel()->add_reservation();
1458 SendReservation<T> reservation(*this->channel());
1459 this->Complete();
1460 return reservation;
1461 }
1462};
1463
1465template <typename T>
1466class Sender {
1467 public:
1468 constexpr Sender() : channel_(nullptr) {}
1469
1470 Sender(const Sender& other) = delete;
1471 Sender& operator=(const Sender& other) = delete;
1472
1473 Sender(Sender&& other) noexcept
1474 : channel_(std::exchange(other.channel_, nullptr)) {}
1475
1476 Sender& operator=(Sender&& other) noexcept {
1477 if (this == &other) {
1478 return *this;
1479 }
1480 if (channel_ != nullptr) {
1481 channel_->remove_sender();
1482 }
1483 channel_ = std::exchange(other.channel_, nullptr);
1484 return *this;
1485 }
1486
1487 ~Sender() {
1488 if (channel_ != nullptr) {
1489 channel_->remove_sender();
1490 }
1491 }
1492
1501 template <typename U>
1502 SendFuture<T> Send(U&& value) {
1503 return SendFuture<T>(channel_, std::forward<U>(value));
1504 }
1505
1506 SendFuture<T> Send() {
1507 static_assert(std::is_void_v<T>,
1508 "Send() with no arguments is for notification channels only");
1509 return SendFuture<T>(channel_);
1510 }
1511
1518
1529 if (channel_ == nullptr) {
1531 }
1532 return channel_->TryReserveSend();
1533 }
1534
1544 template <typename U,
1545 int&... kExplicitGuard,
1546 std::enable_if_t<std::is_same_v<::cpp20::remove_cvref_t<U>, T>,
1547 bool> = true>
1548 Status TrySend(U&& value) {
1549 if (channel_ == nullptr) {
1551 }
1552 return channel_->TrySend(std::forward<U>(value));
1553 }
1554
1556 template <typename U,
1557 int&... kExplicitGuard,
1558 std::enable_if_t<!std::is_same_v<::cpp20::remove_cvref_t<U>, T> &&
1559 std::is_constructible_v<T, U>,
1560 bool> = true>
1561 Status TrySend(U&& value) {
1562 return TrySend(T(value));
1563 }
1564
1576 static_assert(
1577 std::is_void_v<T>,
1578 "TrySend() with no arguments is for notification channels only");
1579 if (channel_ == nullptr) {
1581 }
1582 return channel_->TrySend();
1583 }
1584
1595 template <typename U,
1596 int&... kExplicitGuard,
1597 std::enable_if_t<std::is_same_v<::cpp20::remove_cvref_t<U>, T>,
1598 bool> = true>
1600 U&& value,
1603 return BlockingSendMoveOrCopy(dispatcher, std::forward<U>(value), timeout);
1604 }
1605
1607 template <typename U,
1608 int&... kExplicitGuard,
1609 std::enable_if_t<!std::is_same_v<::cpp20::remove_cvref_t<U>, T> &&
1610 std::is_constructible_v<T, U>,
1611 bool> = true>
1613 U&& value,
1616 return BlockingSendMoveOrCopy(dispatcher, T(value), timeout);
1617 }
1618
1632 return BlockingSendMoveOrCopy(dispatcher, timeout);
1633 }
1634
1639 void Disconnect() {
1640 if (channel_ != nullptr) {
1641 channel_->remove_sender();
1642 channel_ = nullptr;
1643 }
1644 }
1645
1647 uint16_t remaining_capacity() const {
1648 return channel_ != nullptr ? channel_->remaining_capacity() : 0;
1649 }
1650
1652 uint16_t capacity() const {
1653 return channel_ != nullptr ? channel_->capacity() : 0;
1654 }
1655
1657 [[nodiscard]] bool is_open() const {
1658 return channel_ != nullptr && channel_->is_open();
1659 }
1660
1661 private:
1662 template <typename U>
1663 friend class internal::Channel;
1664
1665 template <typename U>
1666 friend std::optional<std::tuple<SpmcChannelHandle<U>, Sender<U>>>
1667 CreateSpmcChannel(Allocator&, uint16_t);
1668
1669 template <typename U, uint16_t kCapacity>
1670 friend std::tuple<SpmcChannelHandle<U>, Sender<U>> CreateSpmcChannel(
1672
1673 template <typename U>
1674 friend std::optional<std::tuple<SpscChannelHandle<U>, Sender<U>, Receiver<U>>>
1675 CreateSpscChannel(Allocator&, uint16_t);
1676
1677 template <typename U, uint16_t kCapacity>
1678 friend std::tuple<SpscChannelHandle<U>, Sender<U>, Receiver<U>>
1680
1681 explicit Sender(internal::Channel<T>& channel)
1683 : channel_(&channel) {
1684 channel_->add_sender();
1685 }
1686
1687 template <typename U>
1688 Status BlockingSendMoveOrCopy(Dispatcher& dispatcher,
1689 U&& value,
1691 PW_LOCKS_EXCLUDED(*channel_) {
1692 if (channel_ == nullptr) {
1694 }
1695
1696 if (Status status = channel_->TrySend(std::forward<U>(value));
1697 status.ok() || status.IsFailedPrecondition()) {
1698 return status;
1699 }
1700
1701 return BlockingSendFuture(
1702 dispatcher, // NOLINTNEXTLINE(bugprone-use-after-move)
1703 SendFuture<T>(channel_, std::forward<U>(value)),
1704 timeout);
1705 }
1706
1707 Status BlockingSendMoveOrCopy(Dispatcher& dispatcher,
1709 PW_LOCKS_EXCLUDED(*channel_) {
1710 if (channel_ == nullptr) {
1712 }
1713
1714 if (Status status = channel_->TrySend();
1715 status.ok() || status.IsFailedPrecondition()) {
1716 return status;
1717 }
1718
1719 return BlockingSendFuture(dispatcher, SendFuture<T>(channel_), timeout);
1720 }
1721
1722 Status BlockingSendFuture(Dispatcher& dispatcher,
1723 SendFuture<T>&& future,
1725 PW_LOCKS_EXCLUDED(*channel_) {
1726 Status status;
1727 sync::TimedThreadNotification notification;
1728
1729 CallbackTask task(
1730 [&status, &notification](bool result) {
1731 status = result ? OkStatus() : Status::FailedPrecondition();
1732 notification.release();
1733 },
1734 std::move(future));
1735 dispatcher.Post(task);
1736
1737 if (timeout == internal::Channel<T>::kWaitForever) {
1738 notification.acquire();
1739 return status;
1740 }
1741
1742 if (!notification.try_acquire_for(timeout)) {
1743 task.Deregister();
1744 return Status::DeadlineExceeded();
1745 }
1746 return status;
1747 }
1748
1749 internal::Channel<T>* channel_;
1750};
1751
1765template <typename T>
1766std::optional<MpmcChannelHandle<T>> CreateMpmcChannel(Allocator& alloc,
1767 uint16_t capacity) {
1768 auto channel = internal::DynamicChannel<T>::Allocate(alloc, capacity);
1769 if (channel == nullptr) {
1770 return std::nullopt;
1771 }
1772 std::lock_guard lock(*channel);
1773 return MpmcChannelHandle<T>(*channel);
1774}
1775
1787template <typename T, uint16_t kCapacity>
1789 std::lock_guard lock(static_cast<internal::Channel<T>&>(storage));
1790 PW_DASSERT(!storage.active_locked());
1791 return MpmcChannelHandle<T>(storage);
1792}
1793
1807template <typename T>
1808std::optional<std::tuple<MpscChannelHandle<T>, Receiver<T>>> CreateMpscChannel(
1809 Allocator& alloc, uint16_t capacity) {
1810 auto channel = internal::DynamicChannel<T>::Allocate(alloc, capacity);
1811 if (channel == nullptr) {
1812 return std::nullopt;
1813 }
1814 std::lock_guard lock(*channel);
1815 return std::make_tuple(MpscChannelHandle<T>(*channel), Receiver<T>(*channel));
1816}
1817
1829template <typename T, uint16_t kCapacity>
1830std::tuple<MpscChannelHandle<T>, Receiver<T>> CreateMpscChannel(
1832 std::lock_guard lock(static_cast<internal::Channel<T>&>(storage));
1833 PW_DASSERT(!storage.active_locked());
1834 return std::make_tuple(MpscChannelHandle<T>(storage), Receiver<T>(storage));
1835}
1836
1850template <typename T>
1851std::optional<std::tuple<SpmcChannelHandle<T>, Sender<T>>> CreateSpmcChannel(
1852 Allocator& alloc, uint16_t capacity) {
1853 auto channel = internal::DynamicChannel<T>::Allocate(alloc, capacity);
1854 if (channel == nullptr) {
1855 return std::nullopt;
1856 }
1857 std::lock_guard lock(*channel);
1858 return std::make_tuple(SpmcChannelHandle<T>(*channel), Sender<T>(*channel));
1859}
1860
1872template <typename T, uint16_t kCapacity>
1873std::tuple<SpmcChannelHandle<T>, Sender<T>> CreateSpmcChannel(
1875 std::lock_guard lock(static_cast<internal::Channel<T>&>(storage));
1876 PW_DASSERT(!storage.active_locked());
1877 return std::make_tuple(SpmcChannelHandle<T>(storage), Sender<T>(storage));
1878}
1879
1893template <typename T>
1894std::optional<std::tuple<SpscChannelHandle<T>, Sender<T>, Receiver<T>>>
1895CreateSpscChannel(Allocator& alloc, uint16_t capacity) {
1896 auto channel = internal::DynamicChannel<T>::Allocate(alloc, capacity);
1897 if (channel == nullptr) {
1898 return std::nullopt;
1899 }
1900 std::lock_guard lock(*channel);
1901 return std::make_tuple(SpscChannelHandle<T>(*channel),
1902 Sender<T>(*channel),
1903 Receiver<T>(*channel));
1904}
1905
1917template <typename T, uint16_t kCapacity>
1918std::tuple<SpscChannelHandle<T>, Sender<T>, Receiver<T>> CreateSpscChannel(
1920 std::lock_guard lock(static_cast<internal::Channel<T>&>(storage));
1921 PW_DASSERT(!storage.active_locked());
1922 return std::make_tuple(
1923 SpscChannelHandle<T>(storage), Sender<T>(storage), Receiver<T>(storage));
1924}
1925
1927
1928namespace internal {
1929
1930inline void BaseChannelFuture::Complete() PW_UNLOCK_FUNCTION(*channel_) {
1931 channel_->RemoveRefAndDestroyIfUnreferenced();
1932 channel_ = nullptr;
1933}
1934
1935} // namespace internal
1936} // namespace pw::async2
Definition: allocator.h:42
T * New(Args &&... args)
Definition: allocator.h:66
Abstract interface for releasing memory.
Definition: deallocator.h:30
void Deallocate(void *ptr)
Definition: deallocator.h:316
Definition: deque.h:206
Definition: result.h:145
Definition: status.h:120
static constexpr Status DeadlineExceeded()
Definition: status.h:177
static constexpr Status Unavailable()
Definition: status.h:304
static constexpr Status FailedPrecondition()
Definition: status.h:243
Definition: callback_task.h:44
Channel handle for a particular type T.
Definition: channel.h:697
Sender< T > CreateSender()
Definition: channel.h:714
Receiver< T > CreateReceiver()
Definition: channel.h:721
Definition: channel.h:902
bool active() const
Definition: channel.h:924
Definition: task.h:47
Definition: future.h:408
Definition: dispatcher.h:74
constexpr bool is_pendable() const
Definition: future.h:255
void MarkComplete()
Definition: future.h:306
constexpr bool is_complete() const
Definition: future.h:260
Definition: future.h:114
Definition: channel.h:857
Definition: channel.h:814
A handle to a multi-producer, multi-consumer channel.
Definition: channel.h:729
friend std::optional< MpmcChannelHandle< U > > CreateMpmcChannel(Allocator &, uint16_t)
Definition: channel.h:1766
A handle to a multi-producer, single-consumer channel.
Definition: channel.h:751
friend std::optional< std::tuple< MpscChannelHandle< U >, Receiver< U > > > CreateMpscChannel(Allocator &, uint16_t)
Definition: channel.h:1808
Definition: poll.h:138
constexpr bool IsReady() const noexcept
Returns whether or not this value is Ready.
Definition: poll.h:211
Definition: channel.h:934
A receiver which reads values from an asynchronous channel.
Definition: channel.h:1046
friend std::optional< std::tuple< MpscChannelHandle< U >, Receiver< U > > > CreateMpscChannel(Allocator &, uint16_t)
Definition: channel.h:1808
bool is_open() const
Returns true if the channel is open.
Definition: channel.h:1180
std::conditional_t< std::is_void_v< T >, Status, Result< T > > TryReceive()
Definition: channel.h:1160
friend std::optional< std::tuple< SpscChannelHandle< U >, Sender< U >, Receiver< U > > > CreateSpscChannel(Allocator &, uint16_t)
Definition: channel.h:1895
void Disconnect()
Definition: channel.h:1172
std::conditional_t< std::is_void_v< T >, Status, Result< T > > BlockingReceive(Dispatcher &dispatcher, chrono::SystemClock::duration timeout=internal::Channel< T >::kWaitForever)
Definition: channel.h:1082
ReceiveFuture< T > Receive()
Definition: channel.h:1150
Definition: channel.h:1407
Definition: channel.h:1215
Definition: channel.h:1343
void Cancel()
Releases the reservation, making the space available for other senders.
Definition: channel.h:1379
void Commit(Args &&... args)
Commits a value to a reserved slot.
Definition: channel.h:1364
A sender which writes values to an asynchronous channel.
Definition: channel.h:1466
SendFuture< T > Send(U &&value)
Definition: channel.h:1502
Status TrySend(U &&value)
Definition: channel.h:1548
Result< SendReservation< T > > TryReserveSend()
Definition: channel.h:1528
friend std::optional< std::tuple< SpmcChannelHandle< U >, Sender< U > > > CreateSpmcChannel(Allocator &, uint16_t)
Definition: channel.h:1851
friend std::optional< std::tuple< SpscChannelHandle< U >, Sender< U >, Receiver< U > > > CreateSpscChannel(Allocator &, uint16_t)
Definition: channel.h:1895
ReserveSendFuture< T > ReserveSend()
Definition: channel.h:1517
uint16_t capacity() const
Returns the maximum capacity of the channel.
Definition: channel.h:1652
Status BlockingSend(Dispatcher &dispatcher, chrono::SystemClock::duration timeout=internal::Channel< T >::kWaitForever)
Definition: channel.h:1629
Status BlockingSend(Dispatcher &dispatcher, U &&value, chrono::SystemClock::duration timeout=internal::Channel< T >::kWaitForever)
Definition: channel.h:1599
uint16_t remaining_capacity() const
Returns the remaining capacity of the channel.
Definition: channel.h:1647
void Disconnect()
Definition: channel.h:1639
bool is_open() const
Returns true if the channel is open.
Definition: channel.h:1657
Status TrySend()
Definition: channel.h:1575
A handle to a single-producer, multi-consumer channel.
Definition: channel.h:772
friend std::optional< std::tuple< SpmcChannelHandle< U >, Sender< U > > > CreateSpmcChannel(Allocator &, uint16_t)
Definition: channel.h:1851
A handle to a single-producer, single-consumer channel.
Definition: channel.h:793
friend std::optional< std::tuple< SpscChannelHandle< U >, Sender< U >, Receiver< U > > > CreateSpscChannel(Allocator &, uint16_t)
Definition: channel.h:1895
bool is_complete() const
True if the future has returned Ready().
Definition: channel.h:75
bool is_pendable() const
True if the Pend can be called on the future.
Definition: channel.h:72
Definition: channel.h:167
Definition: channel.h:317
Definition: channel.h:132
Definition: channel.h:402
Definition: channel.h:596
Definition: storage.h:86
Definition: storage.h:39
Definition: interrupt_spin_lock.h:50
Definition: timed_thread_notification.h:40
bool try_acquire_for(chrono::SystemClock::duration timeout)
std::optional< std::tuple< SpmcChannelHandle< T >, Sender< T > > > CreateSpmcChannel(Allocator &alloc, uint16_t capacity)
Definition: channel.h:1851
std::optional< std::tuple< MpscChannelHandle< T >, Receiver< T > > > CreateMpscChannel(Allocator &alloc, uint16_t capacity)
Definition: channel.h:1808
std::optional< MpmcChannelHandle< T > > CreateMpmcChannel(Allocator &alloc, uint16_t capacity)
Definition: channel.h:1766
std::optional< std::tuple< SpscChannelHandle< T >, Sender< T >, Receiver< T > > > CreateSpscChannel(Allocator &alloc, uint16_t capacity)
Definition: channel.h:1895
std::conditional_t< std::is_void_v< typename T::value_type >, ReadyType, typename T::value_type > FutureValue
Definition: future.h:110
constexpr PendingType Pending()
Returns a value indicating that an operation was not yet able to complete.
Definition: poll.h:353
constexpr Poll Ready()
Returns a value indicating completion.
Definition: poll.h:337
std::chrono::duration< rep, period > duration
Alias for durations representable with this clock.
Definition: system_clock.h:91
constexpr bool CheckedIncrement(T &base, Inc inc)
Definition: checked_arithmetic.h:118
constexpr Status OkStatus()
Definition: status.h:450
#define PW_LOCKABLE(name)
Definition: lock_annotations.h:210
#define PW_GUARDED_BY(x)
Definition: lock_annotations.h:60
#define PW_NO_LOCK_SAFETY_ANALYSIS
Definition: lock_annotations.h:296
#define PW_EXCLUSIVE_LOCK_FUNCTION(...)
Definition: lock_annotations.h:232
#define PW_EXCLUSIVE_LOCKS_REQUIRED(...)
Definition: lock_annotations.h:147
#define PW_LOCK_RETURNED(x)
Definition: lock_annotations.h:199
#define PW_UNLOCK_FUNCTION(...)
Definition: lock_annotations.h:249
#define PW_LOCKS_EXCLUDED(...)
Definition: lock_annotations.h:178