pw_stream

pw_stream provides a foundational interface for streaming data from one part of a system to another. In the simplest use cases, this is basically a memcpy behind a reusable interface that can be passed around the system. On the other hand, the flexibility of this interface means a pw_stream could terminate is something more complex, like a UART stream or flash memory.

Overview

At the most basic level, pw_stream’s interfaces provide very simple handles to enabling streaming data from one location in a system to an endpoint.

Example:

Status DumpSensorData(pw::stream::Writer& writer) {
  static char temp[64];
  ImuSample imu_sample;
  imu.GetSample(&info);
  size_t bytes_written = imu_sample.AsCsv(temp, sizeof(temp));
  return writer.Write(temp, bytes_written);
}

In this example, DumpSensorData() only cares that it has access to a Writer that it can use to stream data to using Writer::Write(). The Writer itself can be backed by anything that can act as a data “sink.”

pw::stream Interfaces

There are three basic capabilities of a stream:

  • Reading – Bytes can be read from the stream.

  • Writing – Bytes can be written to the stream.

  • Seeking – The position in the stream can be changed.

pw_stream provides a family of stream classes with different capabilities. The most basic class, Stream guarantees no functionality, while the most capable class, SeekableReaderWriter supports reading, writing, and seeking.

Usage overview

Interface documentation

Summary documentation for the pw_stream interfaces is below. See the API comments in pw_stream/public/pw_stream/stream.h for full details.

class Stream

A generic stream that may support reading, writing, and seeking, but makes no guarantees about whether any operations are supported. Stream serves as the base for the Reader, Writer, and ReaderWriter interfaces.

Stream cannot be extended directly. Instead, work with one of the derived classes that explicitly supports the required functionality. Stream should almost never be used in APIs; accept a derived class with the required capabilities instead.

All Stream methods are blocking. They return when the requested operation completes.

Public methods

bool readable() const

True if Read() is supported.

bool writable() const

True if Write() is supported.

bool seekable() const

True if Seek() is supported.

Result<ByteSpan> Read(ByteSpan buffer)
Result<ByteSpan> Read(void *buffer, size_t size_bytes)

Reads data from the stream into the provided buffer, if supported. As many bytes as are available up to the buffer size are copied into the buffer. Remaining bytes may by read in subsequent calls.

Returns:

  • OK - Between 1 and dest.size_bytes() were successfully read. Returns the span of read bytes.

  • UNIMPLEMENTED - This stream does not support writing.

  • FAILED_PRECONDITION - The Reader is not in state to read data.

  • RESOURCE_EXHAUSTED - Unable to read any bytes at this time. No bytes read. Try again once bytes become available.

  • OUT_OF_RANGE - Reader has been exhausted, similar to EOF. No bytes were read, no more will be read.

Status Write(ConstByteSpan data)

Writes the provided data to the stream, if supported.

Returns:

  • OK - Data was successfully accepted by the stream.

  • UNIMPLEMENTED - This stream does not support writing.

  • FAILED_PRECONDITION - The writer is not in a state to accept data.

  • RESOURCE_EXHAUSTED - The writer was unable to write all of requested data at this time. No data was written.

  • OUT_OF_RANGE - The Writer has been exhausted, similar to EOF. No data was written; no more will be written.

Status Seek(ssize_t offset, Whence origin = kBeginning)

Changes the current read & write position in the stream, if supported.

Returns:

  • OK - Successfully updated the position.

  • UNIMPLEMENTED - Seeking is not supported for this stream.

  • OUT_OF_RANGE - Attempted to seek beyond the bounds of the stream. The position is unchanged.

size_t Tell() const

Returns the current read & write position in the stream, if supported. Returns Stream::kUnknownPosition (size_t(-1)) if unsupported.

size_t ConservativeReadLimit() const

Likely minimum bytes available to read. Returns kUnlimited (size_t(-1)) if there is no limit or it is unknown.

size_t ConservativeWriteLimit() const

Likely minimum bytes available to write. Returns kUnlimited (size_t(-1)) if there is no limit or it is unknown.

Private virtual methods

Stream’s public methods are non-virtual. The public methods call private virtual methods that are implemented by derived classes.

private virtual StatusWithSize DoRead(ByteSpan destination)

Virtual Read() function implemented by derived classes.

private virtual Status DoWrite(ConstByteSpan data)

