C/C++ API Reference
Loading...
Searching...
No Matches
pw::stream::MpscReader Class Reference

Overview

Reader of a multi-producer, single-consumer stream.

The reader manages 3 aspects of the stream:

  • The storage used to hold written data that is to be read.
  • The list of connected writers.
  • Accounting for how much data has and can be written.

This class has a default constructor that can only produce a disconnected reader. To connect a reader, use CreateMpscStream().

Inheritance diagram for pw::stream::MpscReader:
pw::stream::NonSeekableReader pw::stream::Reader pw::stream::Stream pw::stream::BufferedMpscReader< kCapacity >

Public Types

using duration = std::optional< chrono::SystemClock::duration >
 
using ReadAllCallback = Function< Status(ConstByteSpan data)>
 
- Public Types inherited from pw::stream::Stream
enum  Whence : uint8_t { kBeginning = 0b001 , kCurrent = 0b010 , kEnd = 0b100 }
 Positions from which to seek. More...
 

Public Member Functions

bool connected () const
 Returns whether this object has any connected writers.
 
void SetTimeout (const duration &timeout)
 
void SetBuffer (ByteSpan buffer)
 
Status ReadAll (ReadAllCallback callback)
 
void Close ()
 Disconnects all writers and drops any unread data.
 
- Public Member Functions inherited from pw::stream::Stream
constexpr bool readable () const
 
constexpr bool writable () const
 
constexpr bool seekable () const
 
constexpr bool seekable (Whence origin) const
 True if the stream supports seeking from the specified origin.
 
Result< ByteSpanRead (ByteSpan dest)
 
Result< ByteSpanRead (void *dest, size_t size_bytes)
 This is an overloaded member function, provided for convenience. It differs from the above function only in what argument(s) it accepts.
 
Result< ByteSpanReadExact (ByteSpan const buffer)
 
Status Write (ConstByteSpan data)
 
Status Write (const void *data, size_t size_bytes)
 This is an overloaded member function, provided for convenience. It differs from the above function only in what argument(s) it accepts.
 
Status Write (const std::byte b)
 This is an overloaded member function, provided for convenience. It differs from the above function only in what argument(s) it accepts.
 
Status Seek (ptrdiff_t offset, Whence origin=kBeginning)
 
size_t Tell ()
 
size_t ConservativeReadLimit () const
 
size_t ConservativeWriteLimit () const
 

Private Member Functions

size_t ConservativeLimit (Stream::LimitType type) const override
 
StatusWithSize DoRead (ByteSpan destination) override
 Virtual Read() function implemented by derived classes.
 

Friends

class MpscWriter
 
void CreateMpscStream (MpscReader &, MpscWriter &)
 

Additional Inherited Members

- Static Public Attributes inherited from pw::stream::Stream
static constexpr size_t kUnlimited = std::numeric_limits<size_t>::max()
 Value returned from read/write limit if unlimited.
 
static constexpr size_t kUnknownPosition = std::numeric_limits<size_t>::max()
 Returned by Tell() if getting the position is not supported.
 
- Protected Types inherited from pw::stream::Stream
enum class  LimitType : bool { kRead , kWrite }
 

Member Function Documentation

◆ ConservativeLimit()

size_t pw::stream::MpscReader::ConservativeLimit ( Stream::LimitType  type) const
overrideprivatevirtual

Virtual function optionally implemented by derived classes that is used for ConservativeReadLimit() and ConservativeWriteLimit().

The default implementation returns kUnlimited or 0 depending on whether the stream is readable/writable.

Reimplemented from pw::stream::Stream.

◆ DoRead()

StatusWithSize pw::stream::MpscReader::DoRead ( ByteSpan  destination)
overrideprivatevirtual

Virtual Read() function implemented by derived classes.

Implements pw::stream::Stream.

◆ ReadAll()

pw::stream::MpscReader::ReadAll ( ReadAllCallback  callback)

Reads data in a loop and passes it to a provided callback.

This will read continuously until all connected writers close.

Example usage:

(.cpp}
MpscReader reader;
MpscWriter writer;
MpscStreamCreate(reader, writer);
Thread t(MakeThreadOptions(), [] (void*arg) {
auto *writer = static_cast<MpscWriter *>(arg);
writer->Write(GenerateSomeData()).IgnoreError();
}, &writer);
auto status = reader.ReadAll([] (ConstByteSpan data) {
return ProcessSomeData();
});
t.join();
constexpr void IgnoreError() const
Definition: status.h:223
Definition: mpsc_stream.h:218
Status ReadAll(ReadAllCallback callback)
Definition: mpsc_stream.h:102
Status Write(ConstByteSpan data)
Definition: stream.h:192
Definition: thread.h:69
Parameters
[in]callbackA callable object to invoke on data as it is read.
Return values
OKSuccessfully read until writers closed.
FAILED_PRECONDITIONThe object does not have a buffer.
RESOURCE_EXHAUSTEDTimed out when reading data. This can only occur if a timeout has been set.
Anyother error as returned by the callback.

◆ SetBuffer()

void pw::stream::MpscReader::SetBuffer ( ByteSpan  buffer)

Associates the reader with storage to buffer written data to be read.

If desired, callers can use this method to buffer written data. This can improve writer performance by allowing calls to WriteData() to avoid waiting for the reader, albeit at the cost of increased memory. This can be useful when the reader needs time to process the data it reads, or when the volume of writes varies over time, i.e. is "bursty".

The reader does not take ownership of the storage, which must be valid until a call to the destructor or another call to SetBuffer().

Parameters
[in]bufferA view to the storage.

◆ SetTimeout()

void pw::stream::MpscReader::SetTimeout ( const duration &  timeout)

Set the timeout for reading from this stream.

After setting a timeout, if the given duration elapses while making a call to Read(), RESOURCE_EXHAUSTED will be returned. If desired, a timeout should be set before calling Read() or ReadAll(). Setting a timeout when a reader is awaiting notification from a writer will not affect the duration of that wait. ReadUntilClose() ignores timeouts entirely.

Parameters
[in]timeoutThe duration to wait before returning an error.

Friends And Related Function Documentation

◆ CreateMpscStream

void CreateMpscStream ( MpscReader ,
MpscWriter  
)
friend

Creates a multi-producer, single consumer stream.

This method creates a stream by associating a reader and writer. Both are reset before being connected. This is the only way to connect a reader. Additional writers may be connected by copying the given writer after it is connected.

This method is thread-safe with respect to other MpscReader and MpscWriter methods. It is not thread-safe with respect to itself, i.e. callers must not make concurrent calls to CreateMpscStream() from different threads with the same objects.

Parameters
[out]readerThe reader to connect.
[out]writerThe writer to connect.

The documentation for this class was generated from the following file: