18#include "pw_allocator/allocator.h"
19#include "pw_async2/callback_task.h"
20#include "pw_async2/dispatcher.h"
21#include "pw_async2/future.h"
22#include "pw_containers/deque.h"
23#include "pw_numeric/checked_arithmetic.h"
24#include "pw_result/result.h"
25#include "pw_sync/interrupt_spin_lock.h"
26#include "pw_sync/lock_annotations.h"
27#include "pw_sync/timed_thread_notification.h"
46class ReserveSendFuture;
51template <
typename T, uint16_t kCapacity>
59class BaseChannelFuture;
66 chrono::SystemClock::duration::max();
75 std::lock_guard lock(*
this);
76 return is_open_locked();
84 return ref_count_ != 0;
91 std::lock_guard lock(*
this);
98 containers::PushBackSlow(send_futures_, future);
104 containers::PushBackSlow(receive_futures_, future);
110 add_object(receiver_count_);
114 add_object(sender_count_);
118 add_object(handle_count_);
126 PW_DASSERT(reservations_ > 0);
131 remove_object(&sender_count_);
135 remove_object(&receiver_count_);
139 remove_object(&handle_count_);
152 PopAndWakeOneIfAvailable(receive_futures_);
156 PopAndWakeOneIfAvailable(send_futures_);
160 return reservations_;
166 static void PopAndWakeOneIfAvailable(
172 if (is_open_locked()) {
189 return handle_count_ == 0 && (sender_count_ == 0 || receiver_count_ == 0);
195 virtual void Destroy() {}
231 void Wake() { std::move(waker_).Wake(); }
238 enum AllowClosed { kAllowClosed };
241 BaseChannelFuture(BaseChannel* channel, AllowClosed)
243 StoreAndAddRefIfNonnull(channel);
246 BaseChannelFuture(BaseChannelFuture&& other)
248 : channel_(other.channel_) {
252 BaseChannelFuture& MoveAssignFrom(BaseChannelFuture& other)
264 void MarkCompleted() { completed_ =
true; }
267 channel_->RemoveRefAndDestroyIfUnreferenced();
274 void StoreAndAddRefIfNonnull(BaseChannel* channel)
279 BaseChannel* channel_;
282 bool completed_ =
false;
286template <
typename Derived,
typename T,
typename FutureValue>
289 using value_type = FutureValue;
309 return static_cast<Channel<T>*
>(base_channel());
313 using BaseChannelFuture::base_channel;
314 using BaseChannelFuture::MarkCompleted;
315 using BaseChannelFuture::Wake;
325 std::lock_guard guard(*
this);
326 if (is_open_locked()) {
335 std::lock_guard guard(*
this);
336 if (is_open_locked()) {
344 deque_.push_back(std::move(value));
349 deque_.push_back(value);
353 template <
typename... Args>
355 deque_.emplace_back(std::forward<Args>(args)...);
360 T value = std::move(deque_.front());
368 return remaining_capacity_locked() == 0;
372 std::lock_guard guard(*
this);
373 return remaining_capacity_locked();
376 uint16_t remaining_capacity_locked()
const
378 return deque_.capacity() - deque_.size() - reservations();
383 return deque_.capacity();
387 return deque_.empty();
390 template <
typename U>
392 std::lock_guard guard(*
this);
393 if (!is_open_locked()) {
399 PushAndWake(std::forward<U>(value));
404 std::lock_guard guard(*
this);
405 if (deque_.empty()) {
409 return Result(PopAndWake());
413 std::lock_guard guard(*
this);
414 if (!is_open_locked()) {
424 template <
typename... Args>
427 remove_reservation();
428 if (is_open_locked()) {
429 EmplaceAndWake(std::forward<Args>(args)...);
431 RemoveRefAndDestroyIfUnreferenced();
436 : deque_(std::move(deque)) {}
438 template <
size_t kAlignment,
size_t kCapacity>
446 return deque_.deallocator();
471 Deallocator*
const deallocator = this->deallocator();
492 : channel_(std::exchange(other.channel_,
nullptr)) {}
500 return channel_ !=
nullptr && channel_->is_open();
522 : channel_(&channel) {
523 channel_->add_handle();
527 BaseChannel* channel_;
539 PW_ASSERT(base_channel() !=
nullptr);
546 PW_ASSERT(base_channel() !=
nullptr);
574 template <
typename U>
578 template <
typename U, uint16_t kCapacity>
598 template <
typename U>
599 friend std::optional<std::tuple<MpscChannelHandle<U>,
Receiver<U>>>
602 template <
typename U, uint16_t kCapacity>
622 template <
typename U>
623 friend std::optional<std::tuple<SpmcChannelHandle<U>,
Sender<U>>>
626 template <
typename U, uint16_t kCapacity>
645 template <
typename U>
649 template <
typename U, uint16_t kCapacity>
658template <
typename T, uint16_t kCapacity>
662 template <
typename U, uint16_t kCap>
666 template <
typename U, uint16_t kCap>
667 friend std::tuple<MpscChannelHandle<U>,
Receiver<U>> CreateMpscChannel(
670 template <
typename U, uint16_t kCap>
671 friend std::tuple<SpmcChannelHandle<U>,
Sender<U>> CreateSpmcChannel(
674 template <
typename U, uint16_t kCap>
685 std::lock_guard lock(*
this);
686 return this->active_locked();
689 constexpr uint16_t capacity()
const {
return kCapacity; }
703 :
Base(std::move(other)) {}
708 return static_cast<ReceiveFuture&
>(this->MoveAssignFrom(other));
712 this->RemoveFromChannel();
719 template <
typename,
typename>
724 :
Base(channel, this->kAllowClosed) {}
727 if (this->channel() ==
nullptr) {
728 return Ready<std::optional<T>>(std::nullopt);
731 this->channel()->lock();
732 if (this->channel()->empty()) {
733 return this->StoreWakerForReceiveIfOpen(cx)
735 : Ready<std::optional<T>>(std::nullopt);
738 auto result =
Ready(this->channel()->PopAndWake());
748 constexpr Receiver() : channel_(
nullptr) {}
754 : channel_(std::exchange(other.channel_,
nullptr)) {}
757 if (
this == &other) {
760 if (channel_ !=
nullptr) {
761 channel_->remove_receiver();
763 channel_ = std::exchange(other.channel_,
nullptr);
768 if (channel_ !=
nullptr) {
769 channel_->remove_receiver();
786 if (channel_ ==
nullptr) {
791 if (
Result<T> result = channel_->TryReceive();
792 result.ok() || result.status().IsFailedPrecondition()) {
796 std::optional<T> result;
800 [&result, &notification](std::optional<T>&& val) {
801 result = std::move(val);
805 dispatcher.Post(task);
809 if (!result.has_value()) {
819 if (!result.has_value()) {
843 if (channel_ ==
nullptr) {
846 return channel_->TryReceive();
854 if (channel_ !=
nullptr) {
855 channel_->remove_receiver();
862 return channel_ !=
nullptr && channel_->is_open();
866 template <
typename U>
869 template <
typename U>
870 friend std::optional<std::tuple<MpscChannelHandle<U>,
Receiver<U>>>
873 template <
typename U, uint16_t kCapacity>
877 template <
typename U>
881 template <
typename U, uint16_t kCapacity>
887 : channel_(&channel) {
888 channel_->add_receiver();
891 internal::Channel<T>* channel_;
902 :
Base(
static_cast<Base&&
>(other)), value_(std::move(other.value_)) {}
906 value_ = std::move(other.value_);
908 return static_cast<SendFuture&
>(this->MoveAssignFrom(other));
912 this->RemoveFromChannel();
922 :
Base(channel), value_(value) {}
926 :
Base(channel), value_(std::move(value)) {}
929 if (this->channel() ==
nullptr) {
933 this->channel()->lock();
934 if (!this->channel()->is_open_locked()) {
939 if (this->channel()->full()) {
940 this->StoreWakerForSend(cx);
944 this->channel()->PushAndWake(std::move(value_));
965 : channel_(std::exchange(other.channel_,
nullptr)) {}
968 if (
this == &other) {
972 channel_ = std::exchange(other.channel_,
nullptr);
979 template <
typename... Args>
981 PW_ASSERT(channel_ !=
nullptr);
982 channel_->CommitReservationAndRemoveRef(std::forward<Args>(args)...);
988 if (channel_ !=
nullptr) {
989 channel_->DropReservationAndRemoveRef();
1001 : channel_(&channel) {
1002 channel_->add_ref();
1005 internal::Channel<T>* channel_;
1008template <
typename T>
1012 std::optional<SendReservation<T>>> {
1014 using Base = internal::
1015 ChannelFuture<ReserveSendFuture, T, std::optional<SendReservation<T>>>;
1026 this->RemoveFromChannel();
1040 if (this->channel() ==
nullptr) {
1041 return Ready<std::optional<SendReservation<T>>>(std::nullopt);
1044 this->channel()->lock();
1045 if (!this->channel()->is_open_locked()) {
1047 return Ready<std::optional<SendReservation<T>>>(std::nullopt);
1050 if (this->channel()->remaining_capacity_locked() == 0) {
1051 this->StoreWakerForReserveSend(cx);
1055 this->channel()->add_reservation();
1063template <
typename T>
1066 constexpr Sender() : channel_(
nullptr) {}
1072 : channel_(std::exchange(other.channel_,
nullptr)) {}
1075 if (
this == &other) {
1078 if (channel_ !=
nullptr) {
1079 channel_->remove_sender();
1081 channel_ = std::exchange(other.channel_,
nullptr);
1086 if (channel_ !=
nullptr) {
1087 channel_->remove_sender();
1099 template <
typename U>
1121 if (channel_ ==
nullptr) {
1124 return channel_->TryReserveSend();
1137 if (channel_ ==
nullptr) {
1140 return channel_->TrySend(value);
1145 if (channel_ ==
nullptr) {
1148 return channel_->TrySend(std::move(value));
1165 return BlockingSendMoveOrCopy(dispatcher, value, timeout);
1173 return BlockingSendMoveOrCopy(dispatcher, std::move(value), timeout);
1181 if (channel_ !=
nullptr) {
1182 channel_->remove_sender();
1189 return channel_ !=
nullptr ? channel_->remaining_capacity() : 0;
1194 return channel_ !=
nullptr ? channel_->capacity() : 0;
1199 return channel_ !=
nullptr && channel_->is_open();
1203 template <
typename U>
1206 template <
typename U>
1207 friend std::optional<std::tuple<SpmcChannelHandle<U>,
Sender<U>>>
1210 template <
typename U, uint16_t kCapacity>
1214 template <
typename U>
1218 template <
typename U, uint16_t kCapacity>
1224 : channel_(&channel) {
1225 channel_->add_sender();
1228 template <
typename U>
1229 Status BlockingSendMoveOrCopy(Dispatcher& dispatcher,
1233 if (channel_ ==
nullptr) {
1237 if (Status status = channel_->TrySend(std::forward<U>(value));
1238 status.ok() || status.IsFailedPrecondition()) {
1242 return BlockingSendFuture(
1244 SendFuture<T>(channel_, std::forward<U>(value)),
1248 Status BlockingSendFuture(Dispatcher& dispatcher,
1249 SendFuture<T>&& future,
1253 sync::TimedThreadNotification notification;
1256 [&status, &notification](
bool result) {
1257 status = result ?
OkStatus() : Status::FailedPrecondition();
1258 notification.release();
1261 dispatcher.Post(task);
1263 if (timeout == internal::Channel<T>::kWaitForever) {
1264 notification.acquire();
1268 if (!notification.try_acquire_for(timeout)) {
1275 internal::Channel<T>* channel_;
1291template <
typename T>
1293 uint16_t capacity) {
1295 if (channel ==
nullptr) {
1296 return std::nullopt;
1298 std::lock_guard lock(*channel);
1313template <
typename T, uint16_t kCapacity>
1316 PW_DASSERT(!storage.active_locked());
1333template <
typename T>
1337 if (channel ==
nullptr) {
1338 return std::nullopt;
1340 std::lock_guard lock(*channel);
1355template <
typename T, uint16_t kCapacity>
1359 PW_DASSERT(!storage.active_locked());
1376template <
typename T>
1380 if (channel ==
nullptr) {
1381 return std::nullopt;
1383 std::lock_guard lock(*channel);
1398template <
typename T, uint16_t kCapacity>
1402 PW_DASSERT(!storage.active_locked());
1419template <
typename T>
1420std::optional<std::tuple<SpscChannelHandle<T>, Sender<T>, Receiver<T>>>
1423 if (channel ==
nullptr) {
1424 return std::nullopt;
1426 std::lock_guard lock(*channel);
1443template <
typename T, uint16_t kCapacity>
1447 PW_DASSERT(!storage.active_locked());
1448 return std::make_tuple(
1454inline void BaseChannel::PopAndWakeOne(
1456 BaseChannelFuture& future = futures.
front();
Definition: allocator.h:36
std::enable_if_t<!std::is_array_v< T >, T * > New(Args &&... args)
Definition: allocator.h:57
Abstract interface for releasing memory.
Definition: deallocator.h:29
Definition: intrusive_forward_list.h:99
static constexpr Status DeadlineExceeded()
Definition: status.h:177
static constexpr Status Unavailable()
Definition: status.h:304
static constexpr Status FailedPrecondition()
Definition: status.h:243
Definition: callback_task.h:44
Definition: channel.h:660
Definition: dispatcher.h:53
A handle to a multi-producer, multi-consumer channel.
Definition: channel.h:560
A handle to a multi-producer, single-consumer channel.
Definition: channel.h:585
Definition: channel.h:694
A receiver which reads values from an asynchronous channel.
Definition: channel.h:746
Definition: channel.h:1012
Definition: channel.h:896
Definition: channel.h:959
A sender which writes values to an asynchronous channel.
Definition: channel.h:1064
A handle to a single-producer, multi-consumer channel.
Definition: channel.h:609
A handle to a single-producer, single-consumer channel.
Definition: channel.h:633
Definition: channel.h:219
Definition: channel.h:482
Definition: channel.h:287
Channel handle for a particular type T.
Definition: channel.h:532
Definition: channel.h:321
Definition: channel.h:454
constexpr size_type capacity() const noexcept
Returns the maximum number of elements in the deque.
Definition: generic_deque.h:74
Definition: interrupt_spin_lock.h:50
Definition: timed_thread_notification.h:40
bool try_acquire_for(chrono::SystemClock::duration timeout)
void Deallocate(void *ptr)
Definition: deallocator.h:60
friend std::optional< MpmcChannelHandle< U > > CreateMpmcChannel(Allocator &, uint16_t)
Definition: channel.h:1292
SendFuture< T > Send(U &&value)
Definition: channel.h:1100
friend std::optional< std::tuple< MpscChannelHandle< U >, Receiver< U > > > CreateMpscChannel(Allocator &, uint16_t)
Definition: channel.h:1334
void Cancel()
Releases the reservation, making the space available for other senders.
Definition: channel.h:987
Result< SendReservation< T > > TryReserveSend()
Definition: channel.h:1120
std::optional< std::tuple< SpmcChannelHandle< T >, Sender< T > > > CreateSpmcChannel(Allocator &alloc, uint16_t capacity)
Definition: channel.h:1377
std::optional< std::tuple< MpscChannelHandle< T >, Receiver< T > > > CreateMpscChannel(Allocator &alloc, uint16_t capacity)
Definition: channel.h:1334
bool is_open() const
Returns true if the channel is open.
Definition: channel.h:861
friend std::optional< std::tuple< SpmcChannelHandle< U >, Sender< U > > > CreateSpmcChannel(Allocator &, uint16_t)
Definition: channel.h:1377
Result< T > TryReceive()
Definition: channel.h:842
void Commit(Args &&... args)
Commits a value to a reserved slot.
Definition: channel.h:980
bool active() const
Definition: channel.h:684
Status TrySend(const T &value)
Definition: channel.h:1136
friend std::optional< std::tuple< SpscChannelHandle< U >, Sender< U >, Receiver< U > > > CreateSpscChannel(Allocator &, uint16_t)
Definition: channel.h:1421
Result< T > BlockingReceive(Dispatcher &dispatcher, chrono::SystemClock::duration timeout=internal::Channel< T >::kWaitForever)
Definition: channel.h:782
ReserveSendFuture< T > ReserveSend()
Definition: channel.h:1109
uint16_t capacity() const
Returns the maximum capacity of the channel.
Definition: channel.h:1193
Status TrySend(T &&value)
Definition: channel.h:1144
uint16_t remaining_capacity() const
Returns the remaining capacity of the channel.
Definition: channel.h:1188
void Disconnect()
Definition: channel.h:1180
bool is_open() const
Returns true if the channel is open.
Definition: channel.h:1198
bool is_complete() const
True if the future has returned Ready().
Definition: channel.h:228
void Disconnect()
Definition: channel.h:853
std::optional< MpmcChannelHandle< T > > CreateMpmcChannel(Allocator &alloc, uint16_t capacity)
Definition: channel.h:1292
std::optional< std::tuple< SpscChannelHandle< T >, Sender< T >, Receiver< T > > > CreateSpscChannel(Allocator &alloc, uint16_t capacity)
Definition: channel.h:1421
Status BlockingSend(Dispatcher &dispatcher, const T &value, chrono::SystemClock::duration timeout=internal::Channel< T >::kWaitForever)
Definition: channel.h:1161
Sender< T > CreateSender()
Definition: channel.h:538
Receiver< T > CreateReceiver()
Definition: channel.h:545
ReceiveFuture< T > Receive()
Definition: channel.h:832
Status BlockingSend(Dispatcher &dispatcher, T &&value, chrono::SystemClock::duration timeout=internal::Channel< T >::kWaitForever)
Definition: channel.h:1169
constexpr bool IsReady() const noexcept
Returns whether or not this value is Ready.
Definition: poll.h:133
constexpr PendingType Pending()
Returns a value indicating that an operation was not yet able to complete.
Definition: poll.h:271
constexpr Poll Ready()
Returns a value indicating completion.
Definition: poll.h:255
std::chrono::duration< rep, period > duration
Alias for durations representable with this clock.
Definition: system_clock.h:90
void pop_front()
Definition: intrusive_forward_list.h:245
reference front()
Reference to the first element in the list. Undefined behavior if empty().
Definition: intrusive_forward_list.h:175
constexpr bool CheckedIncrement(T &base, Inc inc)
Definition: checked_arithmetic.h:118
constexpr Status OkStatus()
Definition: status.h:450
#define PW_LOCKABLE(name)
Definition: lock_annotations.h:208
#define PW_GUARDED_BY(x)
Definition: lock_annotations.h:60
#define PW_NO_LOCK_SAFETY_ANALYSIS
Definition: lock_annotations.h:292
#define PW_EXCLUSIVE_LOCK_FUNCTION(...)
Definition: lock_annotations.h:230
#define PW_EXCLUSIVE_LOCKS_REQUIRED(...)
Definition: lock_annotations.h:146
#define PW_LOCK_RETURNED(x)
Definition: lock_annotations.h:197
#define PW_UNLOCK_FUNCTION(...)
Definition: lock_annotations.h:247
#define PW_LOCKS_EXCLUDED(...)
Definition: lock_annotations.h:176