20#include "pw_assert/assert.h"
21#include "pw_containers/inline_queue.h"
22#include "pw_function/function.h"
23#include "pw_metric/metric.h"
24#include "pw_status/status.h"
25#include "pw_sync/interrupt_spin_lock.h"
26#include "pw_sync/lock_annotations.h"
27#include "pw_sync/thread_notification.h"
28#include "pw_thread/thread_core.h"
30namespace pw::work_queue {
48template <
typename WorkItem>
58 : stop_requested_(false), queue_(queue), fn_(std::move(fn)) {
59 min_queue_remaining_.Set(
static_cast<uint32_t
>(queue.capacity()));
80 return InternalPushWork(std::move(work_item));
97 PW_ASSERT_OK(InternalPushWork(std::move(work_item)),
98 "Failed to push work item into the work queue");
// Lvalue-reference overload of CheckPushWork. The work item is passed on via
// std::move, so the caller's object should be treated as moved-from afterwards.
// The status returned by InternalPushWork is checked with PW_ASSERT_OK, using
// the message below on failure. Must not be called with lock_ held
// (PW_LOCKS_EXCLUDED).
100 void CheckPushWork(WorkItem& work_item) PW_LOCKS_EXCLUDED(lock_) {
101 PW_ASSERT_OK(InternalPushWork(std::move(work_item)),
102 "Failed to push work item into the work queue");
113 std::lock_guard lock(lock_);
114 stop_requested_ =
true;
120 void Run() override PW_LOCKS_EXCLUDED(lock_) {
128 std::optional<WorkItem> possible_work_item;
130 std::lock_guard lock(lock_);
131 if (!queue_.empty()) {
132 possible_work_item.emplace(std::move(queue_.front()));
135 work_remaining = !queue_.empty();
136 stop_requested = stop_requested_;
138 if (!possible_work_item.has_value()) {
141 WorkItem& work_item = possible_work_item.value();
143 }
while (work_remaining);
146 if (stop_requested) {
152 Status InternalPushWork(WorkItem&& work_item) PW_LOCKS_EXCLUDED(lock_) {
154 std::lock_guard lock(lock_);
156 if (stop_requested_) {
159 return Status::FailedPrecondition();
163 return Status::ResourceExhausted();
166 queue_.emplace(std::move(work_item));
// Update the queue watermarks: max_queue_used_ tracks the largest number of
// occupied entries observed, min_queue_remaining_ the smallest number of free
// entries observed.
169 const uint32_t queue_entries = queue_.size();
170 if (queue_entries > max_queue_used_.value()) {
171 max_queue_used_.Set(queue_entries);
173 const uint32_t queue_remaining = queue_.capacity() - queue_entries;
174 if (queue_remaining < min_queue_remaining_.value()) {
// Store the free-entry count (not the occupied count) so the stored value
// matches what the comparison above tests and what the metric name promises.
175 min_queue_remaining_.Set(queue_remaining);
181 sync::InterruptSpinLock lock_;
182 bool stop_requested_ PW_GUARDED_BY(lock_);
183 InlineQueue<WorkItem>& queue_ PW_GUARDED_BY(lock_);
184 sync::ThreadNotification work_notification_;
194 PW_METRIC_GROUP(metrics_,
"pw::work_queue::WorkQueue");
195 PW_METRIC(metrics_, max_queue_used_,
"max_queue_used", 0u);
196 PW_METRIC(metrics_, min_queue_remaining_,
"min_queue_remaining", 0u);
215template <
typename WorkItem,
size_t kWorkQueueEntries>
227template <
size_t kWorkQueueEntries,
typename WorkItem>
235 internal::Storage<WorkItem, kWorkQueueEntries>::queue,
246template <
size_t kWorkQueueEntries>
Definition: inline_queue.h:50
Definition: work_queue.h:49
void CheckPushWork(WorkItem &&work_item)
Definition: work_queue.h:96
void RequestStop()
Definition: work_queue.h:111
Status PushWork(WorkItem &&work_item)
Definition: work_queue.h:79
CustomWorkQueue(InlineQueue< WorkItem > &queue, pw::Function< void(WorkItem &)> &&fn)
Definition: work_queue.h:56
Definition: work_queue.h:230
constexpr CustomWorkQueueWithBuffer(pw::Function< void(WorkItem &)> &&fn)
Definition: work_queue.h:233
Definition: work_queue.h:203
WorkQueue(InlineQueue< Closure > &queue)
Definition: work_queue.h:206
Definition: work_queue.h:249
fit::function_impl< function_internal::config::kInlineCallableSize, !function_internal::config::kEnableDynamicAllocation, FunctionType, PW_FUNCTION_DEFAULT_ALLOCATOR_TYPE > Function
Definition: function.h:74
Function< void()> Closure
void-returning pw::Function that takes no arguments.
Definition: function.h:112
constexpr Status OkStatus()
Definition: status.h:234
Definition: work_queue.h:216