Pigweed
C/C++ API Reference
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
Loading...
Searching...
No Matches
work_queue.h
1// Copyright 2021 The Pigweed Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may not
4// use this file except in compliance with the License. You may obtain a copy of
5// the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12// License for the specific language governing permissions and limitations under
13// the License.
14
15#pragma once
16
17#include <cstdint>
18#include <mutex>
19
20#include "pw_assert/assert.h"
21#include "pw_containers/inline_queue.h"
22#include "pw_function/function.h"
23#include "pw_metric/metric.h"
24#include "pw_status/status.h"
25#include "pw_sync/interrupt_spin_lock.h"
26#include "pw_sync/lock_annotations.h"
27#include "pw_sync/thread_notification.h"
28#include "pw_thread/thread_core.h"
29
30namespace pw::work_queue {
31
48template <typename WorkItem>
49class CustomWorkQueue : public thread::ThreadCore {
50 public:
57 pw::Function<void(WorkItem&)>&& fn)
58 : stop_requested_(false), queue_(queue), fn_(std::move(fn)) {
59 min_queue_remaining_.Set(static_cast<uint32_t>(queue.capacity()));
60 }
61
79 Status PushWork(WorkItem&& work_item) PW_LOCKS_EXCLUDED(lock_) {
80 return InternalPushWork(std::move(work_item));
81 }
82
96 void CheckPushWork(WorkItem&& work_item) PW_LOCKS_EXCLUDED(lock_) {
97 PW_ASSERT_OK(InternalPushWork(std::move(work_item)),
98 "Failed to push work item into the work queue");
99 }
100 void CheckPushWork(WorkItem& work_item) PW_LOCKS_EXCLUDED(lock_) {
101 PW_ASSERT_OK(InternalPushWork(std::move(work_item)),
102 "Failed to push work item into the work queue");
103 }
104
111 void RequestStop() PW_LOCKS_EXCLUDED(lock_) {
112 {
113 std::lock_guard lock(lock_);
114 stop_requested_ = true;
115 } // Release lock before calling .release() on the semaphore.
116 work_notification_.release();
117 }
118
119 private:
120 void Run() override PW_LOCKS_EXCLUDED(lock_) {
121 while (true) {
122 work_notification_.acquire();
123
124 // Drain the work queue.
125 bool stop_requested;
126 bool work_remaining;
127 do {
128 std::optional<WorkItem> possible_work_item;
129 {
130 std::lock_guard lock(lock_);
131 if (!queue_.empty()) {
132 possible_work_item.emplace(std::move(queue_.front()));
133 queue_.pop();
134 }
135 work_remaining = !queue_.empty();
136 stop_requested = stop_requested_;
137 }
138 if (!possible_work_item.has_value()) {
139 continue; // No work item to process.
140 }
141 WorkItem& work_item = possible_work_item.value();
142 fn_(work_item);
143 } while (work_remaining);
144
145 // Queue was drained, return if we've been requested to stop.
146 if (stop_requested) {
147 return;
148 }
149 }
150 }
151
152 Status InternalPushWork(WorkItem&& work_item) PW_LOCKS_EXCLUDED(lock_) {
153 {
154 std::lock_guard lock(lock_);
155
156 if (stop_requested_) {
157 // Entries are not permitted to be enqueued once stop has been
158 // requested.
159 return Status::FailedPrecondition();
160 }
161
162 if (queue_.full()) {
163 return Status::ResourceExhausted();
164 }
165
166 queue_.emplace(std::move(work_item));
167
168 // Update the watermarks for the queue.
169 const uint32_t queue_entries = queue_.size();
170 if (queue_entries > max_queue_used_.value()) {
171 max_queue_used_.Set(queue_entries);
172 }
173 const uint32_t queue_remaining = queue_.capacity() - queue_entries;
174 if (queue_remaining < min_queue_remaining_.value()) {
175 min_queue_remaining_.Set(queue_entries);
176 }
177 } // Release lock before calling .release() on the semaphore.
178 work_notification_.release();
179 return OkStatus();
180 }
181 sync::InterruptSpinLock lock_;
182 bool stop_requested_ PW_GUARDED_BY(lock_);
183 InlineQueue<WorkItem>& queue_ PW_GUARDED_BY(lock_);
184 sync::ThreadNotification work_notification_;
185 pw::Function<void(WorkItem&)> fn_;
186
187 // TODO(ewout): The group and/or its name token should be passed as a ctor
188 // arg instead. Depending on the approach here the group should be exposed
189 // While doing this evaluate whether perhaps we should instead construct
190 // TypedMetric<uint32_t>s directly, avoiding the macro usage given the
191 // min_queue_remaining_ initial value requires dependency injection.
192 // And lastly when the restructure is finalized add unit tests to ensure these
193 // metrics work as intended.
194 PW_METRIC_GROUP(metrics_, "pw::work_queue::WorkQueue");
195 PW_METRIC(metrics_, max_queue_used_, "max_queue_used", 0u);
196 PW_METRIC(metrics_, min_queue_remaining_, "min_queue_remaining", 0u);
197};
198
203class WorkQueue : public CustomWorkQueue<Closure> {
204 public:
207 : CustomWorkQueue(queue, [](Closure& fn) { fn(); }) {}
208};
209
210namespace internal {
211
212// Storage base class for the WorkQueueWithBuffer classes. The queue must be a
213// base class instead of a member so the queue is initialized before it is
214// passed to the CustomWorkQueue base.
215template <typename WorkItem, size_t kWorkQueueEntries>
216struct Storage {
218};
219
220} // namespace internal
221
227template <size_t kWorkQueueEntries, typename WorkItem>
229 : private internal::Storage<WorkItem, kWorkQueueEntries>,
230 public CustomWorkQueue<WorkItem> {
231 public:
233 constexpr CustomWorkQueueWithBuffer(pw::Function<void(WorkItem&)>&& fn)
234 : CustomWorkQueue<WorkItem>(
235 internal::Storage<WorkItem, kWorkQueueEntries>::queue,
236 std::move(fn)) {}
237};
238
246template <size_t kWorkQueueEntries>
248 : private internal::Storage<Closure, kWorkQueueEntries>,
249 public WorkQueue {
250 public:
251 constexpr WorkQueueWithBuffer()
253};
254
255} // namespace pw::work_queue
Definition: inline_queue.h:50
Definition: status.h:85
Definition: work_queue.h:49
void CheckPushWork(WorkItem &&work_item)
Definition: work_queue.h:96
void RequestStop()
Definition: work_queue.h:111
Status PushWork(WorkItem &&work_item)
Definition: work_queue.h:79
CustomWorkQueue(InlineQueue< WorkItem > &queue, pw::Function< void(WorkItem &)> &&fn)
Definition: work_queue.h:56
Definition: work_queue.h:230
constexpr CustomWorkQueueWithBuffer(pw::Function< void(WorkItem &)> &&fn)
Definition: work_queue.h:233
Definition: work_queue.h:203
WorkQueue(InlineQueue< Closure > &queue)
Definition: work_queue.h:206
Definition: work_queue.h:249
fit::function_impl< function_internal::config::kInlineCallableSize, !function_internal::config::kEnableDynamicAllocation, FunctionType, PW_FUNCTION_DEFAULT_ALLOCATOR_TYPE > Function
Definition: function.h:74
Function< void()> Closure
void-returning pw::Function that takes no arguments.
Definition: function.h:112
constexpr Status OkStatus()
Definition: status.h:234
Definition: work_queue.h:216