19#include "pw_allocator/allocator.h"
20#include "pw_async2/callback_task.h"
21#include "pw_async2/dispatcher.h"
22#include "pw_async2/future.h"
23#include "pw_containers/deque.h"
24#include "pw_numeric/checked_arithmetic.h"
25#include "pw_result/result.h"
26#include "pw_sync/interrupt_spin_lock.h"
27#include "pw_sync/lock_annotations.h"
28#include "pw_sync/timed_thread_notification.h"
45class ReserveSendFuture;
50template <
typename T, uint16_t kCapacity>
81 enum AllowClosed { kAllowClosed };
85 BaseChannelFuture(BaseChannel* channel, AllowClosed)
88 StoreAndAddRefIfNonnull(channel);
91 BaseChannelFuture(BaseChannelFuture&& other)
93 : channel_(other.channel_) {
97 BaseChannelFuture& MoveAssignFrom(BaseChannelFuture& other)
116 void StoreAndAddRefIfNonnull(BaseChannel* channel)
121 BaseChannel* channel_;
125 using List = FutureList<&BaseChannelFuture::core_>;
129template <
typename Derived,
typename T,
typename FutureValue>
155 return static_cast<Channel<T>*
>(base_channel());
159 using BaseChannelFuture::base_channel;
160 using BaseChannelFuture::MarkCompleted;
168 chrono::SystemClock::duration::max();
177 std::lock_guard lock(*
this);
178 return is_open_locked();
186 return ref_count_ != 0;
193 std::lock_guard lock(*
this);
200 send_futures_.Push(future);
206 receive_futures_.Push(future);
212 add_object(receiver_count_);
216 add_object(sender_count_);
220 add_object(handle_count_);
228 PW_DASSERT(reservations_ > 0);
233 remove_object(&sender_count_);
237 remove_object(&receiver_count_);
241 remove_object(&handle_count_);
254 receive_futures_.ResolveOneIfAvailable();
258 send_futures_.ResolveOneIfAvailable();
262 return reservations_;
267 if (is_open_locked()) {
284 return handle_count_ == 0 && (sender_count_ == 0 || receiver_count_ == 0);
290 virtual void Destroy() {}
321 std::lock_guard guard(*
this);
322 if (is_open_locked()) {
331 std::lock_guard guard(*
this);
332 if (is_open_locked()) {
340 deque_.push_back(std::move(value));
345 deque_.push_back(value);
349 template <
typename... Args>
351 deque_.emplace_back(std::forward<Args>(args)...);
356 T value = std::move(deque_.front());
364 return remaining_capacity_locked() == 0;
368 std::lock_guard guard(*
this);
369 return remaining_capacity_locked();
372 uint16_t remaining_capacity_locked()
const
374 return deque_.capacity() - deque_.size() - reservations();
379 return deque_.capacity();
383 return deque_.empty();
386 template <
typename U>
388 std::lock_guard guard(*
this);
389 if (!is_open_locked()) {
395 PushAndWake(std::forward<U>(value));
400 std::lock_guard guard(*
this);
401 if (deque_.empty()) {
405 return Result(PopAndWake());
409 std::lock_guard guard(*
this);
410 if (!is_open_locked()) {
420 template <
typename... Args>
423 remove_reservation();
424 if (is_open_locked()) {
425 EmplaceAndWake(std::forward<Args>(args)...);
427 RemoveRefAndDestroyIfUnreferenced();
432 : deque_(std::move(deque)) {}
434 template <
size_t kAlignment,
size_t kCapacity>
442 return deque_.deallocator();
467 Deallocator*
const deallocator = this->deallocator();
487 return channel_ !=
nullptr && channel_->is_open();
505 : channel_(&channel) {
506 channel_->add_handle();
513 : channel_(std::exchange(other.channel_,
nullptr)) {}
515 BaseChannelHandle& operator=(BaseChannelHandle&& other)
noexcept
523 BaseChannel* channel_;
550 PW_ASSERT(channel() !=
nullptr);
557 PW_ASSERT(channel() !=
nullptr);
575 template <
typename U>
579 template <
typename U, uint16_t kCapacity>
596 template <
typename U>
597 friend std::optional<std::tuple<MpscChannelHandle<U>,
Receiver<U>>>
600 template <
typename U, uint16_t kCapacity>
617 template <
typename U>
618 friend std::optional<std::tuple<SpmcChannelHandle<U>,
Sender<U>>>
621 template <
typename U, uint16_t kCapacity>
636 template <
typename U>
640 template <
typename U, uint16_t kCapacity>
735template <
typename T, uint16_t kCapacity>
739 template <
typename U, uint16_t kCap>
743 template <
typename U, uint16_t kCap>
744 friend std::tuple<MpscChannelHandle<U>,
Receiver<U>> CreateMpscChannel(
747 template <
typename U, uint16_t kCap>
748 friend std::tuple<SpmcChannelHandle<U>,
Sender<U>> CreateSpmcChannel(
751 template <
typename U, uint16_t kCap>
762 std::lock_guard lock(*
this);
763 return this->active_locked();
766 constexpr uint16_t capacity()
const {
return kCapacity; }
780 :
Base(std::move(other)) {}
785 return static_cast<ReceiveFuture&
>(this->MoveAssignFrom(other));
789 this->RemoveFromChannel();
796 template <
typename,
typename>
801 :
Base(channel, this->kAllowClosed) {}
804 if (this->channel() ==
nullptr) {
805 PW_DASSERT(this->is_pendable());
806 return Ready<std::optional<T>>(std::nullopt);
809 this->channel()->lock();
810 PW_DASSERT(this->is_pendable());
811 if (this->channel()->empty()) {
812 return this->StoreWakerForReceiveIfOpen(cx)
814 : Ready<std::optional<T>>(std::nullopt);
817 auto result =
Ready(this->channel()->PopAndWake());
827 constexpr Receiver() : channel_(
nullptr) {}
833 : channel_(std::exchange(other.channel_,
nullptr)) {}
836 if (
this == &other) {
839 if (channel_ !=
nullptr) {
840 channel_->remove_receiver();
842 channel_ = std::exchange(other.channel_,
nullptr);
847 if (channel_ !=
nullptr) {
848 channel_->remove_receiver();
865 if (channel_ ==
nullptr) {
870 if (
Result<T> result = channel_->TryReceive();
871 result.ok() || result.status().IsFailedPrecondition()) {
875 std::optional<T> result;
879 [&result, &notification](std::optional<T>&& val) {
880 result = std::move(val);
884 dispatcher.Post(task);
888 if (!result.has_value()) {
898 if (!result.has_value()) {
922 if (channel_ ==
nullptr) {
925 return channel_->TryReceive();
933 if (channel_ !=
nullptr) {
934 channel_->remove_receiver();
941 return channel_ !=
nullptr && channel_->is_open();
945 template <
typename U>
948 template <
typename U>
949 friend std::optional<std::tuple<MpscChannelHandle<U>,
Receiver<U>>>
952 template <
typename U, uint16_t kCapacity>
956 template <
typename U>
960 template <
typename U, uint16_t kCapacity>
966 : channel_(&channel) {
967 channel_->add_receiver();
970 internal::Channel<T>* channel_;
983 :
Base(
static_cast<Base&&
>(other)), value_(std::move(other.value_)) {}
987 value_ = std::move(other.value_);
989 return static_cast<SendFuture&
>(this->MoveAssignFrom(other));
993 this->RemoveFromChannel();
1003 :
Base(channel), value_(value) {}
1007 :
Base(channel), value_(std::move(value)) {}
1010 if (this->channel() ==
nullptr) {
1011 PW_DASSERT(this->is_pendable());
1012 return Ready(
false);
1015 this->channel()->lock();
1016 PW_DASSERT(this->is_pendable());
1017 if (!this->channel()->is_open_locked()) {
1019 return Ready(
false);
1022 if (this->channel()->full()) {
1023 this->StoreWakerForSend(cx);
1027 this->channel()->PushAndWake(std::move(*value_));
1032 std::optional<T> value_;
1041template <
typename T>
1048 : channel_(std::exchange(other.channel_,
nullptr)) {}
1051 if (
this == &other) {
1055 channel_ = std::exchange(other.channel_,
nullptr);
1062 template <
typename... Args>
1064 PW_ASSERT(channel_ !=
nullptr);
1065 channel_->CommitReservationAndRemoveRef(std::forward<Args>(args)...);
1071 if (channel_ !=
nullptr) {
1072 channel_->DropReservationAndRemoveRef();
1084 : channel_(&channel) {
1085 channel_->add_ref();
1088 internal::Channel<T>* channel_;
1091template <
typename T>
1095 std::optional<SendReservation<T>>> {
1097 using Base = internal::
1098 ChannelFuture<ReserveSendFuture, T, std::optional<SendReservation<T>>>;
1111 this->RemoveFromChannel();
1125 if (this->channel() ==
nullptr) {
1126 PW_DASSERT(this->is_pendable());
1127 return Ready<std::optional<SendReservation<T>>>(std::nullopt);
1130 this->channel()->lock();
1131 PW_DASSERT(this->is_pendable());
1132 if (!this->channel()->is_open_locked()) {
1134 return Ready<std::optional<SendReservation<T>>>(std::nullopt);
1137 if (this->channel()->remaining_capacity_locked() == 0) {
1138 this->StoreWakerForReserveSend(cx);
1142 this->channel()->add_reservation();
1150template <
typename T>
1153 constexpr Sender() : channel_(
nullptr) {}
1159 : channel_(std::exchange(other.channel_,
nullptr)) {}
1162 if (
this == &other) {
1165 if (channel_ !=
nullptr) {
1166 channel_->remove_sender();
1168 channel_ = std::exchange(other.channel_,
nullptr);
1173 if (channel_ !=
nullptr) {
1174 channel_->remove_sender();
1186 template <
typename U>
1208 if (channel_ ==
nullptr) {
1211 return channel_->TryReserveSend();
1224 if (channel_ ==
nullptr) {
1227 return channel_->TrySend(value);
1232 if (channel_ ==
nullptr) {
1235 return channel_->TrySend(std::move(value));
1252 return BlockingSendMoveOrCopy(dispatcher, value, timeout);
1260 return BlockingSendMoveOrCopy(dispatcher, std::move(value), timeout);
1268 if (channel_ !=
nullptr) {
1269 channel_->remove_sender();
1276 return channel_ !=
nullptr ? channel_->remaining_capacity() : 0;
1281 return channel_ !=
nullptr ? channel_->capacity() : 0;
1286 return channel_ !=
nullptr && channel_->is_open();
1290 template <
typename U>
1293 template <
typename U>
1294 friend std::optional<std::tuple<SpmcChannelHandle<U>,
Sender<U>>>
1297 template <
typename U, uint16_t kCapacity>
1301 template <
typename U>
1305 template <
typename U, uint16_t kCapacity>
1311 : channel_(&channel) {
1312 channel_->add_sender();
1315 template <
typename U>
1316 Status BlockingSendMoveOrCopy(Dispatcher& dispatcher,
1320 if (channel_ ==
nullptr) {
1324 if (Status status = channel_->TrySend(std::forward<U>(value));
1325 status.ok() || status.IsFailedPrecondition()) {
1329 return BlockingSendFuture(
1331 SendFuture<T>(channel_, std::forward<U>(value)),
1335 Status BlockingSendFuture(Dispatcher& dispatcher,
1336 SendFuture<T>&& future,
1340 sync::TimedThreadNotification notification;
1343 [&status, &notification](
bool result) {
1344 status = result ?
OkStatus() : Status::FailedPrecondition();
1345 notification.release();
1348 dispatcher.Post(task);
1350 if (timeout == internal::Channel<T>::kWaitForever) {
1351 notification.acquire();
1355 if (!notification.try_acquire_for(timeout)) {
1362 internal::Channel<T>* channel_;
1378template <
typename T>
1380 uint16_t capacity) {
1382 if (channel ==
nullptr) {
1383 return std::nullopt;
1385 std::lock_guard lock(*channel);
1400template <
typename T, uint16_t kCapacity>
1403 PW_DASSERT(!storage.active_locked());
1420template <
typename T>
1424 if (channel ==
nullptr) {
1425 return std::nullopt;
1427 std::lock_guard lock(*channel);
1442template <
typename T, uint16_t kCapacity>
1446 PW_DASSERT(!storage.active_locked());
1463template <
typename T>
1467 if (channel ==
nullptr) {
1468 return std::nullopt;
1470 std::lock_guard lock(*channel);
1485template <
typename T, uint16_t kCapacity>
1489 PW_DASSERT(!storage.active_locked());
1506template <
typename T>
1507std::optional<std::tuple<SpscChannelHandle<T>, Sender<T>, Receiver<T>>>
1510 if (channel ==
nullptr) {
1511 return std::nullopt;
1513 std::lock_guard lock(*channel);
1530template <
typename T, uint16_t kCapacity>
1534 PW_DASSERT(!storage.active_locked());
1535 return std::make_tuple(
1544 channel_->RemoveRefAndDestroyIfUnreferenced();
Definition: allocator.h:45
std::enable_if_t<!std::is_array_v< T >, T * > New(Args &&... args)
Definition: allocator.h:66
Abstract interface for releasing memory.
Definition: deallocator.h:29
static constexpr Status DeadlineExceeded()
Definition: status.h:177
static constexpr Status Unavailable()
Definition: status.h:304
static constexpr Status FailedPrecondition()
Definition: status.h:243
Definition: callback_task.h:44
Channel handle for a particular type T.
Definition: channel.h:532
Sender< T > CreateSender()
Definition: channel.h:549
Receiver< T > CreateReceiver()
Definition: channel.h:556
Definition: channel.h:737
bool active() const
Definition: channel.h:761
Definition: dispatcher.h:46
constexpr bool is_pendable() const
Definition: future.h:246
void MarkComplete()
Definition: future.h:297
constexpr bool is_complete() const
Definition: future.h:251
Definition: channel.h:692
Definition: channel.h:649
A handle to a multi-producer, multi-consumer channel.
Definition: channel.h:564
friend std::optional< MpmcChannelHandle< U > > CreateMpmcChannel(Allocator &, uint16_t)
Definition: channel.h:1379
A handle to a multi-producer, single-consumer channel.
Definition: channel.h:586
friend std::optional< std::tuple< MpscChannelHandle< U >, Receiver< U > > > CreateMpscChannel(Allocator &, uint16_t)
Definition: channel.h:1421
Definition: channel.h:771
A receiver which reads values from an asynchronous channel.
Definition: channel.h:825
friend std::optional< std::tuple< MpscChannelHandle< U >, Receiver< U > > > CreateMpscChannel(Allocator &, uint16_t)
Definition: channel.h:1421
bool is_open() const
Returns true if the channel is open.
Definition: channel.h:940
Result< T > TryReceive()
Definition: channel.h:921
friend std::optional< std::tuple< SpscChannelHandle< U >, Sender< U >, Receiver< U > > > CreateSpscChannel(Allocator &, uint16_t)
Definition: channel.h:1508
Result< T > BlockingReceive(Dispatcher &dispatcher, chrono::SystemClock::duration timeout=internal::Channel< T >::kWaitForever)
Definition: channel.h:861
void Disconnect()
Definition: channel.h:932
ReceiveFuture< T > Receive()
Definition: channel.h:911
Definition: channel.h:1095
Definition: channel.h:975
Definition: channel.h:1042
void Cancel()
Releases the reservation, making the space available for other senders.
Definition: channel.h:1070
void Commit(Args &&... args)
Commits a value to a reserved slot.
Definition: channel.h:1063
A sender which writes values to an asynchronous channel.
Definition: channel.h:1151
SendFuture< T > Send(U &&value)
Definition: channel.h:1187
Result< SendReservation< T > > TryReserveSend()
Definition: channel.h:1207
friend std::optional< std::tuple< SpmcChannelHandle< U >, Sender< U > > > CreateSpmcChannel(Allocator &, uint16_t)
Definition: channel.h:1464
Status TrySend(const T &value)
Definition: channel.h:1223
friend std::optional< std::tuple< SpscChannelHandle< U >, Sender< U >, Receiver< U > > > CreateSpscChannel(Allocator &, uint16_t)
Definition: channel.h:1508
ReserveSendFuture< T > ReserveSend()
Definition: channel.h:1196
uint16_t capacity() const
Returns the maximum capacity of the channel.
Definition: channel.h:1280
Status TrySend(T &&value)
Definition: channel.h:1231
uint16_t remaining_capacity() const
Returns the remaining capacity of the channel.
Definition: channel.h:1275
void Disconnect()
Definition: channel.h:1267
bool is_open() const
Returns true if the channel is open.
Definition: channel.h:1285
Status BlockingSend(Dispatcher &dispatcher, const T &value, chrono::SystemClock::duration timeout=internal::Channel< T >::kWaitForever)
Definition: channel.h:1248
Status BlockingSend(Dispatcher &dispatcher, T &&value, chrono::SystemClock::duration timeout=internal::Channel< T >::kWaitForever)
Definition: channel.h:1256
A handle to a single-producer, multi-consumer channel.
Definition: channel.h:607
friend std::optional< std::tuple< SpmcChannelHandle< U >, Sender< U > > > CreateSpmcChannel(Allocator &, uint16_t)
Definition: channel.h:1464
A handle to a single-producer, single-consumer channel.
Definition: channel.h:628
friend std::optional< std::tuple< SpscChannelHandle< U >, Sender< U >, Receiver< U > > > CreateSpscChannel(Allocator &, uint16_t)
Definition: channel.h:1508
bool is_complete() const
True if the future has returned Ready().
Definition: channel.h:74
bool is_pendable() const
True if the Pend can be called on the future.
Definition: channel.h:71
Definition: channel.h:478
Definition: channel.h:165
Definition: channel.h:130
Definition: channel.h:317
Definition: channel.h:450
constexpr size_type capacity() const noexcept
Returns the maximum number of elements in the deque.
Definition: generic_deque.h:74
Definition: interrupt_spin_lock.h:50
Definition: timed_thread_notification.h:40
bool try_acquire_for(chrono::SystemClock::duration timeout)
void Deallocate(void *ptr)
Definition: deallocator.h:60
std::optional< std::tuple< SpmcChannelHandle< T >, Sender< T > > > CreateSpmcChannel(Allocator &alloc, uint16_t capacity)
Definition: channel.h:1464
std::optional< std::tuple< MpscChannelHandle< T >, Receiver< T > > > CreateMpscChannel(Allocator &alloc, uint16_t capacity)
Definition: channel.h:1421
std::optional< MpmcChannelHandle< T > > CreateMpmcChannel(Allocator &alloc, uint16_t capacity)
Definition: channel.h:1379
std::optional< std::tuple< SpscChannelHandle< T >, Sender< T >, Receiver< T > > > CreateSpscChannel(Allocator &alloc, uint16_t capacity)
Definition: channel.h:1508
std::conditional_t< std::is_void_v< typename T::value_type >, ReadyType, typename T::value_type > FutureValue
Definition: future.h:107
constexpr bool IsReady() const noexcept
Returns whether or not this value is Ready.
Definition: poll.h:211
constexpr PendingType Pending()
Returns a value indicating that an operation was not yet able to complete.
Definition: poll.h:353
constexpr Poll Ready()
Returns a value indicating completion.
Definition: poll.h:337
std::chrono::duration< rep, period > duration
Alias for durations representable with this clock.
Definition: system_clock.h:90
constexpr bool CheckedIncrement(T &base, Inc inc)
Definition: checked_arithmetic.h:118
constexpr Status OkStatus()
Definition: status.h:450
#define PW_LOCKABLE(name)
Definition: lock_annotations.h:208
#define PW_GUARDED_BY(x)
Definition: lock_annotations.h:60
#define PW_NO_LOCK_SAFETY_ANALYSIS
Definition: lock_annotations.h:292
#define PW_EXCLUSIVE_LOCK_FUNCTION(...)
Definition: lock_annotations.h:230
#define PW_EXCLUSIVE_LOCKS_REQUIRED(...)
Definition: lock_annotations.h:146
#define PW_LOCK_RETURNED(x)
Definition: lock_annotations.h:197
#define PW_UNLOCK_FUNCTION(...)
Definition: lock_annotations.h:247
#define PW_LOCKS_EXCLUDED(...)
Definition: lock_annotations.h:176