C/C++ API Reference
Loading...
Searching...
No Matches
mpsc_stream.h
Go to the documentation of this file.
1// Copyright 2023 The Pigweed Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may not
4// use this file except in compliance with the License. You may obtain a copy of
5// the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12// License for the specific language governing permissions and limitations under
13// the License.
14#pragma once
15
49
50#include <cstddef>
51
52#include "pw_bytes/span.h"
53#include "pw_chrono/system_clock.h"
54#include "pw_containers/intrusive_list.h"
55#include "pw_function/function.h"
56#include "pw_status/status.h"
57#include "pw_status/status_with_size.h"
58#include "pw_stream/stream.h"
59#include "pw_sync/lock_annotations.h"
60#include "pw_sync/mutex.h"
61#include "pw_sync/timed_thread_notification.h"
62
63namespace pw::stream {
64
65// Forward declarations.
66class MpscReader;
67class MpscWriter;
68
70
85void CreateMpscStream(MpscReader& reader, MpscWriter& writer);
86
88
90
102 public IntrusiveList<MpscWriter>::Item {
103 public:
104 using duration = std::optional<chrono::SystemClock::duration>;
105
111 struct Request : public IntrusiveList<Request>::Item {
113 using IntrusiveList<Request>::Item::unlisted;
114 };
115
116 MpscWriter() = default;
117 MpscWriter(const MpscWriter& other);
118 MpscWriter& operator=(const MpscWriter& other);
119 MpscWriter(MpscWriter&& other);
120 MpscWriter& operator=(MpscWriter&& other);
121 ~MpscWriter() override;
122
124 bool connected() const PW_LOCKS_EXCLUDED(mutex_);
125
127 size_t last_write() const PW_LOCKS_EXCLUDED(mutex_);
128
130 const duration& timeout() const PW_LOCKS_EXCLUDED(mutex_);
131
157 void SetTimeout(const duration& timeout) PW_LOCKS_EXCLUDED(mutex_);
158
170 void SetLimit(size_t limit) PW_LOCKS_EXCLUDED(mutex_);
171
175 void Close() PW_LOCKS_EXCLUDED(mutex_);
176
177 private:
178 // The factory method is allowed to directly modify a writer to connect it
179 // to the reader.
181
183 size_t ConservativeLimit(LimitType type) const override;
184
193
195 void CloseLocked() PW_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
196
197 mutable sync::Mutex mutex_;
198 MpscReader* reader_ PW_GUARDED_BY(mutex_) = nullptr;
199 size_t limit_ PW_GUARDED_BY(mutex_) = kUnlimited;
200 Request write_request_;
201 duration timeout_ PW_GUARDED_BY(mutex_);
202 size_t last_write_ PW_GUARDED_BY(mutex_) = 0;
203};
204
206
208
219 public:
220 using duration = std::optional<chrono::SystemClock::duration>;
221
222 MpscReader();
223 ~MpscReader() override;
224
226 bool connected() const PW_LOCKS_EXCLUDED(mutex_);
227
237 void SetTimeout(const duration& timeout) PW_LOCKS_EXCLUDED(mutex_);
238
251 void SetBuffer(ByteSpan buffer) PW_LOCKS_EXCLUDED(mutex_);
252
280 using ReadAllCallback = Function<Status(ConstByteSpan data)>;
281 Status ReadAll(ReadAllCallback callback) PW_LOCKS_EXCLUDED(mutex_);
282
284 void Close() PW_LOCKS_EXCLUDED(mutex_);
285
286 private:
287 // The factory method is allowed to directly modify the reader to connect it
288 // to a writer.
290
291 // The writer is allowed to call directly into the reader to:
292 // * Add/remove itself to the reader's list of writers.
293 // * Request space to write data, and to write to that space.
294 friend class MpscWriter;
295
303 void IncreaseLimit(size_t delta) PW_LOCKS_EXCLUDED(mutex_);
304 void IncreaseLimitLocked(size_t delta) PW_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
305
313 void DecreaseLimit(size_t delta) PW_LOCKS_EXCLUDED(mutex_);
314 void DecreaseLimitLocked(size_t delta) PW_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
315
317 size_t ConservativeLimit(Stream::LimitType type) const override
318 PW_LOCKS_EXCLUDED(mutex_);
319
325 void RequestWrite(MpscWriter::Request& write_request)
326 PW_LOCKS_EXCLUDED(mutex_);
327
334 void CheckWriteableLocked() PW_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
335
344 StatusWithSize WriteData(ConstByteSpan data, size_t limit)
345 PW_LOCKS_EXCLUDED(mutex_);
346
354 void CompleteWrite(MpscWriter::Request& write_request)
355 PW_LOCKS_EXCLUDED(mutex_);
356 void CompleteWriteLocked(MpscWriter::Request& write_request)
358
360 StatusWithSize DoRead(ByteSpan destination) override
361 PW_LOCKS_EXCLUDED(mutex_);
362
363 // Locked implementations.
364
365 mutable sync::Mutex mutex_;
366 IntrusiveList<MpscWriter> writers_ PW_GUARDED_BY(mutex_);
367 IntrusiveList<MpscWriter::Request> write_requests_ PW_GUARDED_BY(mutex_);
368 IntrusiveList<MpscWriter::Request>::iterator last_request_
369 PW_GUARDED_BY(mutex_);
370
371 size_t num_unlimited_ PW_GUARDED_BY(mutex_) = 0;
372 size_t limit_ PW_GUARDED_BY(mutex_) = 0;
373
374 bool reading_ PW_GUARDED_BY(mutex_) = false;
375 sync::TimedThreadNotification readable_;
376 sync::ThreadNotification closeable_;
377 duration timeout_ PW_GUARDED_BY(mutex_);
378
379 ByteSpan destination_ PW_GUARDED_BY(mutex_);
380 size_t written_ PW_GUARDED_BY(mutex_) = 0;
381
382 ByteSpan buffer_ PW_GUARDED_BY(mutex_);
383 size_t offset_ PW_GUARDED_BY(mutex_) = 0;
384 size_t length_ PW_GUARDED_BY(mutex_) = 0;
385};
386
392template <size_t kCapacity>
394 public:
395 BufferedMpscReader() { SetBuffer(buffer_); }
396
397 private:
398 std::array<std::byte, kCapacity> buffer_;
399};
400
402
403} // namespace pw::stream
Definition: status.h:86
Definition: status_with_size.h:49
Definition: intrusive_list.h:88
Definition: mpsc_stream.h:393
Definition: mpsc_stream.h:218
bool connected() const
Returns whether this object has any connected writers.
Definition: mpsc_stream.h:102
size_t ConservativeLimit(LimitType type) const override
void SetLimit(size_t limit)
size_t last_write() const
Indicates how much data was sent in the last call to Write().
void SetTimeout(const duration &timeout)
const duration & timeout() const
Returns the optional maximum time elapsed before a Write() fails.
bool connected() const
Returns whether this object is connected to a reader.
friend void CreateMpscStream(MpscReader &, MpscWriter &)
Status DoWrite(ConstByteSpan data) override
Virtual Write() function implemented by derived classes.
Definition: stream.h:413
Definition: stream.h:504
Definition: stream.h:45
static constexpr size_t kUnlimited
Value returned from read/write limit if unlimited.
Definition: stream.h:66
StatusWithSize DoRead(ByteSpan) final
Virtual Read() function implemented by derived classes.
Definition: stream.h:450
Definition: timed_thread_notification.h:40
fit::function_impl< function_internal::config::kInlineCallableSize, !function_internal::config::kEnableDynamicAllocation, FunctionType, PW_FUNCTION_DEFAULT_ALLOCATOR_TYPE > Function
Definition: function.h:73
void CreateMpscStream(MpscReader &reader, MpscWriter &writer)
#define PW_GUARDED_BY(x)
Definition: lock_annotations.h:60
#define PW_EXCLUSIVE_LOCKS_REQUIRED(...)
Definition: lock_annotations.h:146
#define PW_LOCKS_EXCLUDED(...)
Definition: lock_annotations.h:176
Definition: mpsc_stream.h:111