Virtual Write() function implemented by derived classes.

private virtual Status DoSeek(ssize_t offset, Whence origin)

Virtual Seek() function implemented by derived classes.

private virtual size_t DoTell() const

Virtual Tell() function optionally implemented by derived classes. The default implementation always returns kUnknownPosition.

private virtual size_t ConservativeLimit(LimitType limit_type)

Virtual function optionally implemented by derived classes that is used for ConservativeReadLimit() and ConservativeWriteLimit(). The default implementation returns kUnlimited or 0 depending on whether the stream is readable/writable.

Reader interfaces

class Reader : public Stream

A Stream that supports writing but not reading. The Write() method is hidden.

Use in APIs when:
  • Must read from, but not write to, a stream.

  • May or may not need seeking. Use a SeekableReader& if seeking is required.

Inherit from when:
  • Reader cannot be extended directly. Instead, extend SeekableReader, NonSeekableReader, or (rarely) RelativeSeekableReader, as appropriate.

A Reader may or may not support seeking. Check seekable() or try calling Seek() to determine if the stream is seekable.

class SeekableReader : public RelativeSeekableReader

A Reader that fully supports seeking.

Use in APIs when:
  • Absolute seeking is required. Use Reader& if seeking is not required or seek failures can be handled gracefully.

Inherit from when:
  • Implementing a reader that supports absolute seeking.

class RelativeSeekableReader : public Reader

A Reader that at least partially supports seeking. Seeking within some range of the current position works, but seeking beyond that or from other origins may or may not be supported. The extent to which seeking is possible is NOT exposed by this API.

Use in APIs when:
  • Relative seeking is required. Usage in APIs should be rare; generally Reader should be used instead.

Inherit from when:
  • Implementing a Reader that can only support seeking near the current position.

A buffered Reader that only supports seeking within its buffer is a good example of a RelativeSeekableReader.

class NonSeekableReader : public Reader

A Reader that does not support seeking. The Seek() method is hidden.

Use in APIs when:
  • Do NOT use in APIs! If seeking is not required, use Reader& instead.

Inherit from when:
  • Implementing a Reader that does not support seeking.

Writer interfaces

class Writer : public Stream

A Stream that supports writing but not reading. The Read() method is hidden.

Use in APIs when:
  • Must write to, but not read from, a stream.

  • May or may not need seeking. Use a SeekableWriter& if seeking is required.

Inherit from when:
  • Writer cannot be extended directly. Instead, extend SeekableWriter, NonSeekableWriter, or (rarely) RelativeSeekableWriter, as appropriate.

A Writer may or may not support seeking. Check seekable() or try calling Seek() to determine if the stream is seekable.

class SeekableWriter : public RelativeSeekableWriter

A Writer that fully supports seeking.

Use in APIs when:
  • Absolute seeking is required. Use Writer& if seeking is not required or seek failures can be handled gracefully.

Inherit from when:
  • Implementing a writer that supports absolute seeking.

class RelativeSeekableWriter : public Writer

A Writer that at least partially supports seeking. Seeking within some range of the current position works, but seeking beyond that or from other origins may or may not be supported. The extent to which seeking is possible is NOT exposed by this API.

Use in APIs when:
  • Relative seeking is required. Usage in APIs should be rare; generally Writer should be used instead.

Inherit from when:
  • Implementing a Writer that can only support seeking near the current position.

A buffered Writer that only supports seeking within its buffer is a good example of a RelativeSeekableWriter.

class NonSeekableWriter : public Writer

A Writer that does not support seeking. The Seek() method is hidden.

Use in APIs when:
  • Do NOT use in APIs! If seeking is not required, use Writer& instead.

Inherit from when:
  • Implementing a Writer that does not support seeking.

ReaderWriter interfaces

class ReaderWriter : public Stream

A Stream that supports both reading and writing.

Use in APIs when:
  • Must both read from and write to a stream.

  • May or may not need seeking. Use a SeekableReaderWriter& if seeking is required.

Inherit from when:
  • Cannot extend ReaderWriter directly. Instead, extend SeekableReaderWriter, NonSeekableReaderWriter, or (rarely) RelativeSeekableReaderWriter, as appropriate.

A ReaderWriter may or may not support seeking. Check seekable() or try calling Seek() to determine if the stream is seekable.

