18#include "pw_allocator/allocator.h"
19#include "pw_async2/callback_task.h"
20#include "pw_async2/dispatcher.h"
21#include "pw_async2/future.h"
22#include "pw_containers/deque.h"
23#include "pw_numeric/checked_arithmetic.h"
24#include "pw_result/result.h"
25#include "pw_sync/interrupt_spin_lock.h"
26#include "pw_sync/lock_annotations.h"
27#include "pw_sync/timed_thread_notification.h"
44class ReserveSendFuture;
49template <
typename T, u
int16_t kCapacity>
68 ~Channel() { PW_ASSERT(ref_count_ == 0); }
73 std::lock_guard lock(lock_);
80 template <
size_t kAlignment,
size_t kCapacity>
85 std::lock_guard lock(lock_);
91 friend ChannelHandle<T>;
93 friend ReserveSendFuture<T>;
94 friend ReceiveFuture<T>;
96 friend SendReservation<T>;
100 Deallocator* deallocator =
nullptr;
102 std::lock_guard lock(lock_);
103 deallocator = deque_.deallocator();
106 if (deallocator !=
nullptr) {
107 std::destroy_at(
this);
108 deallocator->Deallocate(
this);
113 std::lock_guard lock(lock_);
119 while (
auto send_future = send_futures_.Pop()) {
120 send_future->get().Wake();
122 while (
auto reserve_send_future = reserve_send_futures_.Pop()) {
123 reserve_send_future->get().Wake();
125 while (
auto receive_future = receive_futures_.Pop()) {
126 receive_future->get().Wake();
133 return Sender<T>(
nullptr);
135 return Sender<T>(
this);
141 return Receiver<T>(
nullptr);
143 return Receiver<T>(
this);
147 deque_.push_back(std::move(value));
152 deque_.push_back(value);
156 template <
typename... Args>
158 deque_.emplace_back(std::forward<Args>(args)...);
163 if (
auto future = receive_futures_.Pop()) {
164 future->get().Wake();
169 PW_ASSERT(!deque_.empty());
171 T value = std::move(deque_.front());
180 if (prioritize_reserve_) {
181 if (
auto reserve_future = reserve_send_futures_.Pop()) {
182 reserve_future->get().Wake();
183 }
else if (
auto send_future = send_futures_.Pop()) {
184 send_future->get().Wake();
187 if (
auto send_future = send_futures_.Pop()) {
188 send_future->get().Wake();
189 }
else if (
auto reserve_future = reserve_send_futures_.Pop()) {
190 reserve_future->get().Wake();
194 prioritize_reserve_ = !prioritize_reserve_;
199 return remaining_capacity_locked() == 0;
203 std::lock_guard lock(lock_);
204 return remaining_capacity_locked();
206 uint16_t remaining_capacity_locked() const
208 return deque_.capacity() - deque_.size() - reservations_;
213 return deque_.capacity();
217 std::lock_guard lock(lock_);
218 return deque_.empty();
228 PushAndWake(std::move(value));
232 std::lock_guard lock(lock_);
233 if (closed_ || remaining_capacity_locked() == 0) {
241 std::lock_guard lock(lock_);
242 if (closed_ || remaining_capacity_locked() == 0) {
245 PushAndWake(std::move(value));
250 std::lock_guard lock(lock_);
251 if (deque_.empty()) {
258 std::lock_guard lock(lock_);
259 if (closed_ || remaining_capacity_locked() == 0) {
267 std::lock_guard lock(lock_);
268 PW_ASSERT(reservations_ > 0);
275 template <
typename... Args>
277 std::lock_guard lock(lock_);
278 PW_ASSERT(reservations_ > 0);
281 EmplaceAndWake(std::forward<Args>(args)...);
286 std::lock_guard lock(lock_);
287 add_object(receiver_count_);
291 std::lock_guard lock(lock_);
292 add_object(sender_count_);
296 std::lock_guard lock(lock_);
297 add_object(handle_count_);
304 PW_ASSERT(
CheckedAdd(ref_count_, 1, ref_count_));
308 std::lock_guard lock(lock_);
309 remove_object(sender_count_);
313 std::lock_guard lock(lock_);
314 remove_object(receiver_count_);
318 std::lock_guard lock(lock_);
319 remove_object(handle_count_);
324 PW_ASSERT(counter > 0);
326 if (should_close()) {
330 bool destroy = decrement_ref_locked();
339 std::lock_guard lock(lock_);
346 std::lock_guard lock(lock_);
347 destroy = decrement_ref_locked();
356 return ref_count_ == 0;
366 if (handle_count_ > 0) {
369 return sender_count_ == 0 || receiver_count_ == 0;
373 ListFutureProvider<SendFuture<T>> send_futures_;
374 ListFutureProvider<ReserveSendFuture<T>> reserve_send_futures_;
375 ListFutureProvider<ReceiveFuture<T>> receive_futures_;
377 mutable sync::InterruptSpinLock lock_;
409 if (channel_ !=
nullptr) {
410 channel_->add_handle();
415 if (channel_ !=
nullptr) {
416 channel_->remove_handle();
418 channel_ = other.channel_;
419 if (channel_ !=
nullptr) {
420 channel_->add_handle();
426 : channel_(std::exchange(other.channel_,
nullptr)) {}
429 if (
this == &other) {
432 if (channel_ !=
nullptr) {
433 channel_->remove_handle();
435 channel_ = std::exchange(other.channel_,
nullptr);
441 [[nodiscard]]
bool is_open()
const {
442 return channel_ !=
nullptr && !channel_->closed();
448 PW_ASSERT(channel_ !=
nullptr);
449 return channel_->CreateSender();
456 PW_ASSERT(channel_ !=
nullptr);
457 return channel_->CreateReceiver();
463 if (channel_ !=
nullptr) {
475 if (channel_ !=
nullptr) {
476 channel_->remove_handle();
483 if (channel_ !=
nullptr) {
484 channel_->add_handle();
489 internal::Channel<T>* channel_;
510 template <
typename U>
514 template <
typename U, u
int16_t kCapacity>
534 template <
typename U>
535 friend std::optional<std::tuple<MpscChannelHandle<U>,
Receiver<U>>>
538 template <
typename U, u
int16_t kCapacity>
558 template <
typename U>
559 friend std::optional<std::tuple<SpmcChannelHandle<U>,
Sender<U>>>
562 template <
typename U, u
int16_t kCapacity>
581 template <
typename U>
585 template <
typename U, u
int16_t kCapacity>
596template <
typename T, u
int16_t kCapacity>
604 [[nodiscard]]
bool active()
const {
return this->ref_count() != 0; }
615 :
Base(Base::kMovedFrom),
616 channel_(std::exchange(other.channel_,
nullptr)) {
617 Base::MoveFrom(other);
621 if (
this == &other) {
624 if (channel_ !=
nullptr) {
625 channel_->remove_ref();
627 channel_ = std::exchange(other.channel_,
nullptr);
628 Base::MoveFrom(other);
640 static constexpr const char kWaitReason[] =
"Receiver::Receive";
643 :
Base(channel.receive_futures_), channel_(&channel) {
650 if (channel_ ==
nullptr) {
651 return Ready<std::optional<T>>(std::nullopt);
654 std::optional<T> value = channel_->TryPop();
655 if (!value.has_value()) {
656 if (channel_->closed()) {
658 return Ready<std::optional<T>>(std::nullopt);
664 return Ready(std::move(value));
668 if (channel_ !=
nullptr) {
669 channel_->remove_ref();
683 constexpr Receiver() : channel_(
nullptr) {}
689 : channel_(std::exchange(other.channel_,
nullptr)) {}
692 if (
this == &other) {
695 if (channel_ !=
nullptr) {
696 channel_->remove_receiver();
698 channel_ = std::exchange(other.channel_,
nullptr);
703 if (channel_ !=
nullptr) {
704 channel_->remove_receiver();
720 if (channel_ ==
nullptr) {
725 if (available.
ok()) {
729 std::optional<T> result;
733 [&result, ¬ification](std::optional<T> value) {
734 result = std::move(value);
737 dispatcher.
Post(task);
739 if (timeout == kWaitForever) {
741 if (!result.has_value()) {
744 return Result<T>(std::move(result.value()));
752 if (!result.has_value()) {
755 return Result<T>(std::move(result.value()));
766 if (channel_ ==
nullptr) {
779 if (channel_ ==
nullptr) {
782 std::optional<T> value = channel_->TryPop();
783 if (value.has_value()) {
784 return std::move(*value);
786 if (channel_->closed()) {
797 if (channel_ !=
nullptr) {
798 channel_->remove_receiver();
805 return channel_ !=
nullptr && !channel_->closed();
810 chrono::SystemClock::duration::max();
812 template <
typename U>
815 template <
typename U>
816 friend std::optional<std::tuple<MpscChannelHandle<U>,
Receiver<U>>>
819 template <
typename U, u
int16_t kCapacity>
823 template <
typename U>
827 template <
typename U, u
int16_t kCapacity>
832 if (channel_ !=
nullptr) {
833 channel_->add_receiver();
837 internal::Channel<T>* channel_;
845 :
Base(Base::kMovedFrom),
846 channel_(std::exchange(other.channel_,
nullptr)),
847 value_(std::move(other.value_)) {
848 Base::MoveFrom(other);
852 if (
this == &other) {
855 if (channel_ !=
nullptr) {
856 channel_->remove_ref();
858 channel_ = std::exchange(other.channel_,
nullptr);
859 value_ = std::move(other.value_);
860 Base::MoveFrom(other);
872 static constexpr const char kWaitReason[] =
"Sender::Send";
875 :
Base(channel.send_futures_), channel_(&channel), value_(value) {
880 :
Base(channel.send_futures_),
882 value_(std::move(value)) {
886 enum ClosedState { kClosed };
889 :
Base(Base::kReadyForCompletion), channel_(
nullptr), value_(value) {}
892 :
Base(Base::kReadyForCompletion),
894 value_(std::move(value)) {}
897 if (channel_ ==
nullptr || channel_->closed()) {
903 std::lock_guard lock(channel_->lock_);
904 if (channel_->full_locked()) {
908 channel_->Push(std::move(value_));
916 if (channel_ !=
nullptr) {
917 channel_->remove_ref();
941 : channel_(std::exchange(other.channel_,
nullptr)) {}
944 if (
this == &other) {
948 channel_ = std::exchange(other.channel_,
nullptr);
955 template <
typename... Args>
957 PW_ASSERT(channel_ !=
nullptr);
958 channel_->CommitReservation(std::forward<Args>(args)...);
959 channel_->remove_ref();
965 if (channel_ !=
nullptr) {
966 channel_->DropReservation();
967 channel_->remove_ref();
980 internal::Channel<T>* channel_;
986 std::optional<SendReservation<T>>> {
989 :
Base(Base::kMovedFrom),
990 channel_(std::exchange(other.channel_,
nullptr)) {
991 Base::MoveFrom(other);
995 if (channel_ !=
nullptr) {
996 channel_->remove_ref();
998 channel_ = std::exchange(other.channel_,
nullptr);
999 Base::MoveFrom(other);
1007 std::optional<SendReservation<T>>>;
1012 static constexpr const char kWaitReason[] =
"Sender::ReserveSend";
1015 :
Base(channel->reserve_send_futures_), channel_(channel) {
1016 channel_->add_ref();
1019 enum ClosedState { kClosed };
1022 :
Base(Base::kReadyForCompletion), channel_(
nullptr) {}
1025 if (channel_ ==
nullptr || channel_->closed()) {
1027 return Ready<std::optional<SendReservation<T>>>(std::nullopt);
1030 if (!channel_->Reserve()) {
1040 if (channel_ !=
nullptr) {
1041 channel_->remove_ref();
1052template <
typename T>
1055 constexpr Sender() : channel_(
nullptr) {}
1061 : channel_(std::exchange(other.channel_,
nullptr)) {}
1064 if (
this == &other) {
1067 if (channel_ !=
nullptr) {
1068 channel_->remove_sender();
1070 channel_ = std::exchange(other.channel_,
nullptr);
1075 if (channel_ !=
nullptr) {
1076 channel_->remove_sender();
1089 if (channel_ ==
nullptr) {
1104 if (channel_ ==
nullptr) {
1116 if (channel_ ==
nullptr) {
1130 if (channel_ ==
nullptr) {
1131 return std::nullopt;
1133 if (!channel_->Reserve()) {
1134 return std::nullopt;
1144 if (channel_ ==
nullptr) {
1147 return channel_->TryPush(value);
1155 if (channel_ ==
nullptr) {
1158 return channel_->TryPush(std::move(value));
1174 if (channel_ ==
nullptr) {
1193 if (channel_ ==
nullptr) {
1197 dispatcher,
SendFuture<T>(*channel_, std::move(value)), timeout);
1205 if (channel_ !=
nullptr) {
1206 channel_->remove_sender();
1213 return channel_ !=
nullptr ? channel_->remaining_capacity() : 0;
1218 return channel_ !=
nullptr ? channel_->capacity() : 0;
1223 return channel_ !=
nullptr && !channel_->closed();
1228 chrono::SystemClock::duration::max();
1230 template <
typename U>
1233 template <
typename U>
1234 friend std::optional<std::tuple<SpmcChannelHandle<U>,
Sender<U>>>
1237 template <
typename U, u
int16_t kCapacity>
1241 template <
typename U>
1245 template <
typename U, u
int16_t kCapacity>
1250 if (channel_ !=
nullptr) {
1251 channel_->add_sender();
1256 SendFuture<T>&& future,
1259 sync::TimedThreadNotification notification;
1261 FutureCallbackTask task(
1262 std::move(future), [&status, ¬ification](
bool result) {
1263 status = result ?
OkStatus() : Status::FailedPrecondition();
1264 notification.release();
1266 dispatcher.Post(task);
1268 if (timeout == kWaitForever) {
1269 notification.acquire();
1273 if (!notification.try_acquire_for(timeout)) {
1280 internal::Channel<T>* channel_;
1296template <
typename T>
1297std::optional<MpmcChannelHandle<T>> CreateMpmcChannel(
Allocator& alloc,
1298 uint16_t capacity) {
1300 if (channel ==
nullptr) {
1301 return std::nullopt;
1317template <
typename T, u
int16_t kCapacity>
1319 PW_ASSERT(!storage.
active());
1336template <
typename T>
1337std::optional<std::tuple<MpscChannelHandle<T>,
Receiver<T>>> CreateMpscChannel(
1340 if (channel ==
nullptr) {
1341 return std::nullopt;
1357template <
typename T, u
int16_t kCapacity>
1358std::tuple<MpscChannelHandle<T>,
Receiver<T>> CreateMpscChannel(
1360 PW_ASSERT(!storage.
active());
1377template <
typename T>
1378std::optional<std::tuple<SpmcChannelHandle<T>,
Sender<T>>> CreateSpmcChannel(
1381 if (channel ==
nullptr) {
1382 return std::nullopt;
1398template <
typename T, u
int16_t kCapacity>
1399std::tuple<SpmcChannelHandle<T>,
Sender<T>> CreateSpmcChannel(
1401 PW_ASSERT(!storage.
active());
1418template <
typename T>
1419std::optional<std::tuple<SpscChannelHandle<T>, Sender<T>, Receiver<T>>>
1422 if (channel ==
nullptr) {
1423 return std::nullopt;
1425 return std::make_tuple(
1440template <
typename T, u
int16_t kCapacity>
1443 PW_ASSERT(!storage.
active());
Definition: allocator.h:36
std::enable_if_t<!std::is_array_v< T >, T * > New(Args &&... args)
Definition: allocator.h:57
constexpr bool ok() const
Definition: result.h:447
static constexpr Status DeadlineExceeded()
Definition: status.h:177
static constexpr Status Unavailable()
Definition: status.h:304
static constexpr Status FailedPrecondition()
Definition: status.h:243
Definition: channel.h:598
bool active() const
Definition: channel.h:604
Definition: dispatcher.h:53
Definition: callback_task.h:44
A handle to a multi-producer, multi-consumer channel.
Definition: channel.h:496
friend std::optional< MpmcChannelHandle< U > > CreateMpmcChannel(Allocator &, uint16_t)
Definition: channel.h:1297
A handle to a multi-producer, single-consumer channel.
Definition: channel.h:521
friend std::optional< std::tuple< MpscChannelHandle< U >, Receiver< U > > > CreateMpscChannel(Allocator &, uint16_t)
Definition: channel.h:1337
Definition: channel.h:612
A receiver which reads values from an asynchronous channel.
Definition: channel.h:681
friend std::optional< std::tuple< MpscChannelHandle< U >, Receiver< U > > > CreateMpscChannel(Allocator &, uint16_t)
Definition: channel.h:1337
bool is_open() const
Returns true if the channel is open.
Definition: channel.h:804
Result< T > TryReceive()
Definition: channel.h:778
Result< T > BlockingReceive(Dispatcher &dispatcher, chrono::SystemClock::duration timeout=kWaitForever)
Definition: channel.h:717
friend std::optional< std::tuple< SpscChannelHandle< U >, Sender< U >, Receiver< U > > > CreateSpscChannel(Allocator &, uint16_t)
Definition: channel.h:1420
void Disconnect()
Definition: channel.h:796
ReceiveFuture< T > Receive()
Definition: channel.h:765
Definition: channel.h:986
Definition: channel.h:842
Definition: channel.h:935
void Cancel()
Releases the reservation, making the space available for other senders.
Definition: channel.h:964
void Commit(Args &&... args)
Commits a value to a reserved slot.
Definition: channel.h:956
A sender which writes values to an asynchronous channel.
Definition: channel.h:1053
Status BlockingSend(Dispatcher &dispatcher, const T &value, chrono::SystemClock::duration timeout=kWaitForever)
Definition: channel.h:1171
bool TrySend(const T &value)
Definition: channel.h:1143
Status BlockingSend(Dispatcher &dispatcher, T &&value, chrono::SystemClock::duration timeout=kWaitForever)
Definition: channel.h:1190
friend std::optional< std::tuple< SpmcChannelHandle< U >, Sender< U > > > CreateSpmcChannel(Allocator &, uint16_t)
Definition: channel.h:1378
friend std::optional< std::tuple< SpscChannelHandle< U >, Sender< U >, Receiver< U > > > CreateSpscChannel(Allocator &, uint16_t)
Definition: channel.h:1420
ReserveSendFuture< T > ReserveSend()
Definition: channel.h:1115
uint16_t capacity() const
Returns the maximum capacity of the channel.
Definition: channel.h:1217
bool TrySend(T &&value)
Definition: channel.h:1154
uint16_t remaining_capacity() const
Returns the remaining capacity of the channel.
Definition: channel.h:1212
void Disconnect()
Definition: channel.h:1204
bool is_open() const
Returns true if the channel is open.
Definition: channel.h:1222
SendFuture< T > Send(T &&value)
Definition: channel.h:1103
std::optional< SendReservation< T > > TryReserveSend()
Definition: channel.h:1129
SendFuture< T > Send(const T &value)
Definition: channel.h:1088
A handle to a single-producer, multi-consumer channel.
Definition: channel.h:545
friend std::optional< std::tuple< SpmcChannelHandle< U >, Sender< U > > > CreateSpmcChannel(Allocator &, uint16_t)
Definition: channel.h:1378
A handle to a single-producer, single-consumer channel.
Definition: channel.h:569
friend std::optional< std::tuple< SpscChannelHandle< U >, Sender< U >, Receiver< U > > > CreateSpscChannel(Allocator &, uint16_t)
Definition: channel.h:1420
Definition: channel.h:404
void Release()
Definition: channel.h:474
void Close()
Definition: channel.h:462
Sender< T > CreateSender()
Definition: channel.h:447
Receiver< T > CreateReceiver()
Definition: channel.h:455
bool closed() const
Definition: channel.h:72
constexpr size_type capacity() const noexcept
Returns the maximum number of elements in the deque.
Definition: generic_deque.h:74
Definition: timed_thread_notification.h:40
bool try_acquire_for(chrono::SystemClock::duration timeout)
constexpr PendingType Pending()
Returns a value indicating that an operation was not yet able to complete.
Definition: poll.h:271
constexpr Poll Ready()
Returns a value indicating completion.
Definition: poll.h:255
std::chrono::duration< rep, period > duration
Alias for durations representable with this clock.
Definition: system_clock.h:90
constexpr bool CheckedAdd(A a, B b, T &result)
Definition: checked_arithmetic.h:70
constexpr Status OkStatus()
Definition: status.h:450
#define PW_GUARDED_BY(x)
Definition: lock_annotations.h:60
#define PW_NO_LOCK_SAFETY_ANALYSIS
Definition: lock_annotations.h:292
#define PW_EXCLUSIVE_LOCKS_REQUIRED(...)
Definition: lock_annotations.h:146
#define PW_UNLOCK_FUNCTION(...)
Definition: lock_annotations.h:247
#define PW_LOCKS_EXCLUDED(...)
Definition: lock_annotations.h:176