0114: Channels

SEED-0114: Channels

Status: Open for Comments Intent Approved Last Call Accepted Rejected

Proposal Date: 2023-10-10

CL: pwrev/175471

Author: Wyatt Hepler

Facilitator: Carlos Chinchilla

Summary

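This document proposes a new pw_channel module and pw::channel::Channel class. The module is similar to pw_stream, with three key changes:

  • Channels may expose byte stream or datagram semantics, with reliable or unreliable delivery.

  • All operations are asynchronous.

  • Reads and writes use MultiBuf (0109: Communication Buffers) to avoid copies.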

pw_channel will provide the data transmit and receive API for the upcoming socket abstraction.

Proposal

This SEED proposes the following:

  • Introduce a new pw_channel module.

  • Introduce a pw::channel::Channel virtual interface. Implementors of this interface may expose byte stream or datagram semantics. All operations in this interface are asynchronous and use MultiBuf (0109: Communication Buffers) to provide zero-copy operations.

  • Use pw_channel as the basis for the upcoming Pigweed sockets API.

  • Replace pw_stream with pw_channel to the extent possible.

Motivation

One of the fundamental operations of computing is sending data from one place to another. Examples include exchanging data with

  • an in-memory data structure,

  • a file in a filesystem,

  • a hardware peripheral,

  • another process, or

  • a device across a network.

There are many interfaces for data exchange. Pigweed provides pw_stream, which is a simple synchronous API for transmitting and receiving data. pw_stream’s simple model has made it prevalent in Pigweed.

The Pigweed team is revamping its communications systems (see 0107: Pigweed Communications). The new sockets API will be a critical piece of that story. The core job of a socket is to exchange data with another node in the network. pw_stream’s purpose is to facilitate data exchange, but it is too limited for sockets. pw_stream is missing support for several features, including:

  • Datagrams – Data is a byte stream. Datagrams are not supported.

  • Unreliability – Sockets may not guarantee delivery of data. pw_stream assumes no data is lost if Write returns OK.

  • Asynchronous operations – All functions block until the operation completes.

  • Zero copy operations – All reads and writes require data to be copied.

  • Vectored I/O – All reads and writes use a single contiguous buffer.

  • Backpressure – There is no mechanism for a stream to notify the producer that it needs more time to process data.

These features are fairly complex and may be exposed in a variety of ways in an API. This SEED proposes a new pw_stream-like Channel data exchange API. Channel provides a standard I/O interface with these advanced features. Like pw_stream, this API will be used anywhere data needs to be read and/or written.

Use cases

pw_rpc

pw_rpc is a communications protocol that enables calling procedures on different nodes (i.e. RPCs) and sharing data between them. RPCs can be sent using pw_stream APIs, which are blocking.

Sockets

Sockets are a communications channel between two endpoints in a network. Sockets support exchanging data:

  • as datagrams or a stream of bytes, and

  • reliably or unreliably.

pw_stream

Channel should support all use cases addressed by pw_stream. These include:

  • pw::stream::NullStream – NullStream ignores all bytes written to it and produces no bytes when read. This is used when no input or output is needed.

  • pw::stream::CountingNullStream – Counts bytes written to it. Used to determine the size of an encoded object before it is encoded to its final destination.

  • pw::stream::MemoryReader / pw::stream::MemoryWriter – Writes data to or reads data from a fixed, contiguous memory buffer. Example uses include encoding a protobuf for transport.

  • pw::stream::SocketStream – Supports reading from and writing to a TCP socket.

  • pw::blob_store::BlobStore::Reader / pw::blob_store::BlobStore::Writer – pw_blob_store uses a stream interface for reading and writing. This is similar to a file object.

Hardware interfaces

It is often necessary to exchange data with hardware I/O blocks. The Channel API could be used to abstract communications with I/O interfaces.

Existing solutions

pw_stream

pw_stream provides a synchronous, reliable, byte-oriented stream.

See pw_stream.

C++

C++ provides an I/O stream family of classes.

Java

Java provides a hierarchy of channel classes with a variety of flavors. The Channel interface provides just two methods: isOpen() and close(). Various I/O operations are mixed in through different interfaces. Together, these interfaces support byte stream, datagram, asynchronous, and scatter/gather I/O.

C#

The C# programming language offers a stream class similar to pw_stream and the proposed pw_channel module. It supports synchronous and asynchronous operations on a stream of bytes. https://learn.microsoft.com/en-us/dotnet/api/system.io.stream?view=net-7.0

C#’s Channel API (System.Threading.Channels) has a different intent than pw_channel: its purpose is to pass objects asynchronously between producers and consumers, which differs from what is proposed here. https://learn.microsoft.com/en-us/dotnet/api/system.threading.channels?view=net-7.0

Requirements

  • Support data transmission for the upcoming sockets API (0107: Pigweed Communications):

    • reliable byte stream (SOCK_STREAM)

    • unreliable datagram (SOCK_DGRAM)

    • reliable datagram (SOCK_SEQPACKET)

  • Asynchronous operations.

  • Efficient, minimal-copy buffer management with MultiBuf (0109: Communication Buffers).

Design

Conceptually, a channel is a sequence of bytes or datagrams exchanged between two endpoints. An endpoint can be anything that produces or consumes data, such as an in-memory data structure, a file in a filesystem, a hardware peripheral, or a network socket. Both endpoints may be Channel implementations, or the Channel may simply forward to something that provides compatible semantics, e.g. a memory buffer or OS socket.

In Unix, “everything is a file”. File descriptors provide a common I/O interface used for everything from files to pipes to sockets to hardware devices. Channels fill a role similar to that of POSIX file descriptors.

Channel semantics

pw_channel will provide the data exchange API for Pigweed’s upcoming network sockets. To this end, Channel supports the following socket semantics:

  • reliable byte stream (SOCK_STREAM)

  • unreliable datagram (SOCK_DGRAM)

  • reliable datagram (SOCK_SEQPACKET)

Reliability and data type (stream versus datagram) are essential aspects of channel semantics. These properties affect how code that uses the APIs is written. A channel with different semantics cannot be swapped for another without updating the assumptions in the surrounding code.
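As a rough illustration of why these two properties cannot be swapped freely, the sketch below encodes the three socket semantics as bit flags and checks whether two flag sets agree on reliability and data type. The flag names reuse the capability names defined later in this SEED, but the bit values, the kSock* constants, and the SameSemantics helper are hypothetical and not part of pw_channel.

```cpp
#include <cstdint>

// Capability bits (hypothetical values; names follow this SEED).
enum Property : uint8_t {
  kReliable = 1 << 0,
  kSeekable = 1 << 1,
  kDatagram = 1 << 2,
  kReadable = 1 << 3,
  kWritable = 1 << 4,
};

// The three socket semantics, modeled as readable and writable channels.
constexpr uint8_t kSockStream = kReliable | kReadable | kWritable;
constexpr uint8_t kSockDgram = kDatagram | kReadable | kWritable;
constexpr uint8_t kSockSeqPacket = kReliable | kDatagram | kReadable | kWritable;

// Returns true if two channels agree on reliability and data type, the two
// properties that calling code is written against.
constexpr bool SameSemantics(uint8_t a, uint8_t b) {
  constexpr uint8_t kSemanticBits = kReliable | kDatagram;
  return (a & kSemanticBits) == (b & kSemanticBits);
}

// A reliable byte stream cannot stand in for a reliable datagram channel.
static_assert(!SameSemantics(kSockStream, kSockSeqPacket),
              "byte stream and datagram semantics differ");
```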

Data type: datagrams & byte streams

Fundamentally, a channel involves sending data from one endpoint to another. The endpoints might both be Channel instances (e.g. two sockets). Or, one endpoint could be a Channel while the other is an in-memory data structure, file in a file system, or hardware peripheral.

The data type dictates the basic unit of data transmission. Datagram channels send and receive datagrams: “self-contained, independent entit[ies] of data” (RFC 1594). Datagrams contain a payload of zero or more bytes. pw_channel does not define a maximum payload size for datagrams.

Byte stream channels send and receive an arbitrary sequence of bytes. Zero-length byte stream writes are no-ops and may not result in any bytes being transmitted.

In terms of the channels API, the Read, Write, and Seek functions have different meanings for byte stream and datagram channels. For byte stream channels, these functions work with an arbitrary number of bytes. For datagram channels, Read, Write, and Seek operate on whole datagrams.
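The difference can be sketched with two toy, synchronous in-memory channels. These classes are hypothetical and leave out MultiBuf and async polling: the byte stream merges writes and lets a read return any number of bytes, while the datagram channel returns exactly one earlier write per read, including zero-length datagrams.

```cpp
#include <algorithm>
#include <cstddef>
#include <deque>
#include <string>
#include <utility>

// Toy byte stream: writes are concatenated, and a read returns up to
// max_bytes, so read boundaries need not match write boundaries.
class ToyByteStream {
 public:
  void Write(const std::string& data) {
    // A zero-length write appends nothing, so it is a no-op.
    bytes_.insert(bytes_.end(), data.begin(), data.end());
  }

  std::string Read(std::size_t max_bytes) {
    auto end = bytes_.begin() +
               static_cast<std::ptrdiff_t>(std::min(max_bytes, bytes_.size()));
    std::string out(bytes_.begin(), end);
    bytes_.erase(bytes_.begin(), end);
    return out;
  }

 private:
  std::deque<char> bytes_;
};

// Toy datagram channel: each read returns exactly one earlier write,
// including zero-length datagrams.
class ToyDatagramChannel {
 public:
  void Write(const std::string& datagram) { datagrams_.push_back(datagram); }

  // Returns false if no datagram is available.
  bool Read(std::string& out) {
    if (datagrams_.empty()) return false;
    out = std::move(datagrams_.front());
    datagrams_.pop_front();
    return true;
  }

 private:
  std::deque<std::string> datagrams_;
};
```

Given writes of "abc", "" and "de", reads of up to 4 bytes from the byte stream yield "abcd" and then "e", whereas the datagram channel yields "abc", "", and "de".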

Reliable channels

Reliable channels guarantee that their data is received in order and without loss. The API user does not have to do anything to ensure this. After a write is accepted, the user will never have to retry it. Reads always provide data in order without loss. The channel implementation is responsible for this.

For some channels, reliability is trivial; for others it requires significant work:

  • A memory channel that writes to a buffer is trivially reliable.

  • A socket communicating across a network will require a complex protocol such as TCP to guarantee that the data is delivered.

For byte streams, only reliable channels will be supported initially. Unreliable byte streams are not commonly supported, and would be difficult to apply in many use cases. There are circumstances where unreliable byte streams do make sense, such as reading time-sensitive sensor data, where the consumer only wants the very latest data regardless of drops. Unreliable byte streams may be added in the future.

Data loss

Data is never silently lost in a reliable channel. Unrecoverable data loss always results in the eventual closure of the channel, since a fundamental invariant of the channel cannot be maintained.

A few examples:

  • A write to a TCP channel fails because of a transient hardware issue. The channel and underlying TCP connection are closed.

  • A TCP channel times out on a retry. The channel and underlying TCP connection are closed.

  • A write to a channel that fills a ring buffer is requested. A MultiBuf for the write is not provided immediately because the ring buffer is full. The channel stays open, but the write is delayed until the ring buffer has sufficient space.

Reliability & connections

Reliable channels operate as if they have a connection, even if the underlying implementation does not establish a connection. This specifically means that:

  • It is assumed that the peer endpoint will receive data for which the write call succeeded.

  • If data is lost, the error will be reported in some form and the channel will be closed.

For example, a TCP socket channel would maintain an explicit connection, while a ring buffer channel would not.

Unreliable channels

Unreliable datagram channels make no guarantees about whether datagrams are delivered and in what order they arrive. Users are responsible for tracking drops and ordering if required.

Unreliable channels should report read and write failures whenever possible, but an OK write does not indicate that the data was received by the other endpoint.
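A minimal sketch of the bookkeeping an application might layer on top of an unreliable datagram channel, assuming the sender prefixes each datagram with an increasing 32-bit sequence number. The SequenceTracker class is hypothetical and not something pw_channel provides. Its policy treats gaps as drops and discards late arrivals (suited to "latest data wins" use cases), and it ignores sequence number wraparound.

```cpp
#include <cstdint>

// Receiver-side drop and reordering tracker for sequence-numbered datagrams.
class SequenceTracker {
 public:
  // Returns true if the datagram should be processed, or false if it is a
  // duplicate or late arrival that should be discarded.
  bool OnReceive(uint32_t seq) {
    if (!started_) {
      started_ = true;
      next_ = seq + 1;
      return true;
    }
    if (seq < next_) {  // Duplicate, or arrived after a newer datagram.
      ++stale_;
      return false;
    }
    dropped_ += seq - next_;  // Skipped sequence numbers are presumed lost.
    next_ = seq + 1;
    return true;
  }

  uint32_t dropped() const { return dropped_; }
  uint32_t stale() const { return stale_; }

 private:
  bool started_ = false;
  uint32_t next_ = 0;
  uint32_t dropped_ = 0;
  uint32_t stale_ = 0;
};
```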

Flow control, backpressure, and ConservativeLimit

A channel may provide backpressure through its async write API. The PollWritable method should be used to ensure that the channel is ready to receive calls to Write. Additionally, the MultiBufAllocator may wait to provide a MultiBuf for writing until memory becomes available.

pw_stream offered a notion of flow control through the pw::stream::Stream::ConservativeWriteLimit() function. Code using a stream could check the write limit prior to writing data to determine if the stream is ready to receive more. This function will not be provided in pw_channel.
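The poll-based backpressure described above can be sketched with a toy fixed-capacity channel. Context, Poll, and ToyRingChannel are hypothetical stand-ins, not the pw_async or pw_channel types: PollWritable returns Pending while the buffer is full and remembers the latest waker, and a reader freeing space invokes that waker so the writer knows to poll again.

```cpp
#include <algorithm>
#include <cstddef>
#include <functional>
#include <utility>

// Toy poll model standing in for pw_async.
struct Context {
  std::function<void()> waker;  // Called when a pending operation can retry.
};

enum class Poll { kReady, kPending };

// Fixed-capacity channel that applies backpressure through PollWritable.
class ToyRingChannel {
 public:
  explicit ToyRingChannel(std::size_t capacity) : capacity_(capacity) {}

  // Ready while there is free space; otherwise stores the latest waker.
  Poll PollWritable(Context& cx) {
    if (used_ < capacity_) return Poll::kReady;
    waker_ = cx.waker;
    return Poll::kPending;
  }

  // Accepts the write only if it fits; data is never dropped silently.
  bool Write(std::size_t num_bytes) {
    if (num_bytes > capacity_ - used_) return false;
    used_ += num_bytes;
    return true;
  }

  // Reader side: frees space and wakes a writer blocked in PollWritable.
  void Consume(std::size_t num_bytes) {
    used_ -= std::min(num_bytes, used_);
    if (waker_) {
      std::function<void()> wake = std::move(waker_);
      waker_ = nullptr;
      wake();
    }
  }

 private:
  std::size_t capacity_;
  std::size_t used_ = 0;
  std::function<void()> waker_;
};
```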

Openness / closedness

pw_channel will have an explicit open/closed concept that pw_stream lacks. Reads and writes may succeed when the channel is open. Reads and writes never succeed when the channel is closed.

The channel API supports closing a channel, but does not support opening a channel. Channels are opened by interacting with a concrete class.

Reliable channels are closed if unrecoverable data loss occurs. Unreliable channels may be closed when reads or writes are known to fail (e.g. a cable was unplugged), but this is not required.
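The open/closed rules, together with the data loss behavior described earlier, can be sketched as a small state machine. ToyReliableChannel, its Deliver and OnUnrecoverableLoss hooks, and the Status enum are hypothetical; the status names mirror the codes used in the API sketch. For brevity, Close does not flush, so any bytes still undelivered at close time are reported as DATA_LOSS.

```cpp
#include <algorithm>
#include <cstddef>

// Status names mirror the codes in the API sketch (hypothetical enum).
enum class Status { kOk, kFailedPrecondition, kDataLoss };

// Toy reliable channel: unrecoverable loss closes it, and no operation
// succeeds on a closed channel.
class ToyReliableChannel {
 public:
  bool is_open() const { return open_; }

  Status Write(std::size_t num_bytes) {
    if (!open_) return Status::kFailedPrecondition;
    undelivered_ += num_bytes;
    return Status::kOk;
  }

  // Transport hook: the peer acknowledged num_bytes.
  void Deliver(std::size_t num_bytes) {
    undelivered_ -= std::min(num_bytes, undelivered_);
  }

  // Transport hook: delivery can no longer be guaranteed (e.g. a retry timed
  // out), so the reliability invariant is broken and the channel closes.
  void OnUnrecoverableLoss() { open_ = false; }

  // Fails if already closed; otherwise reports whether all accepted data was
  // delivered.
  Status Close() {
    if (!open_) return Status::kFailedPrecondition;
    open_ = false;
    return undelivered_ == 0 ? Status::kOk : Status::kDataLoss;
  }

 private:
  bool open_ = true;
  std::size_t undelivered_ = 0;
};
```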

Synchronous APIs

The pw_channel class may provide synchronous versions of its functions, implemented in terms of the asynchronous API. These will poll the asynchronous API until it completes, blocking on a binary semaphore or similar primitive if supported. A pw_async helper will be provided for this purpose.
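A minimal sketch of such a blocking helper, assuming a toy poll function that returns std::nullopt while pending and arranges for the waker it is given to be called once progress is possible. BlockOn and PollFn are hypothetical names, not pw_async APIs, and a std::mutex plus std::condition_variable stands in for the binary semaphore.

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>
#include <optional>

// Toy poll function: returns std::nullopt while pending, after arranging for
// `waker` to be called once progress may be possible.
template <typename T>
using PollFn = std::function<std::optional<T>(std::function<void()> waker)>;

// Blocks the calling thread until `poll` produces a value.
// Caveat: the waker refers to locals of this call, so the poll function must
// not invoke a stored waker after BlockOn returns.
template <typename T>
T BlockOn(const PollFn<T>& poll) {
  std::mutex mutex;
  std::condition_variable cv;
  bool woken = false;

  std::function<void()> waker = [&] {
    std::lock_guard<std::mutex> lock(mutex);
    woken = true;
    cv.notify_one();
  };

  while (true) {
    {
      // Clear before polling so a wake during the poll is not missed.
      std::lock_guard<std::mutex> lock(mutex);
      woken = false;
    }
    if (std::optional<T> result = poll(waker)) {
      return *result;
    }
    // Sleep until the waker runs (returns at once if it already ran).
    std::unique_lock<std::mutex> lock(mutex);
    cv.wait(lock, [&] { return woken; });
  }
}
```

Because T is deduced from the PollFn parameter, callers pass a PollFn<T> (or write BlockOn<T>(...)) rather than a bare lambda.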

Channel Class Capabilities

Channels may offer any of five capabilities:

  • kReliable – Data is guaranteed to arrive in order, without loss.

  • kSeekable – The read/write position may be changed via the Seek method.

  • kDatagram – Data is guaranteed to be received in whole packets matching the size and contents of a single Write call.

  • kReadable – Supports reading data.

  • kWritable – Supports writing data.

These capabilities are expressed as template arguments to the Channel class, e.g. Channel<kReadable | kReliable> for a Channel that is readable and reliable. Aliases are provided for common combinations, such as ByteStream for a reliable, non-seekable, non-datagram stream of bytes (such as a TCP stream). Certain nonsensical combinations, such as a channel that is kSeekable but neither kReadable nor kWritable, are disallowed via static_assert.
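A minimal sketch of capabilities as template arguments, assuming the properties are bit flags packed into a uint8_t. It enforces the one restriction named above (a seekable channel must be readable or writable) with static_assert. The bit values and the ByteStreamReaderWriter alias are hypothetical, not the final pw_channel definitions.

```cpp
#include <cstdint>

// Capability bits (hypothetical values; names follow this SEED).
enum Property : uint8_t {
  kReliable = 1 << 0,
  kSeekable = 1 << 1,
  kDatagram = 1 << 2,
  kReadable = 1 << 3,
  kWritable = 1 << 4,
};

template <uint8_t kProperties>
class Channel {
  // Rejects nonsensical combinations at compile time.
  static_assert((kProperties & kSeekable) == 0 ||
                    (kProperties & (kReadable | kWritable)) != 0,
                "A seekable channel must be readable or writable");

 public:
  static constexpr bool reliable() { return (kProperties & kReliable) != 0; }
  static constexpr bool seekable() { return (kProperties & kSeekable) != 0; }
  static constexpr bool datagram() { return (kProperties & kDatagram) != 0; }
  static constexpr bool readable() { return (kProperties & kReadable) != 0; }
  static constexpr bool writable() { return (kProperties & kWritable) != 0; }
};

// Hypothetical alias in the spirit of ByteStream: reliable, non-seekable,
// non-datagram, readable and writable.
using ByteStreamReaderWriter = Channel<kReliable | kReadable | kWritable>;
```

With this sketch, naming Channel<kSeekable> fails to compile, while Channel<kSeekable | kReadable> is accepted.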

Conversion

Channels may be freely converted to channels with fewer capabilities, e.g. Channel<kReadable | kWritable> may be used as a Channel<kReadable>. This allows Channels with compatible semantics to be substituted for one another safely.

Shared Base Class for Minimal Code Size

Channel also inherits from an AnyChannel base class which provides the underlying virtual interface. Sharing a single base class avoids multiple inheritance, minimizing vtable overhead.
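The conversion rule can be checked at compile time. This sketch departs from the text in one respect: to avoid casting between unrelated Channel<...> specializations, Channel<...> is modeled as a thin non-owning handle to an AnyChannel instead of a subclass of it. AnyChannel still holds the only virtual interface (abbreviated here to a single id() method), and the converting constructor exists only when the target's properties are a subset of the source's. The bit values and id() are hypothetical.

```cpp
#include <cstdint>
#include <type_traits>

// Capability bits (hypothetical values; names follow this SEED).
enum Property : uint8_t {
  kReliable = 1 << 0,
  kSeekable = 1 << 1,
  kDatagram = 1 << 2,
  kReadable = 1 << 3,
  kWritable = 1 << 4,
};

// Shared, non-template base: the only class with a vtable, no matter how many
// Channel<...> variants are instantiated.
class AnyChannel {
 public:
  virtual ~AnyChannel() = default;
  virtual int id() const = 0;  // Stand-in for the real virtual interface.
};

// Typed, non-owning handle (the proposal instead derives from AnyChannel).
template <uint8_t kProperties>
class Channel {
 public:
  explicit Channel(AnyChannel& channel) : impl_(&channel) {}

  // Implicit conversion that may drop capabilities but never add them;
  // disabled via SFINAE so std::is_convertible reports it accurately.
  template <uint8_t kOther,
            typename = std::enable_if_t<(kProperties & ~kOther) == 0>>
  Channel(const Channel<kOther>& other) : impl_(&other.impl()) {}

  AnyChannel& impl() const { return *impl_; }

 private:
  AnyChannel* impl_;
};
```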

Prototype Demonstrating Channel Capabilities

A prototype demonstrating this interface can be seen here.

API sketch

An outline of the AnyChannel base class follows. AnyChannel will rarely be used directly, since it makes no guarantees about any channel capabilities or the data type. The function signatures and comments apply to all derived classes, however.

namespace pw::channel {

/// A generic data channel that may support reading or writing bytes.
///
/// Note that this channel should be used from only one ``pw::async::Task``
/// at a time, as the ``Poll`` methods are only required to remember the
/// latest ``pw::async::Context`` that was provided.
class AnyChannel {
 public:
  // Properties
  [[nodiscard]] bool reliable() const;
  [[nodiscard]] DataType data_type() const;
  [[nodiscard]] bool readable() const;
  [[nodiscard]] bool writable() const;
  [[nodiscard]] Seekability seekable() const;

  [[nodiscard]] bool is_open() const;

  // Write API

  // Checks whether a writable channel is *currently* writable.
  //
  // This should be called before attempting to ``Write``, and may be called
  // before allocating a write buffer if trying to reduce memory pressure.
  //
  // If ``Ready`` is returned, a *single* caller may proceed to ``Write``.
  //
  // If ``Pending`` is returned, ``cx`` will be awoken when the channel
  // becomes writable again.
  //
  // Note: this method will always return ``Ready`` for non-writable
  // channels.
  MaybeReady<> PollWritable(pw::async::Context& cx);

  // Gives access to an allocator for write buffers. The MultiBufAllocator
  // provides an asynchronous API for obtaining a buffer.
  //
  // This allocator must *only* be used to allocate the next argument to
  // ``Write``. The allocator must be used at most once per call to
  // ``Write``, and the returned ``MultiBuf`` must not be combined with
  // any other ``MultiBuf`` s or ``Chunk`` s.
  //
  // Write allocation attempts will always return ``std::nullopt`` for
  // channels that do not support writing.
  MultiBufAllocator& GetWriteAllocator();

  // Writes using a previously allocated MultiBuf. Returns a token that
  // refers to this write. These tokens are monotonically increasing, and
  // ``LastFlushed()`` returns the latest token that has been flushed.
  //
  // The ``MultiBuf`` argument to ``Write`` may consist of either:
  //   (1) A single ``MultiBuf`` allocated by ``GetWriteAllocator()``
  //       that has not been combined with any other ``MultiBuf`` s
  //       or ``Chunk``s OR
  //   (2) A ``MultiBuf`` containing any combination of buffers from sources
  //       other than ``GetWriteAllocator``.
  //
  // This requirement allows for more efficient use of memory in case (1).
  // For example, a ring-buffer implementation of a ``Channel`` may
  // specialize ``GetWriteAllocator`` to return the next section of the
  // buffer available for writing.
  //
  // Returns one of the following statuses:
  //
  // * OK - Data was accepted by the channel
  // * UNIMPLEMENTED - The channel does not support writing.
  // * UNAVAILABLE - The write failed due to a transient error (only applies
  //   to unreliable channels).
  // * FAILED_PRECONDITION - The channel is closed.
  Result<WriteToken> Write(MultiBuf&&);

  // Flushes pending writes.
  //
  // Returns a ``MaybeReady`` indicating whether or not flushing has
  // completed.
  //
  // After this call, ``LastFlushed`` may be used to discover which
  // ``Write`` calls have successfully finished flushing.
  //
  // * Ready(OK) - All data has been successfully flushed.
  // * Ready(UNIMPLEMENTED) - The channel does not support writing.
  // * Ready(FAILED_PRECONDITION) - The channel is closed.
  // * Pending - Data remains to be flushed.
  [[nodiscard]] MaybeReady<pw::Status> PollFlush(async::Context& cx);

  // Returns the latest ``WriteToken`` that was successfully flushed.
  //
  // Note that a ``Write`` being flushed does not necessarily mean that the
  // data was received by the remote. For unreliable channels, flushing may
  // simply mean that data was written out, not that it was received.
  [[nodiscard]] WriteToken LastFlushed() const;

  // Read API

  // Returns a MultiBuf of read data, if available. If data is not available,
  // invokes cx.waker() when it becomes available.
  //
  // For datagram channels, each successful read yields one complete
  // datagram. For byte stream channels, each successful read yields some
  // number of bytes.
  //
  // Channels only support one read operation / waker at a time.
  //
  // * OK - Data was read into a MultiBuf.
  // * UNIMPLEMENTED - The channel does not support reading.
  // * FAILED_PRECONDITION - The channel is closed.
  // * OUT_OF_RANGE - The end of the stream was reached. This may be thought
  //   of as reaching the end of a file. Future reads may succeed after
  //   ``Seek`` ing backwards, but no more new data will be produced. The
  //   channel is still open; writes and seeks may succeed.
  MaybeReady<Result<MultiBuf>> PollRead(async::Context& cx);

  // On byte stream channels, reads up to max_bytes from the channel.
  // This function is hidden on datagram-oriented channels.
  MaybeReady<Result<MultiBuf>> PollRead(async::Context& cx, size_t max_bytes);

  // Changes the position in the stream.
  //
  // Any ``PollRead`` or ``Write`` calls following a call to ``Seek`` will be
  // relative to the new position. Already-written data still being flushed
  // will be output relative to the old position.
  //
  // * OK - The current position was successfully changed.
  // * UNIMPLEMENTED - The channel does not support seeking.
  // * FAILED_PRECONDITION - The channel is closed.
  // * NOT_FOUND - The seek was to a valid position, but the channel is no
  //   longer capable of seeking to this position (partially seekable
  //   channels only).
  // * OUT_OF_RANGE - The seek went beyond the end of the stream.
  Status Seek(ptrdiff_t position, Whence whence);

  // Returns the current position in the stream, or kUnknownPosition if
  // unsupported.
  size_t Position() const;

  // Closes the channel, flushing any data.
  //
  // * OK - The channel was closed and all data was sent successfully.
  // * DATA_LOSS - The channel was closed, but not all previously written
  //   data was delivered.
  // * FAILED_PRECONDITION - Channel was already closed, which can happen
  //   out-of-band due to errors.
  MaybeReady<pw::Status> PollClose(async::Context& cx);

 private:
  virtual bool do_reliable() const;
  virtual DataType do_data_type() const;
  virtual bool do_readable() const;
  virtual bool do_writable() const;
  virtual Seekability do_seekable() const;
  virtual bool do_is_open() const;

  // Virtual interface.
  virtual MultiBufAllocator& DoGetWriteBufferAllocator() = 0;

  virtual MaybeReady<> PollWritable(async::Context& cx) = 0;

  virtual Result<WriteToken> DoWrite(MultiBuf&& buffer) = 0;

  virtual WriteToken DoPollFlush(async::Context& cx) = 0;

  [[nodiscard]] WriteToken LastFlushed() const = 0;

  // The max_bytes argument is ignored for datagram-oriented channels.
  virtual MaybeReady<Result<MultiBuf>> DoReadPoll(
      async::Context& cx, size_t max_bytes) = 0;

  virtual DoSeek(ptrdiff_t position, Whence whence) = 0;

  virtual size_t DoPosition() const { return kUnknownPosition; }

  virtual async::MaybeReady<Status> DoClosePoll(async::Context& cx);
};
}  // namespace pw::channel
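The write-token bookkeeping described in the Write, PollFlush, and LastFlushed comments can be modeled with a toy, synchronous class. ToyFlushingChannel is hypothetical: its PollFlush takes a byte budget to simulate partial progress instead of an async::Context, and tokens start at 1 so that 0 can mean nothing has been flushed yet. The proposal specifies neither detail.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <deque>

using WriteToken = uint32_t;

class ToyFlushingChannel {
 public:
  // Queues a write and returns its token; tokens increase monotonically.
  WriteToken Write(std::size_t num_bytes) {
    WriteToken token = next_token_++;
    queue_.push_back(Pending{token, num_bytes});
    return token;
  }

  // Sends up to `budget` bytes in write order; returns true (Ready) once
  // nothing remains to flush.
  bool PollFlush(std::size_t budget) {
    while (!queue_.empty() && budget > 0) {
      Pending& front = queue_.front();
      std::size_t sent = std::min(budget, front.remaining);
      front.remaining -= sent;
      budget -= sent;
      if (front.remaining == 0) {
        last_flushed_ = front.token;
        queue_.pop_front();
      }
    }
    return queue_.empty();
  }

  // Latest token whose bytes have all been flushed; 0 if none yet.
  WriteToken LastFlushed() const { return last_flushed_; }

 private:
  struct Pending {
    WriteToken token;
    std::size_t remaining;
  };
  std::deque<Pending> queue_;
  WriteToken next_token_ = 1;
  WriteToken last_flushed_ = 0;
};
```

After writes of 3 and 5 bytes, a flush with a 4-byte budget completes only the first write, so LastFlushed() reports its token while PollFlush still reports pending.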

pw_channel and pw_stream

As described, pw_channel is closely based on pw_stream. It adds async, MultiBuf, and new socket-inspired semantics.

pw_channel is intended to supersede pw_stream. There are a few options for how to reconcile the two modules. From most to least ideal, these are:

  • Fully replace pw_stream with pw_channel and remove the pw_stream module.

  • Rework pw_stream so it inherits from pw::channel::Channel.

  • Keep pw_stream, but provide adapters to convert between pw_stream and pw_channel.

Fully replacing pw_stream with pw_channel could be complicated due to:

  • Potential code size increase because of MultiBuf and the async poll model.

  • The scale of migrating all Pigweed users off of pw_stream.

  • Increased API complexity imposing a burden on Pigweed users.