class SeekableReaderWriter : public RelativeSeekableReaderWriter

A ReaderWriter that fully supports seeking.

Use in APIs when:
  • Absolute seeking is required. Use ReaderWriter& if seeking is not required or seek failures can be handled gracefully.

Inherit from when:
  • Implementing a writer that supports absolute seeking.

class RelativeSeekableReaderWriter : public ReaderWriter

A ReaderWriter that at least partially supports seeking. Seeking within some range of the current position works, but seeking beyond that or from other origins may or may not be supported. The extent to which seeking is possible is NOT exposed by this API.

Use in APIs when:
  • Relative seeking is required. Usage in APIs should be rare; generally ReaderWriter should be used instead.

Inherit from when:
  • Implementing a ReaderWriter that can only support seeking near the current position.

A buffered ReaderWriter that only supports seeking within its buffer is a good example of a RelativeSeekableReaderWriter.

class NonSeekableReaderWriter : public ReaderWriter

A ReaderWriter that does not support seeking. The Seek() method is hidden.

Use in APIs when:
  • Do NOT use in APIs! If seeking is not required, use ReaderWriter& instead.

Inherit from when:
  • Implementing a ReaderWriter that does not support seeking.

Implementations

pw_stream includes a few stream implementations for general use.

class MemoryWriter : public SeekableWriter

The MemoryWriter class implements the Writer interface by backing the data destination with an externally-provided memory buffer. MemoryWriterBuffer extends MemoryWriter to internally provide a memory buffer.

class MemoryReader : public SeekableReader

The MemoryReader class implements the Reader interface by backing the data source with an externally-provided memory buffer.

class NullReaderWriter : public SeekableReaderWriter

NullReaderWriter is a no-op stream implementation, similar to /dev/null. Writes are always dropped. Reads always return OUT_OF_RANGE. Seeks have no effect.

class StdFileWriter : public SeekableWriter

StdFileWriter wraps an std::ofstream with the Writer interface.

class StdFileReader : public SeekableReader

StdFileReader wraps an std::ifstream with the Reader interface.

Why use pw_stream?

Standard API

pw_stream provides a standard way for classes to express that they have the ability to write data. Writing to one sink versus another sink is a matter of just passing a reference to the appropriate Writer.

As an example, imagine dumping sensor data. If written against a random HAL or one-off class, there’s porting work required to write to a different sink (imagine writing over UART vs dumping to flash memory). Building a “dumping” implementation against the Writer interface prevents a dependency on a bespoke API that would require porting work.

Similarly, after building a Writer implementation for a Sink that data could be dumped to, that same Writer can be reused for other contexts that already write data to the pw::stream::Writer interface.

Before:

// Not reusable, depends on `Uart`.
void DumpSensorData(Uart& uart) {
  static char temp[64];
  ImuSample imu_sample;
  imu.GetSample(&info);
  size_t bytes_written = imu_sample.AsCsv(temp, sizeof(temp));
  uart.Transmit(temp, bytes_written, /*timeout_ms=*/ 200);
}

After:

// Reusable; no more Uart dependency!
Status DumpSensorData(Writer& writer) {
  static char temp[64];
  ImuSample imu_sample;
  imu.GetSample(&info);
  size_t bytes_written = imu_sample.AsCsv(temp, sizeof(temp));
  return writer.Write(temp, bytes_written);
}

Reduce intermediate buffers

Often functions that write larger blobs of data request a buffer is passed as the destination that data should be written to. This requires a buffer to be allocated, even if the data only exists in that buffer for a very short period of time before it’s written somewhere else.

In situations where data read from somewhere will immediately be written somewhere else, a Writer interface can cut out the middleman buffer.

Before:

// Requires an intermediate buffer to write the data as CSV.
void DumpSensorData(Uart& uart) {
  char temp[64];
  ImuSample imu_sample;
  imu.GetSample(&info);
  size_t bytes_written = imu_sample.AsCsv(temp, sizeof(temp));
  uart.Transmit(temp, bytes_written, /*timeout_ms=*/ 200);
}

After:

// Both DumpSensorData() and RawSample::AsCsv() use a Writer, eliminating the
// need for an intermediate buffer.
Status DumpSensorData(Writer& writer) {
  RawSample imu_sample;
  imu.GetSample(&info);
  return imu_sample.AsCsv(writer);
}

