18#include "pw_allocator/allocator.h"
19#include "pw_async2/callback_task.h"
20#include "pw_async2/dispatcher.h"
21#include "pw_async2/future.h"
22#include "pw_containers/deque.h"
23#include "pw_numeric/checked_arithmetic.h"
24#include "pw_result/result.h"
25#include "pw_sync/interrupt_spin_lock.h"
26#include "pw_sync/lock_annotations.h"
27#include "pw_sync/timed_thread_notification.h"
44class ReserveSendFuture;
49template <
typename T, u
int16_t kCapacity>
71 void Wake() { waker_.
Wake(); }
78 enum AllowClosed { kAllowClosed };
81 BaseChannelFuture(BaseChannel* channel, AllowClosed)
83 StoreAndAddRefIfNonnull(channel);
86 BaseChannelFuture(BaseChannelFuture&& other)
88 : channel_(other.channel_) {
92 BaseChannelFuture& MoveAssignFrom(BaseChannelFuture& other)
104 void MarkCompleted() { completed_ =
true; }
111 void StoreAndAddRefIfNonnull(BaseChannel* channel)
116 BaseChannel* channel_;
119 bool completed_ =
false;
123template <
typename Derived,
typename T,
typename FutureValue>
126 using value_type = FutureValue;
146 return static_cast<Channel<T>*
>(base_channel());
150 using BaseChannelFuture::base_channel;
151 using BaseChannelFuture::MarkCompleted;
152 using BaseChannelFuture::Wake;
160 chrono::SystemClock::duration::max();
169 std::lock_guard lock(*
this);
170 return is_open_locked();
178 return ref_count_ != 0;
185 std::lock_guard lock(*
this);
192 containers::PushBackSlow(send_futures_, future);
198 containers::PushBackSlow(receive_futures_, future);
204 add_object(receiver_count_);
208 add_object(sender_count_);
212 add_object(handle_count_);
220 PW_DASSERT(reservations_ > 0);
225 remove_object(&sender_count_);
229 remove_object(&receiver_count_);
233 remove_object(&handle_count_);
246 PopAndWakeOneIfAvailable(receive_futures_);
250 PopAndWakeOneIfAvailable(send_futures_);
254 return reservations_;
260 static void PopAndWakeOneIfAvailable(
266 if (is_open_locked()) {
283 return handle_count_ == 0 && (sender_count_ == 0 || receiver_count_ == 0);
289 virtual void Destroy() {}
320 std::lock_guard guard(*
this);
321 if (is_open_locked()) {
330 std::lock_guard guard(*
this);
331 if (is_open_locked()) {
339 deque_.push_back(std::move(value));
344 deque_.push_back(value);
348 template <
typename... Args>
350 deque_.emplace_back(std::forward<Args>(args)...);
355 T value = std::move(deque_.front());
363 return remaining_capacity_locked() == 0;
367 std::lock_guard guard(*
this);
368 return remaining_capacity_locked();
371 uint16_t remaining_capacity_locked()
const
373 return deque_.capacity() - deque_.size() - reservations();
378 return deque_.capacity();
382 return deque_.empty();
385 template <
typename U>
387 std::lock_guard guard(*
this);
388 if (!is_open_locked()) {
394 PushAndWake(std::forward<U>(value));
399 std::lock_guard guard(*
this);
400 if (deque_.empty()) {
404 return Result(PopAndWake());
408 std::lock_guard guard(*
this);
409 if (!is_open_locked()) {
419 template <
typename... Args>
422 remove_reservation();
423 if (is_open_locked()) {
424 EmplaceAndWake(std::forward<Args>(args)...);
426 RemoveRefAndDestroyIfUnreferenced();
431 : deque_(std::move(deque)) {}
433 template <
size_t kAlignment,
size_t kCapacity>
441 return deque_.deallocator();
466 Deallocator*
const deallocator = this->deallocator();
486 return channel_ !=
nullptr && channel_->is_open();
504 : channel_(&channel) {
505 channel_->add_handle();
512 : channel_(std::exchange(other.channel_,
nullptr)) {}
514 BaseChannelHandle& operator=(BaseChannelHandle&& other)
noexcept
522 BaseChannel* channel_;
549 PW_ASSERT(channel() !=
nullptr);
556 PW_ASSERT(channel() !=
nullptr);
574 template <
typename U>
578 template <
typename U, u
int16_t kCapacity>
595 template <
typename U>
596 friend std::optional<std::tuple<MpscChannelHandle<U>,
Receiver<U>>>
599 template <
typename U, u
int16_t kCapacity>
616 template <
typename U>
617 friend std::optional<std::tuple<SpmcChannelHandle<U>,
Sender<U>>>
620 template <
typename U, u
int16_t kCapacity>
635 template <
typename U>
639 template <
typename U, u
int16_t kCapacity>
734template <
typename T, u
int16_t kCapacity>
738 template <
typename U, u
int16_t kCap>
742 template <
typename U, u
int16_t kCap>
743 friend std::tuple<MpscChannelHandle<U>,
Receiver<U>> CreateMpscChannel(
746 template <
typename U, u
int16_t kCap>
747 friend std::tuple<SpmcChannelHandle<U>,
Sender<U>> CreateSpmcChannel(
750 template <
typename U, u
int16_t kCap>
761 std::lock_guard lock(*
this);
762 return this->active_locked();
765 constexpr uint16_t capacity()
const {
return kCapacity; }
779 :
Base(std::move(other)) {}
784 return static_cast<ReceiveFuture&
>(this->MoveAssignFrom(other));
788 this->RemoveFromChannel();
795 template <
typename,
typename>
800 :
Base(channel, this->kAllowClosed) {}
803 if (this->channel() ==
nullptr) {
804 return Ready<std::optional<T>>(std::nullopt);
807 this->channel()->lock();
808 if (this->channel()->empty()) {
809 return this->StoreWakerForReceiveIfOpen(cx)
811 : Ready<std::optional<T>>(std::nullopt);
814 auto result =
Ready(this->channel()->PopAndWake());
824 constexpr Receiver() : channel_(
nullptr) {}
830 : channel_(std::exchange(other.channel_,
nullptr)) {}
833 if (
this == &other) {
836 if (channel_ !=
nullptr) {
837 channel_->remove_receiver();
839 channel_ = std::exchange(other.channel_,
nullptr);
844 if (channel_ !=
nullptr) {
845 channel_->remove_receiver();
862 if (channel_ ==
nullptr) {
867 if (
Result<T> result = channel_->TryReceive();
868 result.ok() || result.status().IsFailedPrecondition()) {
872 std::optional<T> result;
876 [&result, ¬ification](std::optional<T>&& val) {
877 result = std::move(val);
881 dispatcher.Post(task);
885 if (!result.has_value()) {
895 if (!result.has_value()) {
919 if (channel_ ==
nullptr) {
922 return channel_->TryReceive();
930 if (channel_ !=
nullptr) {
931 channel_->remove_receiver();
938 return channel_ !=
nullptr && channel_->is_open();
942 template <
typename U>
945 template <
typename U>
946 friend std::optional<std::tuple<MpscChannelHandle<U>,
Receiver<U>>>
949 template <
typename U, u
int16_t kCapacity>
953 template <
typename U>
957 template <
typename U, u
int16_t kCapacity>
963 : channel_(&channel) {
964 channel_->add_receiver();
967 internal::Channel<T>* channel_;
978 :
Base(
static_cast<Base&&
>(other)), value_(std::move(other.value_)) {}
982 value_ = std::move(other.value_);
984 return static_cast<SendFuture&
>(this->MoveAssignFrom(other));
988 this->RemoveFromChannel();
998 :
Base(channel), value_(value) {}
1002 :
Base(channel), value_(std::move(value)) {}
1005 if (this->channel() ==
nullptr) {
1006 return Ready(
false);
1009 this->channel()->lock();
1010 if (!this->channel()->is_open_locked()) {
1012 return Ready(
false);
1015 if (this->channel()->full()) {
1016 this->StoreWakerForSend(cx);
1020 this->channel()->PushAndWake(std::move(value_));
1034template <
typename T>
1041 : channel_(std::exchange(other.channel_,
nullptr)) {}
1044 if (
this == &other) {
1048 channel_ = std::exchange(other.channel_,
nullptr);
1055 template <
typename... Args>
1057 PW_ASSERT(channel_ !=
nullptr);
1058 channel_->CommitReservationAndRemoveRef(std::forward<Args>(args)...);
1064 if (channel_ !=
nullptr) {
1065 channel_->DropReservationAndRemoveRef();
1077 : channel_(&channel) {
1078 channel_->add_ref();
1081 internal::Channel<T>* channel_;
1084template <
typename T>
1088 std::optional<SendReservation<T>>> {
1090 using Base = internal::
1091 ChannelFuture<ReserveSendFuture, T, std::optional<SendReservation<T>>>;
1102 this->RemoveFromChannel();
1116 if (this->channel() ==
nullptr) {
1117 return Ready<std::optional<SendReservation<T>>>(std::nullopt);
1120 this->channel()->lock();
1121 if (!this->channel()->is_open_locked()) {
1123 return Ready<std::optional<SendReservation<T>>>(std::nullopt);
1126 if (this->channel()->remaining_capacity_locked() == 0) {
1127 this->StoreWakerForReserveSend(cx);
1131 this->channel()->add_reservation();
1139template <
typename T>
1142 constexpr Sender() : channel_(
nullptr) {}
1148 : channel_(std::exchange(other.channel_,
nullptr)) {}
1151 if (
this == &other) {
1154 if (channel_ !=
nullptr) {
1155 channel_->remove_sender();
1157 channel_ = std::exchange(other.channel_,
nullptr);
1162 if (channel_ !=
nullptr) {
1163 channel_->remove_sender();
1175 template <
typename U>
1197 if (channel_ ==
nullptr) {
1200 return channel_->TryReserveSend();
1213 if (channel_ ==
nullptr) {
1216 return channel_->TrySend(value);
1221 if (channel_ ==
nullptr) {
1224 return channel_->TrySend(std::move(value));
1241 return BlockingSendMoveOrCopy(dispatcher, value, timeout);
1249 return BlockingSendMoveOrCopy(dispatcher, std::move(value), timeout);
1257 if (channel_ !=
nullptr) {
1258 channel_->remove_sender();
1265 return channel_ !=
nullptr ? channel_->remaining_capacity() : 0;
1270 return channel_ !=
nullptr ? channel_->capacity() : 0;
1275 return channel_ !=
nullptr && channel_->is_open();
1279 template <
typename U>
1282 template <
typename U>
1283 friend std::optional<std::tuple<SpmcChannelHandle<U>,
Sender<U>>>
1286 template <
typename U, u
int16_t kCapacity>
1290 template <
typename U>
1294 template <
typename U, u
int16_t kCapacity>
1300 : channel_(&channel) {
1301 channel_->add_sender();
1304 template <
typename U>
1305 Status BlockingSendMoveOrCopy(Dispatcher& dispatcher,
1309 if (channel_ ==
nullptr) {
1313 if (Status status = channel_->TrySend(std::forward<U>(value));
1314 status.ok() || status.IsFailedPrecondition()) {
1318 return BlockingSendFuture(
1320 SendFuture<T>(channel_, std::forward<U>(value)),
1324 Status BlockingSendFuture(Dispatcher& dispatcher,
1325 SendFuture<T>&& future,
1329 sync::TimedThreadNotification notification;
1332 [&status, ¬ification](
bool result) {
1333 status = result ?
OkStatus() : Status::FailedPrecondition();
1334 notification.release();
1337 dispatcher.Post(task);
1339 if (timeout == internal::Channel<T>::kWaitForever) {
1340 notification.acquire();
1344 if (!notification.try_acquire_for(timeout)) {
1351 internal::Channel<T>* channel_;
1367template <
typename T>
1369 uint16_t capacity) {
1371 if (channel ==
nullptr) {
1372 return std::nullopt;
1374 std::lock_guard lock(*channel);
1389template <
typename T, u
int16_t kCapacity>
1392 PW_DASSERT(!storage.active_locked());
1409template <
typename T>
1413 if (channel ==
nullptr) {
1414 return std::nullopt;
1416 std::lock_guard lock(*channel);
1431template <
typename T, u
int16_t kCapacity>
1435 PW_DASSERT(!storage.active_locked());
1452template <
typename T>
1456 if (channel ==
nullptr) {
1457 return std::nullopt;
1459 std::lock_guard lock(*channel);
1474template <
typename T, u
int16_t kCapacity>
1478 PW_DASSERT(!storage.active_locked());
1495template <
typename T>
1496std::optional<std::tuple<SpscChannelHandle<T>, Sender<T>, Receiver<T>>>
1499 if (channel ==
nullptr) {
1500 return std::nullopt;
1502 std::lock_guard lock(*channel);
1519template <
typename T, u
int16_t kCapacity>
1523 PW_DASSERT(!storage.active_locked());
1524 return std::make_tuple(
1533 channel_->RemoveRefAndDestroyIfUnreferenced();
1537inline void BaseChannel::PopAndWakeOne(
1538 IntrusiveForwardList<BaseChannelFuture>& futures) {
1539 BaseChannelFuture& future = futures.front();
1540 futures.pop_front();
Definition: allocator.h:43
std::enable_if_t<!std::is_array_v< T >, T * > New(Args &&... args)
Definition: allocator.h:64
Abstract interface for releasing memory.
Definition: deallocator.h:29
Definition: intrusive_forward_list.h:99
static constexpr Status DeadlineExceeded()
Definition: status.h:177
static constexpr Status Unavailable()
Definition: status.h:304
static constexpr Status FailedPrecondition()
Definition: status.h:243
Definition: callback_task.h:44
Channel handle for a particular type T.
Definition: channel.h:531
Sender< T > CreateSender()
Definition: channel.h:548
Receiver< T > CreateReceiver()
Definition: channel.h:555
Definition: channel.h:736
bool active() const
Definition: channel.h:760
Definition: dispatcher.h:53
Definition: channel.h:691
Definition: channel.h:648
A handle to a multi-producer, multi-consumer channel.
Definition: channel.h:563
friend std::optional< MpmcChannelHandle< U > > CreateMpmcChannel(Allocator &, uint16_t)
Definition: channel.h:1368
A handle to a multi-producer, single-consumer channel.
Definition: channel.h:585
friend std::optional< std::tuple< MpscChannelHandle< U >, Receiver< U > > > CreateMpscChannel(Allocator &, uint16_t)
Definition: channel.h:1410
Definition: channel.h:770
A receiver which reads values from an asynchronous channel.
Definition: channel.h:822
friend std::optional< std::tuple< MpscChannelHandle< U >, Receiver< U > > > CreateMpscChannel(Allocator &, uint16_t)
Definition: channel.h:1410
bool is_open() const
Returns true if the channel is open.
Definition: channel.h:937
Result< T > TryReceive()
Definition: channel.h:918
friend std::optional< std::tuple< SpscChannelHandle< U >, Sender< U >, Receiver< U > > > CreateSpscChannel(Allocator &, uint16_t)
Definition: channel.h:1497
Result< T > BlockingReceive(Dispatcher &dispatcher, chrono::SystemClock::duration timeout=internal::Channel< T >::kWaitForever)
Definition: channel.h:858
void Disconnect()
Definition: channel.h:929
ReceiveFuture< T > Receive()
Definition: channel.h:908
Definition: channel.h:1088
Definition: channel.h:972
Definition: channel.h:1035
void Cancel()
Releases the reservation, making the space available for other senders.
Definition: channel.h:1063
void Commit(Args &&... args)
Commits a value to a reserved slot.
Definition: channel.h:1056
A sender which writes values to an asynchronous channel.
Definition: channel.h:1140
SendFuture< T > Send(U &&value)
Definition: channel.h:1176
Result< SendReservation< T > > TryReserveSend()
Definition: channel.h:1196
friend std::optional< std::tuple< SpmcChannelHandle< U >, Sender< U > > > CreateSpmcChannel(Allocator &, uint16_t)
Definition: channel.h:1453
Status TrySend(const T &value)
Definition: channel.h:1212
friend std::optional< std::tuple< SpscChannelHandle< U >, Sender< U >, Receiver< U > > > CreateSpscChannel(Allocator &, uint16_t)
Definition: channel.h:1497
ReserveSendFuture< T > ReserveSend()
Definition: channel.h:1185
uint16_t capacity() const
Returns the maximum capacity of the channel.
Definition: channel.h:1269
Status TrySend(T &&value)
Definition: channel.h:1220
uint16_t remaining_capacity() const
Returns the remaining capacity of the channel.
Definition: channel.h:1264
void Disconnect()
Definition: channel.h:1256
bool is_open() const
Returns true if the channel is open.
Definition: channel.h:1274
Status BlockingSend(Dispatcher &dispatcher, const T &value, chrono::SystemClock::duration timeout=internal::Channel< T >::kWaitForever)
Definition: channel.h:1237
Status BlockingSend(Dispatcher &dispatcher, T &&value, chrono::SystemClock::duration timeout=internal::Channel< T >::kWaitForever)
Definition: channel.h:1245
A handle to a single-producer, multi-consumer channel.
Definition: channel.h:606
friend std::optional< std::tuple< SpmcChannelHandle< U >, Sender< U > > > CreateSpmcChannel(Allocator &, uint16_t)
Definition: channel.h:1453
A handle to a single-producer, single-consumer channel.
Definition: channel.h:627
friend std::optional< std::tuple< SpscChannelHandle< U >, Sender< U >, Receiver< U > > > CreateSpscChannel(Allocator &, uint16_t)
Definition: channel.h:1497
bool is_complete() const
True if the future has returned Ready().
Definition: channel.h:68
Definition: channel.h:477
Definition: channel.h:157
Definition: channel.h:124
Definition: channel.h:316
Definition: channel.h:449
constexpr size_type capacity() const noexcept
Returns the maximum number of elements in the deque.
Definition: generic_deque.h:74
Definition: interrupt_spin_lock.h:50
Definition: timed_thread_notification.h:40
bool try_acquire_for(chrono::SystemClock::duration timeout)
void Deallocate(void *ptr)
Definition: deallocator.h:60
std::optional< std::tuple< SpmcChannelHandle< T >, Sender< T > > > CreateSpmcChannel(Allocator &alloc, uint16_t capacity)
Definition: channel.h:1453
std::optional< std::tuple< MpscChannelHandle< T >, Receiver< T > > > CreateMpscChannel(Allocator &alloc, uint16_t capacity)
Definition: channel.h:1410
std::optional< MpmcChannelHandle< T > > CreateMpmcChannel(Allocator &alloc, uint16_t capacity)
Definition: channel.h:1368
std::optional< std::tuple< SpscChannelHandle< T >, Sender< T >, Receiver< T > > > CreateSpscChannel(Allocator &alloc, uint16_t capacity)
Definition: channel.h:1497
constexpr bool IsReady() const noexcept
Returns whether or not this value is Ready.
Definition: poll.h:133
constexpr PendingType Pending()
Returns a value indicating that an operation was not yet able to complete.
Definition: poll.h:271
constexpr Poll Ready()
Returns a value indicating completion.
Definition: poll.h:255
std::chrono::duration< rep, period > duration
Alias for durations representable with this clock.
Definition: system_clock.h:90
constexpr bool CheckedIncrement(T &base, Inc inc)
Definition: checked_arithmetic.h:118
constexpr Status OkStatus()
Definition: status.h:450
#define PW_LOCKABLE(name)
Definition: lock_annotations.h:208
#define PW_GUARDED_BY(x)
Definition: lock_annotations.h:60
#define PW_NO_LOCK_SAFETY_ANALYSIS
Definition: lock_annotations.h:292
#define PW_EXCLUSIVE_LOCK_FUNCTION(...)
Definition: lock_annotations.h:230
#define PW_EXCLUSIVE_LOCKS_REQUIRED(...)
Definition: lock_annotations.h:146
#define PW_LOCK_RETURNED(x)
Definition: lock_annotations.h:197
#define PW_UNLOCK_FUNCTION(...)
Definition: lock_annotations.h:247
#define PW_LOCKS_EXCLUDED(...)
Definition: lock_annotations.h:176