pw_stream#
pw_stream
provides a foundational interface for streaming data from one part
of a system to another. In the simplest use cases, this is basically a memcpy
behind a reusable interface that can be passed around the system. On the other
hand, the flexibility of this interface means a pw_stream
could terminate is
something more complex, like a UART stream or flash memory.
Overview#
At the most basic level, pw_stream
’s interfaces provide very simple handles
to enabling streaming data from one location in a system to an endpoint.
Example:
Status DumpSensorData(pw::stream::Writer& writer) {
static char temp[64];
ImuSample imu_sample;
imu.GetSample(&info);
size_t bytes_written = imu_sample.AsCsv(temp, sizeof(temp));
return writer.Write(temp, bytes_written);
}
In this example, DumpSensorData()
only cares that it has access to a
Writer
that it can use to stream data to using Writer::Write()
.
The Writer
itself can be backed by anything that can act as a data
“sink.”
pw::stream Interfaces#
There are three basic capabilities of a stream:
Reading – Bytes can be read from the stream.
Writing – Bytes can be written to the stream.
Seeking – The position in the stream can be changed.
pw_stream
provides a family of stream classes with different capabilities.
The most basic class, Stream
guarantees no functionality, while the
most capable class, SeekableReaderWriter
supports reading, writing,
and seeking.
Usage overview#
pw::stream Interfaces |
Accept in APIs? |
Extend to create new stream? |
---|---|---|
❌ |
❌ |
|
✅ |
❌ |
|
✅ |
✅ |
|
✅ (rarely) |
✅ |
|
❌ |
✅ |
Interface documentation#
Summary documentation for the pw_stream
interfaces is below. See the API
comments in pw_stream/public/pw_stream/stream.h
for full details.
-
class Stream#
A generic stream that may support reading, writing, and seeking, but makes no guarantees about whether any operations are supported. Unsupported functions return Status::Unimplemented() Stream serves as the base for the Reader, Writer, and ReaderWriter interfaces.
Stream cannot be extended directly. Instead, work with one of the derived classes that explicitly supports the required functionality. Stream should almost never be used in APIs; accept a derived class with the required capabilities instead.
All Stream methods are blocking. They return when the requested operation completes.
Subclassed by pw::stream::Reader, pw::stream::ReaderWriter, pw::stream::Writer
Public Types
-
enum Whence#
Positions from which to seek.
Values:
-
enumerator kBeginning#
Seek from the beginning of the stream. The offset is a direct offset into the data.
-
enumerator kCurrent#
Seek from the current position in the stream. The offset is added to the current position. Use a negative offset to seek backwards.
Implementations may only support seeking within a limited range from the current position.
-
enumerator kEnd#
Seek from the end of the stream. The offset is added to the end position. Use a negative offset to seek backwards from the end.
-
enumerator kBeginning#
Public Functions
-
inline constexpr bool readable() const#
- Return values:
True – If reading is supported.
False – If Read() returns UNIMPLEMENTED.
-
inline constexpr bool writable() const#
- Return values:
True – If writing is supported.
False – If Write() returns UNIMPLEMENTED.
-
inline constexpr bool seekable() const#
- Return values:
True – If the stream supports seeking.
False – If the stream does not supports seeking.
-
inline constexpr bool seekable(Whence origin) const#
True if the stream supports seeking from the specified origin.
-
inline Result<ByteSpan> Read(ByteSpan dest)#
Reads data from the stream into the provided buffer, if supported. As many bytes as are available up to the buffer size are copied into the buffer. Remaining bytes may by read in subsequent calls. If any number of bytes are read returns OK with a span of the bytes read.
If the reader has been exhausted and is and can no longer read additional bytes it will return
OUT_OF_RANGE
. This is similar to end-of-file (EOF). Read will only returnOUT_OF_RANGE
if ConservativeReadLimit() is and will remain zero. A Read operation that is successful and also exhausts the reader returns OK, with all following calls returningOUT_OF_RANGE
.Derived classes should NOT try to override these public read methods. Instead, provide an implementation by overriding DoRead().
- Returns:
Code
Description
Between 1 and
dest.size_bytes()
were successfully read. Returns the span of read bytes.This stream does not support reading.
The Reader is not in state to read data.
Unable to read any bytes at this time. No bytes read. Try again once bytes become available.
Reader has been exhausted, similar to EOF. No bytes were read, no more will be read.
-
inline Result<ByteSpan> Read(void *dest, size_t size_bytes)#
This is an overloaded member function, provided for convenience. It differs from the above function only in what argument(s) it accepts.
-
inline Result<ByteSpan> ReadExact(ByteSpan const buffer)#
Reads exactly the number of bytes requested into the provided buffer, if supported. Internally, the stream is read as many times as necessary to fill the destination buffer. If fewer bytes than requested are available,
OUT_OF_RANGE
is returned.Other errors may be returned, as documented on Read().
-
inline Status Write(ConstByteSpan data)#
Writes data to this stream. Data is not guaranteed to be fully written out to final resting place on Write return.
If the writer is unable to fully accept the input data size it will abort the write and return
RESOURCE_EXHAUSTED
.If the writer has been exhausted and is and can no longer accept additional bytes it will return
OUT_OF_RANGE
. This is similar to end-of-file (EOF). Write will only returnOUT_OF_RANGE
if ConservativeWriteLimit() is and will remain zero. A Write operation that is successful and also exhausts the writer returns OK, with all following calls returningOUT_OF_RANGE
. When ConservativeWriteLimit() is greater than zero, a Write that is a number of bytes beyond what will exhaust the Write will abort and returnRESOURCE_EXHAUSTED
rather than OUT_OF_RANGE because the writer is still able to write bytes.Derived classes should NOT try to override the public Write methods. Instead, provide an implementation by overriding DoWrite().
- Returns:
Code
Description
Data was successfully accepted by the stream.
This stream does not support writing.
The writer is not in a state to accept data.
The writer was unable to write all of requested data at this time. No data was written.
The Writer has been exhausted, similar to EOF. No data was written; no more will be written.
-
inline Status Write(const void *data, size_t size_bytes)#
This is an overloaded member function, provided for convenience. It differs from the above function only in what argument(s) it accepts.
-
inline Status Write(const std::byte b)#
This is an overloaded member function, provided for convenience. It differs from the above function only in what argument(s) it accepts.
-
inline Status Seek(ptrdiff_t offset, Whence origin = kBeginning)#
Changes the current position in the stream for both reading and writing, if supported.
Seeking to a negative offset is invalid. The behavior when seeking beyond the end of a stream is determined by the implementation. The implementation could fail with OUT_OF_RANGE or append bytes to the stream.
- Returns:
Code
Description
Successfully updated the position.
Seeking is not supported for this stream.
Attempted to seek beyond the bounds of the stream. The position is unchanged.
-
inline size_t Tell()#
Returns the current position in the stream, if supported. The position is the offset from the beginning of the stream. Returns Stream::kUnknownPosition (
size_t(-1)
) if the position is unknown.Streams that support seeking from the beginning always support Tell(). Other streams may or may not support Tell().
-
inline size_t ConservativeReadLimit() const#
Liklely (not guaranteed) minimum bytes available to read at this time. This number is advisory and not guaranteed to read full number of requested bytes or without a
RESOURCE_EXHAUSTED
orOUT_OF_RANGE
. As Reader processes/handles/receives enqueued data or other contexts read data this number can go up or down for some Readers.- Return values:
zero – if, in the current state, Read() would not return OkStatus().
kUnlimited – if the implementation imposes no limits on read sizes.
-
inline size_t ConservativeWriteLimit() const#
Likely (not guaranteed) minimum bytes available to write at this time. This number is advisory and not guaranteed to write without a
RESOURCE_EXHAUSTED
orOUT_OF_RANGE
. As Writer processes/handles enqueued of other contexts write data this number can go up or down for some Writers. Returns zero if, in the current state, Write() would not return OkStatus().Returns kUnlimited if the implementation has no limits on write sizes.
Public Static Attributes
-
static constexpr size_t kUnlimited = std::numeric_limits<size_t>::max()#
Value returned from read/write limit if unlimited.
Private Types
-
enum class Seekability : uint8_t#
Seekability expresses the origins from which the stream always supports seeking. Seeking from other origins may work, but is not guaranteed.
Seekability is implemented as a bitfield of Whence values.
Values:
-
enumerator kNone#
No type of seeking is supported.
-
enumerator kRelative#
Seeking from the current position is supported, but the range may be limited. For example, a buffered stream might support seeking within the buffered data, but not before or after.
-
enumerator kAbsolute#
The stream supports random access anywhere within the stream.
-
enumerator kNone#
Private Functions
-
virtual StatusWithSize DoRead(ByteSpan destination) = 0#
Virtual Read() function implemented by derived classes.
-
virtual Status DoWrite(ConstByteSpan data) = 0#
Virtual Write() function implemented by derived classes.
-
virtual Status DoSeek(ptrdiff_t offset, Whence origin) = 0#
Virtual Seek() function implemented by derived classes.
-
inline virtual size_t DoTell()#
Virtual Tell() function optionally implemented by derived classes. The default implementation always returns kUnknownPosition.
-
inline virtual size_t ConservativeLimit(LimitType limit_type) const#
Virtual function optionally implemented by derived classes that is used for ConservativeReadLimit() and ConservativeWriteLimit().
The default implementation returns kUnlimited or
0
depending on whether the stream is readable/writable.
-
enum Whence#
Reader interfaces#
-
class Reader : public pw::stream::Stream#
A Stream that supports reading but not writing. The Write() method is hidden.
Use in APIs when:
Must read from, but not write to, a stream.
May or may not need seeking. Use a SeekableReader& if seeking is required.
Inherit from when:
Reader cannot be extended directly. Instead, extend SeekableReader, NonSeekableReader, or (rarely) RelativeSeekableReader, as appropriate.
A Reader may or may not support seeking. Check seekable() or try calling Seek() to determine if the stream is seekable.
Subclassed by pw::stream::NonSeekableReader, pw::stream::RelativeSeekableReader
-
class SeekableReader : public pw::stream::RelativeSeekableReader#
A Reader that fully supports seeking.
Use in APIs when:
Absolute seeking is required. Use Reader& if seeking is not required or seek failures can be handled gracefully.
Inherit from when:
Implementing a reader that supports absolute seeking.
-
class RelativeSeekableReader : public pw::stream::Reader#
A Reader that supports at least relative seeking within some range of the current position. Seeking beyond that or from other origins may or may not be supported. The extent to which seeking is possible is NOT exposed by this API.
Use in APIs when:
Relative seeking is required. Usage in APIs should be rare; generally Reader should be used instead.
Inherit from when:
Implementing a Reader that can only support seeking near the current position.
A buffered Reader that only supports seeking within its buffer is a good example of a RelativeSeekableReader.
Subclassed by pw::stream::SeekableReader
Writer interfaces#
-
class Writer : public pw::stream::Stream#
A Stream that supports writing but not reading. The Read() method is hidden.
Use in APIs when:
Must write to, but not read from, a stream.
May or may not need seeking. Use a SeekableWriter& if seeking is required.
Inherit from when:
Writer cannot be extended directly. Instead, extend SeekableWriter, NonSeekableWriter, or (rarely) RelativeSeekableWriter, as appropriate.
A Writer may or may not support seeking. Check seekable() or try calling Seek() to determine if the stream is seekable.
Subclassed by pw::stream::NonSeekableWriter, pw::stream::RelativeSeekableWriter
-
class SeekableWriter : public pw::stream::RelativeSeekableWriter#
A Writer that fully supports seeking.
Use in APIs when:
Absolute seeking is required. Use Writer& if seeking is not required or seek failures can be handled gracefully.
Inherit from when:
Implementing a writer that supports absolute seeking.
-
class RelativeSeekableWriter : public pw::stream::Writer#
A Writer that supports at least relative seeking within some range of the current position. Seeking beyond that or from other origins may or may not be supported. The extent to which seeking is possible is NOT exposed by this API.
Use in APIs when:
Relative seeking is required. Usage in APIs should be rare; generally Writer should be used instead.
Inherit from when:
Implementing a Writer that can only support seeking near the current position.
A buffered Writer that only supports seeking within its buffer is a good example of a RelativeSeekableWriter.
Subclassed by pw::stream::SeekableWriter
ReaderWriter interfaces#
-
class ReaderWriter : public pw::stream::Stream#
A Stream that supports both reading and writing.
Use in APIs when:
Must both read from and write to a stream.
May or may not need seeking. Use a SeekableReaderWriter& if seeking is required.
Inherit from when:
Cannot extend ReaderWriter directly. Instead, extend SeekableReaderWriter, NonSeekableReaderWriter, or (rarely) RelativeSeekableReaderWriter, as appropriate.
A ReaderWriter may or may not support seeking. Check seekable() or try calling Seek() to determine if the stream is seekable.
Subclassed by pw::stream::NonSeekableReaderWriter, pw::stream::RelativeSeekableReaderWriter
-
class SeekableReaderWriter : public pw::stream::RelativeSeekableReaderWriter#
A ReaderWriter that fully supports seeking.
Use in APIs when:
Absolute seeking is required. Use ReaderWriter& if seeking is not required or seek failures can be handled gracefully.
Inherit from when:
Implementing a writer that supports absolute seeking.
Subclassed by pw::multibuf::Stream
-
class RelativeSeekableReaderWriter : public pw::stream::ReaderWriter#
A ReaderWriter that supports at least relative seeking within some range of the current position. Seeking beyond that or from other origins may or may not be supported. The extent to which seeking is possible is NOT exposed by this API.
Use in APIs when:
Relative seeking is required. Usage in APIs should be rare; generally ReaderWriter should be used instead.
Inherit from when:
Implementing a ReaderWriter that can only support seeking near the current position.
A buffered ReaderWriter that only supports seeking within its buffer is a good example of a RelativeSeekableReaderWriter.
Subclassed by pw::stream::SeekableReaderWriter
-
class NonSeekableReaderWriter : public pw::stream::ReaderWriter#
A ReaderWriter that does not support seeking. The Seek() method is hidden.
Use in APIs when:
Do NOT use in APIs! If seeking is not required, use ReaderWriter& instead.
Inherit from when:
Implementing a ReaderWriter that does not support seeking.
Subclassed by pw::stream::UartStreamLinux, pw::uart::UartStream
Implementations#
pw_stream
includes a few stream implementations for general use.
-
class MemoryWriter : public SeekableWriter#
The
MemoryWriter
class implements theWriter
interface by backing the data destination with an externally-provided memory buffer.MemoryWriterBuffer
extendsMemoryWriter
to internally provide a memory buffer.The
MemoryWriter
can be accessed like a standard C++ container. The contents grow as data is written.
-
class MemoryReader : public SeekableReader#
The
MemoryReader
class implements theReader
interface by backing the data source with an externally-provided memory buffer.
-
class NullStream : public SeekableReaderWriter#
NullStream
is a no-op stream implementation, similar to/dev/null
. Writes are always dropped. Reads always returnOUT_OF_RANGE
. Seeks have no effect.
-
class CountingNullStream : public SeekableReaderWriter#
CountingNullStream
is a no-op stream implementation, likeNullStream
, that counts the number of bytes written.-
size_t bytes_written() const#
Returns the number of bytes provided to previous
Write()
calls.
-
size_t bytes_written() const#
-
class StdFileWriter : public SeekableWriter#
StdFileWriter
wraps anstd::ofstream
with theWriter
interface.
-
class StdFileReader : public SeekableReader#
StdFileReader
wraps anstd::ifstream
with theReader
interface.
-
class SocketStream : public NonSeekableReaderWriter#
SocketStream
wraps posix-style TCP sockets with theReader
andWriter
interfaces. It can be used to connect to a TCP server, or to communicate with a client via theServerSocket
class.
-
class ServerSocket#
ServerSocket
wraps a posix server socket, and produces aSocketStream
for each accepted client connection.
Why use pw_stream?#
Standard API#
pw_stream
provides a standard way for classes to express that they have the
ability to write data. Writing to one sink versus another sink is a matter of
just passing a reference to the appropriate Writer
.
As an example, imagine dumping sensor data. If written against a random HAL
or one-off class, there’s porting work required to write to a different sink
(imagine writing over UART vs dumping to flash memory). Building a “dumping”
implementation against the Writer
interface prevents a dependency
on a bespoke API that would require porting work.
Similarly, after building a Writer
implementation for a Sink that
data could be dumped to, that same Writer
can be reused for other
contexts that already write data to the pw::stream::Writer
interface.
Before:
// Not reusable, depends on `Uart`.
void DumpSensorData(Uart& uart) {
static char temp[64];
ImuSample imu_sample;
imu.GetSample(&info);
size_t bytes_written = imu_sample.AsCsv(temp, sizeof(temp));
uart.Transmit(temp, bytes_written, /*timeout_ms=*/ 200);
}
After:
// Reusable; no more Uart dependency!
Status DumpSensorData(Writer& writer) {
static char temp[64];
ImuSample imu_sample;
imu.GetSample(&info);
size_t bytes_written = imu_sample.AsCsv(temp, sizeof(temp));
return writer.Write(temp, bytes_written);
}
Reduce intermediate buffers#
Often functions that write larger blobs of data request a buffer is passed as the destination that data should be written to. This requires a buffer to be allocated, even if the data only exists in that buffer for a very short period of time before it’s written somewhere else.
In situations where data read from somewhere will immediately be written
somewhere else, a Writer
interface can cut out the middleman
buffer.
Before:
// Requires an intermediate buffer to write the data as CSV.
void DumpSensorData(Uart& uart) {
char temp[64];
ImuSample imu_sample;
imu.GetSample(&info);
size_t bytes_written = imu_sample.AsCsv(temp, sizeof(temp));
uart.Transmit(temp, bytes_written, /*timeout_ms=*/ 200);
}
After:
// Both DumpSensorData() and RawSample::AsCsv() use a Writer, eliminating the
// need for an intermediate buffer.
Status DumpSensorData(Writer& writer) {
RawSample imu_sample;
imu.GetSample(&info);
return imu_sample.AsCsv(writer);
}
Prevent buffer overflow#
When copying data from one buffer to another, there must be checks to ensure the
copy does not overflow the destination buffer. As this sort of logic is
duplicated throughout a codebase, there’s more opportunities for bound-checking
bugs to sneak in. Writers
manage this logic internally rather than pushing
the bounds checking to the code that is moving or writing the data.
Similarly, since only the Writer
has access to any underlying
buffers, it’s harder for functions that share a Writer
to
accidentally clobber data written by others using the same buffer.
Before:
Status BuildPacket(Id dest, span<const std::byte> payload,
span<std::byte> dest) {
Header header;
if (dest.size_bytes() + payload.size_bytes() < sizeof(Header)) {
return Status::ResourceExhausted();
}
header.dest = dest;
header.src = DeviceId();
header.payload_size = payload.size_bytes();
memcpy(dest.data(), &header, sizeof(header));
// Forgetting this line would clobber buffer contents. Also, using
// a temporary span instead could leave `dest` to be misused elsewhere in
// the function.
dest = dest.subspan(sizeof(header));
memcpy(dest.data(), payload.data(), payload.size_bytes());
}
After:
Status BuildPacket(Id dest, span<const std::byte> payload, Writer& writer) {
Header header;
header.dest = dest;
header.src = DeviceId();
header.payload_size = payload.size_bytes();
writer.Write(header);
return writer.Write(payload);
}
Design notes#
Sync & Flush#
The pw::stream::Stream
API does not include Sync()
or
Flush()
functions. There no mechanism in the Stream
API to
synchronize a Reader
’s potentially buffered input with its
underlying data source. This must be handled by the implementation if required.
Similarly, the Writer
implementation is responsible for flushing
any buffered data to the sink.
Flush()
and Sync()
were excluded from Stream
for a few
reasons:
The semantics of when to call
Flush()
/Sync()
on the stream are unclear. The presence of these methods complicates using aReader
orWriter
.Adding one or two additional virtual calls increases the size of all
Stream
vtables.
Class hierarchy#
All pw_stream
classes inherit from a single, common base with all possible
functionality: pw::stream::Stream
. This structure has
some similarities with Python’s io module and C#’s Stream class.
An alternative approach is to have the reading, writing, and seeking portions of the interface provided by different entities. This is how Go’s io package and C++’s input/output library are structured.
We chose to use a single base class for a few reasons:
The inheritance hierarchy is simple and linear. Despite the linear hierarchy, combining capabilities is natural with classes like
ReaderWriter
.In C++, separate interfaces for each capability requires either a complex virtual inheritance hierarchy or entirely separate hierarchies for each capability. Separate hierarchies can become cumbersome when trying to combine multiple capabilities. A
SeekableReaderWriter
would have to implement three different interfaces, which means three different vtables and three vtable pointers in each instance.Stream capabilities are clearly expressed in the type system, while naturally supporting optional functionality. A
Reader
may or may not supportStream::Seek()
. Applications that can handle seek failures gracefully way use seek on anyReader
. If seeking is strictly necessary, an API can accept aSeekableReader
instead.Expressing optional functionality in the type system is cumbersome when there are distinct interfaces for each capability.
Reader
,Writer
, andSeeker
interfaces would not be sufficient. To match the flexibility of the current structure, there would have to be separate optional versions of each interface, and classes for various combinations.Stream
would be an “OptionalReaderOptionalWriterOptionalSeeker” in this model.Code reuse is maximized. For example, a single
Stream::ConservativeLimit()
implementation supports many stream implementations.
Virtual interfaces#
pw_stream
uses virtual functions. Virtual functions enable runtime
polymorphism. The same code can be used with any stream implementation.
Virtual functions have inherently has more overhead than a regular function
call. However, this is true of any polymorphic API. Using a C-style struct
of function pointers makes different trade-offs but still has more overhead than
a regular function call.
For many use cases, the overhead of virtual calls insignificant. However, in some extremely performance-sensitive contexts, the flexibility of the virtual interface may not justify the performance cost.
Asynchronous APIs#
At present, pw_stream
is synchronous. All Stream
API calls are
expected to block until the operation is complete. This might be undesirable
for slow operations, like writing to NOR flash.
Pigweed has not yet established a pattern for asynchronous C++ APIs. The
Stream
class may be extended in the future to add asynchronous
capabilities, or a separate AsyncStream
could be created.
Dependencies#
Zephyr#
To enable pw_stream
for Zephyr add CONFIG_PIGWEED_STREAM=y
to the
project’s configuration.
Rust#
Pigweed centric analogs to Rust std
’s Read
, Write
, Seek
traits
as well as a basic Cursor
implementation are provided by the
pw_stream crate.
Python#
There are legacy Python utilities used for reading and writing a serial device for RPC purposes.