Prevent buffer overflow

When copying data from one buffer to another, there must be checks to ensure the copy does not overflow the destination buffer. As this sort of logic is duplicated throughout a codebase, there’s more opportunities for bound-checking bugs to sneak in. Writers manage this logic internally rather than pushing the bounds checking to the code that is moving or writing the data.

Similarly, since only the Writer has access to any underlying buffers, it’s harder for functions that share a Writer to accidentally clobber data written by others using the same buffer.

Before:

Status BuildPacket(Id dest, span<const std::byte> payload,
                   span<std::byte> dest) {
  Header header;
  if (dest.size_bytes() + payload.size_bytes() < sizeof(Header)) {
    return Status::ResourceExhausted();
  }
  header.dest = dest;
  header.src = DeviceId();
  header.payload_size = payload.size_bytes();

  memcpy(dest.data(), &header, sizeof(header));
  // Forgetting this line would clobber buffer contents. Also, using
  // a temporary span instead could leave `dest` to be misused elsewhere in
  // the function.
  dest = dest.subspan(sizeof(header));
  memcpy(dest.data(), payload.data(), payload.size_bytes());
}

After:

Status BuildPacket(Id dest, span<const std::byte> payload, Writer& writer) {
  Header header;
  header.dest = dest;
  header.src = DeviceId();
  header.payload_size = payload.size_bytes();

  writer.Write(header);
  return writer.Write(payload);
}

Design notes

Sync & Flush

The pw::stream::Stream API does not include Sync() or Flush() functions. There no mechanism in the Stream API to synchronize a Reader’s potentially buffered input with its underlying data source. This must be handled by the implementation if required. Similarly, the Writer implementation is responsible for flushing any buffered data to the sink.

Flush() and Sync() were excluded from Stream for a few reasons:

  • The semantics of when to call Flush()/Sync() on the stream are unclear. The presence of these methods complicates using a Reader or Writer.

  • Adding one or two additional virtual calls increases the size of all Stream vtables.

Class hierarchy

All pw_stream classes inherit from a single, common base with all possible functionality: pw::stream::Stream. This structure has some similarities with Python’s io module and C#’s Stream class.

An alternative approach is to have the reading, writing, and seeking portions of the interface provided by different entities. This is how Go’s io and C++’s input/output library are structured.

We chose to use a single base class for a few reasons:

  • The inheritance hierarchy is simple and linear. Despite the linear hierarchy, combining capabilities is natural with classes like ReaderWriter.

    In C++, separate interfaces for each capability requires either a complex virtual inheritance hierarchy or entirely separate hierarchies for each capability. Separate hierarchies can become cumbersome when trying to combine multiple capabilities. A SeekableReaderWriter would have to implement three different interfaces, which means three different vtables and three vtable pointers in each instance.

  • Stream capabilities are clearly expressed in the type system, while naturally supporting optional functionality. A Reader may or may not support Stream::Seek(). Applications that can handle seek failures gracefully way use seek on any Reader. If seeking is strictly necessary, an API can accept a SeekableReader instead.

    Expressing optional functionality in the type system is cumbersome when there are distinct interfaces for each capability. Reader, Writer, and Seeker interfaces would not be sufficient. To match the flexibility of the current structure, there would have to be separate optional versions of each interface, and classes for various combinations. Stream would be an “OptionalReaderOptionalWriterOptionalSeeker” in this model.

  • Code reuse is maximized. For example, a single Stream::ConservativeLimit() implementation supports many stream implementations.

Virtual interfaces

pw_stream uses virtual functions. Virtual functions enable runtime polymorphism. The same code can be used with any stream implementation.

Virtual functions have inherently has more overhead than a regular function call. However, this is true of any polymorphic API. Using a C-style struct of function pointers makes different trade-offs but still has more overhead than a regular function call.

For many use cases, the overhead of virtual calls insignificant. However, in some extremely performance-sensitive contexts, the flexibility of the virtual interface may not justify the performance cost.

Asynchronous APIs

At present, pw_stream is synchronous. All Stream API calls are expected to block until the operation is complete. This might be undesirable for slow operations, like writing to NOR flash.

Pigweed has not yet established a pattern for asynchronous C++ APIs. The Stream class may be extended in the future to add asynchronous capabilities, or a separate AsyncStream could be created.