C/C++ API Reference
Loading...
Searching...
No Matches
work_queue.h
1// Copyright 2021 The Pigweed Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may not
4// use this file except in compliance with the License. You may obtain a copy of
5// the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12// License for the specific language governing permissions and limitations under
13// the License.
14
15#pragma once
16
17#include <cstdint>
18#include <mutex>
19
20#include "pw_assert/assert.h"
21#include "pw_containers/inline_queue.h"
22#include "pw_function/function.h"
23#include "pw_metric/metric.h"
24#include "pw_status/status.h"
25#include "pw_sync/interrupt_spin_lock.h"
26#include "pw_sync/lock_annotations.h"
27#include "pw_sync/thread_notification.h"
28#include "pw_thread/thread_core.h"
29
31namespace pw::work_queue {
32
34
38 protected:
39 // TODO: b/501206615 - The group and/or its name token should be passed as a
40 // ctor arg instead. Depending on the approach here the group should be
41 // exposed While doing this evaluate whether perhaps we should instead
42 // construct TypedMetric<uint32_t>s directly, avoiding the macro usage given
43 // the min_queue_remaining_ initial value requires dependency injection. And
44 // lastly when the restructure is finalized add unit tests to ensure these
45 // metrics work as intended.
46 PW_METRIC_GROUP(metrics_, "pw::work_queue::WorkQueue");
47 PW_METRIC(metrics_, max_queue_used_, "max_queue_used", 0u);
48 PW_METRIC(metrics_, min_queue_remaining_, "min_queue_remaining", 0u);
49};
50
67template <typename WorkItem>
68class CustomWorkQueue : public thread::ThreadCore,
70 public:
77 pw::Function<void(WorkItem&)>&& fn)
78 : stop_requested_(false), queue_(queue), fn_(std::move(fn)) {
79 min_queue_remaining_.Set(static_cast<uint32_t>(queue.capacity()));
80 }
81
92 Status PushWork(WorkItem&& work_item) PW_LOCKS_EXCLUDED(lock_) {
93 return InternalPushWork(std::move(work_item));
94 }
95
109 void CheckPushWork(WorkItem&& work_item) PW_LOCKS_EXCLUDED(lock_) {
110 PW_ASSERT_OK(InternalPushWork(std::move(work_item)),
111 "Failed to push work item into the work queue");
112 }
113 void CheckPushWork(WorkItem& work_item) PW_LOCKS_EXCLUDED(lock_) {
114 PW_ASSERT_OK(InternalPushWork(std::move(work_item)),
115 "Failed to push work item into the work queue");
116 }
117
125 {
126 std::lock_guard lock(lock_);
127 stop_requested_ = true;
128 } // Release lock before calling .release() on the semaphore.
129 work_notification_.release();
130 }
131
136 void Clear() PW_LOCKS_EXCLUDED(lock_) {
137 std::lock_guard lock(lock_);
138 queue_.clear();
139 }
140
141 private:
142 void Run() override PW_LOCKS_EXCLUDED(lock_) {
143 while (true) {
144 work_notification_.acquire();
145
146 // Drain the work queue.
147 bool stop_requested;
148 bool work_remaining;
149 do {
150 std::optional<WorkItem> possible_work_item;
151 {
152 std::lock_guard lock(lock_);
153 if (!queue_.empty()) {
154 possible_work_item.emplace(std::move(queue_.front()));
155 queue_.pop();
156 }
157 work_remaining = !queue_.empty();
158 stop_requested = stop_requested_;
159 }
160 if (!possible_work_item.has_value()) {
161 continue; // No work item to process.
162 }
163 WorkItem& work_item = possible_work_item.value();
164 fn_(work_item);
165 } while (work_remaining);
166
167 // Queue was drained, return if we've been requested to stop.
168 if (stop_requested) {
169 return;
170 }
171 }
172 }
173
174 Status InternalPushWork(WorkItem&& work_item) PW_LOCKS_EXCLUDED(lock_) {
175 {
176 std::lock_guard lock(lock_);
177
178 if (stop_requested_) {
179 // Entries are not permitted to be enqueued once stop has been
180 // requested.
182 }
183
184 if (queue_.full()) {
186 }
187
188 queue_.emplace(std::move(work_item));
189
190 // Update the watermarks for the queue.
191 const uint32_t queue_entries = queue_.size();
192 if (queue_entries > max_queue_used_.value()) {
193 max_queue_used_.Set(queue_entries);
194 }
195 const uint32_t queue_remaining = queue_.capacity() - queue_entries;
196 if (queue_remaining < min_queue_remaining_.value()) {
197 min_queue_remaining_.Set(queue_entries);
198 }
199 } // Release lock before calling .release() on the semaphore.
200 work_notification_.release();
201 return OkStatus();
202 }
203 sync::InterruptSpinLock lock_;
204 bool stop_requested_ PW_GUARDED_BY(lock_);
205 InlineQueue<WorkItem>& queue_ PW_GUARDED_BY(lock_);
206 sync::ThreadNotification work_notification_;
207 pw::Function<void(WorkItem&)> fn_;
208};
209
214class WorkQueue : public CustomWorkQueue<Closure> {
215 public:
218 : CustomWorkQueue(queue, [](Closure& fn) { fn(); }) {}
219};
220
222
223namespace internal {
224
225// Storage base class for the WorkQueueWithBuffer classes. The queue must be a
226// base class instead of a member so the queue is initialized before it is
227// passed to the CustomWorkQueue base.
228template <typename WorkItem, size_t kWorkQueueEntries>
229struct Storage {
231};
232
233} // namespace internal
234
236
242template <size_t kWorkQueueEntries, typename WorkItem>
244 : private internal::Storage<WorkItem, kWorkQueueEntries>,
245 public CustomWorkQueue<WorkItem> {
246 public:
248 constexpr CustomWorkQueueWithBuffer(pw::Function<void(WorkItem&)>&& fn)
249 : CustomWorkQueue<WorkItem>(
250 internal::Storage<WorkItem, kWorkQueueEntries>::queue,
251 std::move(fn)) {}
252};
253
261template <size_t kWorkQueueEntries>
263 : private internal::Storage<Closure, kWorkQueueEntries>,
264 public WorkQueue {
265 public:
266 constexpr WorkQueueWithBuffer()
268};
269
270} // namespace pw::work_queue
Definition: inline_queue.h:55
Definition: status.h:120
static constexpr Status FailedPrecondition()
Definition: status.h:243
static constexpr Status ResourceExhausted()
Definition: status.h:230
Definition: work_queue.h:69
Definition: work_queue.h:37
Definition: work_queue.h:245
Definition: work_queue.h:214
Definition: work_queue.h:264
fit::function_impl< function_internal::config::kInlineCallableSize, !function_internal::config::kEnableDynamicAllocation, FunctionType, PW_FUNCTION_DEFAULT_ALLOCATOR_TYPE > Function
Definition: function.h:73
Function< void()> Closure
void-returning pw::Function that takes no arguments.
Definition: function.h:111
constexpr Status OkStatus()
Definition: status.h:450
#define PW_GUARDED_BY(x)
Definition: lock_annotations.h:60
#define PW_LOCKS_EXCLUDED(...)
Definition: lock_annotations.h:178
void Clear()
Definition: work_queue.h:136
void CheckPushWork(WorkItem &&work_item)
Definition: work_queue.h:109
void RequestStop()
Definition: work_queue.h:124
Status PushWork(WorkItem &&work_item)
Definition: work_queue.h:92
CustomWorkQueue(InlineQueue< WorkItem > &queue, pw::Function< void(WorkItem &)> &&fn)
Definition: work_queue.h:76
WorkQueue(InlineQueue< Closure > &queue)
Definition: work_queue.h:217
constexpr CustomWorkQueueWithBuffer(pw::Function< void(WorkItem &)> &&fn)
Definition: work_queue.h:248
Work queue library for threads and interrupts.
Definition: work_queue.h:31
Definition: work_queue.h:229