Usage examples#

pw_multibuf: A buffer API optimized for zero-copy messaging

MultiBufs are flexible binary data structures. As such, there is no single guide on how to use them. Instead, several examples are presented that demonstrate various aspects of how they can be used.

In each of the guides below, a pw::Allocator instance is needed to instantiate a MultiBuf instance. This allocator is used to allocate the memory for the MultiBuf instance’s deque of Entries. For the purposes of these examples, the simple, low-performance AllocatorForTest is used.

1  allocator::test::AllocatorForTest<512> allocator;
2
3  // DOCSTAG: [pw_multibuf-examples-basic]
4  MultiBuf::Instance mbuf(allocator);

See pw_allocator for more details on allocator selection.

Iterating over heterogeneous memory#

A MultiBuf instance can represent a sequence of non-contiguous regions of memory. These regions may also have different Memory ownership semantics. For example, a MultiBuf instance could be composed of some memory that it owns, some that is shared, and some that is unmanaged (i.e. static or stack-allocated).

The following example creates just such a MultiBuf instance:

 1/// Creates a MultiBuf that holds non-contiguous memory regions with different
 2/// memory ownership.
 3ConstMultiBuf::Instance CreateMultiBuf(Allocator& allocator) {
 4  ConstMultiBuf::Instance mbuf(allocator);
 5  random::XorShiftStarRng64 rng(1);
 6  size_t size;
 7
 8  // Add some owned data.
 9  rng.GetInt(size, kMaxSize);
10  auto owned_data = allocator.MakeUnique<std::byte[]>(size);
11  rng.Get({owned_data.get(), size});
12  mbuf->PushBack(std::move(owned_data));
13
14  // Add some static data.
15  rng.GetInt(size, kMaxSize);
16  ByteSpan static_data(static_buffer.data(), size);
17  rng.Get(static_data);
18  mbuf->PushBack(static_data);
19
20  // Add some shared data.
21  rng.GetInt(size, kMaxSize);
22  auto shared_data = allocator.MakeShared<std::byte[]>(size);
23  rng.Get({shared_data.get(), size});
24  mbuf->PushBack(shared_data);
25
26  return mbuf;
27}

Note the use of pw::ConstMultiBuf. This type alias of GenericMultiBuf includes the kConst as one of its Properties.

Regardless of how the underlying memory is stored, MultiBuf methods can iterate over the data, both by individual bytes or by contiguous Chunks.

Iterating by bytes treats the MultiBuf instance as a single, contiguous buffer. This is useful when the logic does not need to be aware of the underlying memory layout. Be careful to avoid assuming that the memory is contiguous; code such as std::memcpy(dst, &(*mbuf.begin()), mbuf.size()) is almost certainly wrong.

The following example calculates a CRC32 checksum by iterating through every byte in the MultiBuf instance:

1/// Calculates the CRC32 checksum of a MultiBuf one byte at a time.
2uint32_t BytesChecksum(const ConstMultiBuf& mbuf) {
3  checksum::Crc32 crc32;
4  for (std::byte b : mbuf) {
5    crc32.Update(b);
6  }
7  return crc32.value();
8}

Alternatively, it is possible to iterate over the individual contiguous memory chunks that make up the MultiBuf instance. This is useful for operations that can be optimized by working with larger, contiguous blocks of data at once, such as sending data over a network or writing to a file.

The following example calculates the same CRC32 checksum, but does so by operating on whole chunks (i.e. ConstByteSpan objects) at a time:

1/// Calculates the CRC32 checksum of a MultiBuf one chunk at a time.
2uint32_t ChunksChecksum(const ConstMultiBuf& mbuf) {
3  checksum::Crc32 crc32;
4  for (ConstByteSpan s : mbuf.ConstChunks()) {
5    crc32.Update(s);
6  }
7  return crc32.value();
8}

Both methods iterate over the exact same data, just with different levels of granularity. The choice of which to use depends on the specific requirements of the task.

For the complete example, see //pw_multibuf/examples/iterate.cc.

Variable-length entry queue#

To further demonstrate how memory regions can be added and removed from a MultiBuf instance consider the following example. This implements a variable-length queue of binary data entries, very similar to pw::InlineVarLenEntryQueue.

 1class MultiBufQueue {
 2 public:
 3  static Result<MultiBufQueue> Create(Allocator& allocator, size_t max_chunks) {
 4    MultiBufQueue queue(allocator);
 5    if (!queue.mbuf_->TryReserveChunks(max_chunks)) {
 6      return Status::ResourceExhausted();
 7    }
 8    return queue;
 9  }
10
11  [[nodiscard]] bool empty() const { return mbuf_->empty(); }
12
13  [[nodiscard]] bool full() const {
14    return mbuf_->ConstChunks().size() == mbuf_->ConstChunks().capacity();
15  }
16
17  void push_back(UniquePtr<std::byte[]>&& bytes) {
18    PW_ASSERT(!full());
19    mbuf_->PushBack(std::move(bytes));
20  }
21
22  UniquePtr<const std::byte[]> pop_front() {
23    return mbuf_->Release(mbuf_->cbegin());
24  }
25
26 private:
27  constexpr explicit MultiBufQueue(Allocator& allocator) : mbuf_(allocator) {}
28
29  ConstMultiBuf::Instance mbuf_;
30};

This queue does all of its dynamic allocation in the factory method. After that method succeeds, all the queue methods are infallible.

For the complete example, see //pw_multibuf/examples/queue.cc.

Asynchronous queue#

With a few small changes, the above example can be used to implement an efficient, asynchronous producer-consumer queue. Here, one or more tasks produce data and add it to a queue, while one or more other tasks consume data from the queue. A MultiBuf instance can manage the lifecycle of the queued data Chunks, and its observer mechanism can be used to signal between tasks.

To start, an AsyncMultiBufObserver instance is added to wake up tasks that are waiting for the queue to become non-empty (for consumers) or non-full (for producers):

 1class AsyncMultiBufQueueObserver : public MultiBufObserver {
 2 public:
 3  async2::Poll<> PendNotFull(async2::Context& context) {
 4    PW_ASYNC_STORE_WAKER(context, full_waker_, "waiting for space");
 5    return async2::Pending();
 6  }
 7
 8  async2::Poll<> PendNotEmpty(async2::Context& context) {
 9    PW_ASYNC_STORE_WAKER(context, empty_waker_, "waiting for data");
10    return async2::Pending();
11  }
12
13 private:
14  void DoNotify(Event event, size_t) override {
15    if (event == MultiBufObserver::Event::kBytesAdded) {
16      std::move(empty_waker_).Wake();
17    } else if (event == MultiBufObserver::Event::kBytesRemoved) {
18      std::move(full_waker_).Wake();
19    }
20  }
21
22  async2::Waker empty_waker_;
23  async2::Waker full_waker_;
24};

This type extends pw::MultiBufObserver and receives an Event every time the contents or structure of the MultiBuf instance changes.

With this, the queue can leverage the observer to add the same methods that produces and consumers can wait on:

 1class AsyncMultiBufQueue {
 2 public:
 3  AsyncMultiBufQueue(Allocator& allocator, size_t max_chunks)
 4      : mbuf_(allocator) {
 5    PW_ASSERT(mbuf_->TryReserveChunks(max_chunks));
 6    mbuf_->set_observer(&observer_);
 7  }
 8
 9  async2::Poll<> PendNotFull(async2::Context& context) {
10    return full() ? observer_.PendNotFull(context) : async2::Ready();
11  }
12
13  async2::Poll<> PendNotEmpty(async2::Context& context) {
14    return empty() ? observer_.PendNotEmpty(context) : async2::Ready();
15  }
16
17 private:
18  TrackedConstMultiBuf::Instance mbuf_;
19  AsyncMultiBufQueueObserver observer_;
20
21  // Remaining methods are the same as MultiBufQueue....

Note that this queue uses a pw::TrackedConstMultiBuf. The “Tracked” prefix indicates the MultiBuf instance supports observers, and the “Const” prefix indicates the data cannot be modified.

A producer task that adds data to this queue might look like the following:

 1  size_t producer_index = 0;
 2  async2::PendFuncTask producer(
 3      [&](async2::Context& context) -> async2::Poll<> {
 4        while (producer_index < kNumMsgs) {
 5          PW_TRY_READY(queue.PendNotFull(context));
 6          auto s = allocator.MakeUnique<std::byte[]>(4);
 7          const char* word = kWords[producer_index % kWords.size()];
 8          std::strncpy(reinterpret_cast<char*>(s.get()), word, s.size());
 9          queue.push_back(std::move(s));
10          ++producer_index;
11        }
12        return async2::Ready();
13      });

Finally, a consumer task that pulls data from this queue might look like the following:

 1  size_t consumer_index = 0;
 2  async2::PendFuncTask consumer(
 3      [&](async2::Context& context) -> async2::Poll<> {
 4        while (consumer_index < kNumMsgs) {
 5          PW_TRY_READY(queue.PendNotEmpty(context));
 6          auto s = queue.pop_front();
 7          const char* word = kWords[consumer_index % kWords.size()];
 8          EXPECT_STREQ(reinterpret_cast<const char*>(s.get()), word);
 9          ++consumer_index;
10        }
11        return async2::Ready();
12      });

Altogether, this approach is efficient for passing data as it avoids unnecessary data copies and leverages the pw_async2 framework for non-blocking synchronization.

For the complete example, see //pw_multibuf/examples/async_queue.cc.

Scatter-gather I/O#

Another possible use case for the MultiBuf type is to manage buffers for scatter-gather I/O operations. In this scenario, data is either read from a single source into multiple memory regions (scatter) or written from multiple memory regions to a single destination (gather).

As an example, the following container holds Messages for performing multiple I2C reads and writes in a single operation:

 1class MessageVector {
 2 public:
 3  explicit MessageVector(Allocator& allocator)
 4      : messages_(allocator), rx_buffers_(allocator), tx_buffers_(allocator) {}
 5
 6  void AddRead(Address addr, UniquePtr<std::byte[]> dst) {
 7    messages_.push_back(Message::ReadMessage(addr, {dst.get(), dst.size()}));
 8    rx_buffers_->PushBack(std::move(dst));
 9  }
10
11  void AddRead(Address addr, SharedPtr<std::byte[]> dst) {
12    messages_.push_back(Message::ReadMessage(addr, {dst.get(), dst.size()}));
13    rx_buffers_->PushBack(dst);
14  }
15
16  void AddWrite(Address addr, UniquePtr<const std::byte[]>&& src) {
17    messages_.push_back(Message::WriteMessage(addr, {src.get(), src.size()}));
18    tx_buffers_->PushBack(std::move(src));
19  }
20
21  void AddWrite(Address addr, const SharedPtr<const std::byte[]>& src) {
22    messages_.push_back(Message::WriteMessage(addr, {src.get(), src.size()}));
23    tx_buffers_->PushBack(src);
24  }
25
26 private:
27  friend class TestInitiator;
28
29  DynamicVector<Message> messages_;
30  TrackedMultiBuf::Instance rx_buffers_;
31  TrackedConstMultiBuf::Instance tx_buffers_;
32};

This container has a pw::TrackedMultiBuf for data to be read, and a pw::TrackedConstMultiBuf for data to be written. As the “Tracked” prefix indicates, these accept an observer that can be used to signal when an I2C transfer is complete:

 1class MessageVectorObserver : public MultiBufObserver {
 2 public:
 3  void AddBytes(size_t num_bytes) { num_bytes_ += num_bytes; }
 4
 5  Status Await(SystemClock::duration timeout) {
 6    return notification_.try_acquire_for(timeout) ? OkStatus()
 7                                                  : Status::DeadlineExceeded();
 8  }
 9
10 private:
11  void DoNotify(Event event, size_t value) override {
12    if (event == MultiBufObserver::Event::kBytesAdded) {
13      num_bytes_ += value;
14    } else if (event == MultiBufObserver::Event::kBytesRemoved) {
15      num_bytes_ -= value;
16    }
17    if (num_bytes_ == 0) {
18      notification_.release();
19    }
20  }
21
22  sync::TimedThreadNotification notification_;
23  size_t num_bytes_ = 0;
24};

With a real device, the Messages would be passed to an Initiator. This example uses a simpler TestInitiator type that simply accepts the messages and then waits for another thread to indicate the transfer is complete:

 1class TestInitiator {
 2 public:
 3  explicit TestInitiator(Allocator& allocator) : msg_vec_(allocator) {}
 4
 5  constexpr Status status() const { return status_; }
 6
 7  void StageForTransfer(MessageVector&& msg_vec) {
 8    msg_vec_ = std::move(msg_vec);
 9    observer_.AddBytes(msg_vec_.rx_buffers_->size());
10    observer_.AddBytes(msg_vec_.tx_buffers_->size());
11    msg_vec_.rx_buffers_->set_observer(&observer_);
12    msg_vec_.tx_buffers_->set_observer(&observer_);
13  }
14
15  void TransferFor(SystemClock::duration timeout) {
16    // The actual I2C transfer would be performed here...
17    status_ = observer_.Await(timeout);
18  }
19
20  void Complete() {
21    msg_vec_.rx_buffers_->Clear();
22    msg_vec_.tx_buffers_->Clear();
23  }
24
25 private:
26  MessageVector msg_vec_;
27  MessageVectorObserver observer_;
28  Status status_ = OkStatus();
29};

In this example, the MessageVector instance collects a series of I2C messages. For read operations, it adds a destination buffer to its rx_buffers_ field. For write operations, it adds a source buffer to its tx_buffers_ field. An I2C driver could then iterate over the messages_ vector and use the corresponding buffers from the MultiBuf instances to perform the I/O operations.

The main addition to the existing pw_i2c is that this example abstracts away the memory management of the individual buffers, and automatically notifies the observer when the transfer is complete and the messages are dropped.

For the complete example, see //pw_multibuf/examples/scatter_gather.cc.

Protocol message composition and decomposition#

The MultiBuf type was designed to facilitate creating and parsing network protocol messages. There are two general approaches to creating packets, referred to here as “top-down” and “bottom-up”. The “top-down” approach starts with payloads of the top-most protocol layer, and then has lower layer protocol fields added to it.

The MultiBuf type itself does not include code to interpret memory regions as protocol fields. Products are expected to use a component such as Emboss to accomplish this. This example provides some simple methods to get and set fields:

1template <typename T>
2constexpr T GetField(ConstByteSpan data, size_t offset) {
3  return bytes::ReadInOrder<T>(endian::little, &data[offset]);
4}
5
6template <typename T>
7constexpr void SetField(ByteSpan data, size_t offset, T value) {
8  return bytes::CopyInOrder<T>(endian::little, value, &data[offset]);
9}

These can be used to implement types for serializing and deserializing protocol messages to and from MultiBuf instances. For example, given a network packet protocol:

1// Protocol DemoNetwork have packets that fit entirely within a DemoLink frame.
2// They have 20-byte headers (8 byte src and dst address, and a 4 byte packet
3// length).
4struct DemoNetworkHeader {
5  uint64_t src_addr;
6  uint64_t dst_addr;
7  uint32_t length;
8};

A type to represent these network packets might look like:

 1class NetworkPacket {
 2 public:
 3  /// Create and a return a new network packet, or return an error if unable to
 4  /// allocate the needed memory.
 5  static Result<NetworkPacket> Create(Allocator& allocator) {
 6    auto metadata = allocator.MakeUnique<std::byte[]>(kDemoNetworkHeaderLen);
 7    if (metadata == nullptr) {
 8      return Status::ResourceExhausted();
 9    }
10    NetworkPacket packet(allocator);
11    if (!packet.mbuf_->TryReserveForPushBack(metadata)) {
12      return Status::ResourceExhausted();
13    }
14    packet.mbuf_->PushBack(std::move(metadata));
15    return packet;
16  }
17
18  void set_src_addr(uint64_t addr) {
19    SetField<uint64_t>(header(), offsetof(DemoNetworkHeader, src_addr), addr);
20  }
21
22  void set_dst_addr(uint64_t addr) {
23    SetField<uint64_t>(header(), offsetof(DemoNetworkHeader, dst_addr), addr);
24  }
25
26  /// Interpret the first chunk as a network packet header.
27  constexpr DemoNetworkHeader GetHeader() const {
28    return DemoNetworkHeader{
29        .src_addr =
30            GetField<uint64_t>(header(), offsetof(DemoNetworkHeader, src_addr)),
31        .dst_addr =
32            GetField<uint64_t>(header(), offsetof(DemoNetworkHeader, dst_addr)),
33        .length =
34            GetField<uint32_t>(header(), offsetof(DemoNetworkHeader, length)),
35    };
36  }
37
38  /// Add a payload to a network packet.
39  [[nodiscard]] bool AddPayload(UniquePtr<std::byte[]>&& payload) {
40    if (!mbuf_->TryReserveForPushBack(payload)) {
41      return false;
42    }
43    mbuf_->PushBack(std::move(payload));
44    size_t length = mbuf_->size();
45    PW_CHECK_UINT_LE(length, std::numeric_limits<uint32_t>::max());
46    SetField<uint32_t>(header(),
47                       offsetof(DemoNetworkHeader, length),
48                       static_cast<uint32_t>(length));
49    return true;
50  }
51
52  /// Consume a network packet and return its payload.
53  static Result<UniquePtr<std::byte[]>> ExtractPayload(NetworkPacket&& packet) {
54    DemoNetworkHeader header = packet.GetHeader();
55    if (header.length != packet.mbuf_->size()) {
56      return Status::DataLoss();
57    }
58    PW_TRY_ASSIGN(
59        auto iter,
60        packet.mbuf_->Discard(packet.mbuf_->cbegin(), kDemoNetworkHeaderLen));
61    return packet.mbuf_->Release(iter);
62  }
63
64 private:
65  constexpr NetworkPacket(Allocator& allocator) : mbuf_(allocator) {}
66
67  friend class LinkFrame;
68  explicit NetworkPacket(FlatMultiBuf&& mbuf) : mbuf_(std::move(mbuf)) {}
69
70  ByteSpan header() { return *(mbuf_->Chunks().begin()); }
71  constexpr ConstByteSpan header() const {
72    return *(mbuf_->ConstChunks().cbegin());
73  }
74
75  FlatMultiBuf::Instance mbuf_;
76};

Similarly, for a link frame protocol:

 1// Protocol DemoLink has frames up to 1014 bytes in length with 6-byte headers
 2// (2 bytes each for src addr, dst addr, and len) and a 4 byte crc32 checksum.
 3struct DemoLinkHeader {
 4  uint16_t src_addr;
 5  uint16_t dst_addr;
 6  uint16_t length;
 7};
 8
 9struct DemoLinkFooter {
10  uint32_t crc32;
11};

A type to represent these link frames might look like:

  1class LinkFrame {
  2 public:
  3  /// Create and a return a new link frame, or return an error if unable to
  4  /// allocate the needed memory.
  5  static Result<LinkFrame> Create(Allocator& allocator) {
  6    auto metadata = allocator.MakeUnique<std::byte[]>(kDemoLinkHeaderLen +
  7                                                      kDemoLinkFooterLen);
  8    if (metadata == nullptr) {
  9      return Status::ResourceExhausted();
 10    }
 11    LinkFrame frame(allocator);
 12    frame.mbuf_->PushBack(std::move(metadata));
 13    return frame;
 14  }
 15
 16  constexpr auto Chunks() { return mbuf_->Chunks(); }
 17  constexpr auto ConstChunks() const { return mbuf_->ConstChunks(); }
 18
 19  void set_src_addr(uint16_t addr) {
 20    SetField<uint16_t>(header(), offsetof(DemoLinkHeader, src_addr), addr);
 21  }
 22
 23  void set_dst_addr(uint16_t addr) {
 24    SetField<uint16_t>(header(), offsetof(DemoLinkHeader, dst_addr), addr);
 25  }
 26
 27  /// Interpret the first chunk as a link frame header.
 28  constexpr DemoLinkHeader GetHeader() const {
 29    return DemoLinkHeader{
 30        .src_addr =
 31            GetField<uint16_t>(header(), offsetof(DemoLinkHeader, src_addr)),
 32        .dst_addr =
 33            GetField<uint16_t>(header(), offsetof(DemoLinkHeader, dst_addr)),
 34        .length =
 35            GetField<uint16_t>(header(), offsetof(DemoLinkHeader, length)),
 36    };
 37  }
 38
 39  /// Interpret the last chunk as a link frame footer.
 40  constexpr DemoLinkFooter GetFooter() const {
 41    return DemoLinkFooter{
 42        .crc32 = GetField<uint32_t>(footer(), offsetof(DemoLinkFooter, crc32)),
 43    };
 44  }
 45
 46  /// Moves the given netrowk packet into the payload of this frame.
 47  [[nodiscard]] bool AddNetworkPacket(NetworkPacket&& packet) {
 48    auto iter = mbuf_->cend() - kDemoLinkFooterLen;
 49    if (!mbuf_->TryReserveForInsert(iter, *packet.mbuf_)) {
 50      return false;
 51    }
 52    mbuf_->Insert(iter, std::move(*packet.mbuf_));
 53    size_t length = mbuf_->size();
 54    PW_CHECK_UINT_LE(length, std::numeric_limits<uint16_t>::max());
 55    SetField<uint16_t>(header(),
 56                       offsetof(DemoLinkHeader, length),
 57                       static_cast<uint16_t>(length));
 58    return true;
 59  }
 60
 61  /// Updates the checksum for the finished frame.
 62  void Finalize() {
 63    SetField<uint32_t>(
 64        footer(), offsetof(DemoLinkFooter, crc32), CalculateCheckSum());
 65  }
 66
 67  /// Examines a link frame. If it is valid, returns its payload as a network
 68  /// packet, otherwise returns an error.
 69  static Result<NetworkPacket> ExtractNetworkPacket(LinkFrame&& frame) {
 70    DemoLinkHeader header = frame.GetHeader();
 71    DemoLinkFooter footer = frame.GetFooter();
 72    if (header.length != frame.mbuf_->size() ||
 73        footer.crc32 != frame.CalculateCheckSum()) {
 74      return Status::DataLoss();
 75    }
 76    uint32_t packet_length =
 77        header.length - (kDemoLinkHeaderLen + kDemoLinkFooterLen);
 78    auto iter = frame.mbuf_->cbegin();
 79    PW_TRY_ASSIGN(iter, frame.mbuf_->Discard(iter, kDemoLinkHeaderLen));
 80    iter += packet_length;
 81    PW_TRY_ASSIGN(iter, frame.mbuf_->Discard(iter, kDemoLinkFooterLen));
 82    return NetworkPacket(std::move(*frame.mbuf_));
 83  }
 84
 85 private:
 86  constexpr LinkFrame(Allocator& allocator) : mbuf_(allocator) {}
 87
 88  uint32_t CalculateCheckSum() const {
 89    checksum::Crc32 crc32;
 90    ConstByteSpan prev;
 91    for (ConstByteSpan chunk : mbuf_->ConstChunks()) {
 92      crc32.Update(prev);
 93      prev = chunk;
 94    }
 95    return crc32.value();
 96  }
 97
 98  constexpr ByteSpan header() { return *(mbuf_->Chunks().begin()); }
 99  constexpr ConstByteSpan header() const {
100    return *(mbuf_->ConstChunks().cbegin());
101  }
102
103  constexpr ByteSpan footer() { return *(--(mbuf_->Chunks().end())); }
104  constexpr ConstByteSpan footer() const {
105    return *(--(mbuf_->ConstChunks().cend()));
106  }
107
108  FlatMultiBuf::Instance mbuf_;
109};

With these, creating packets becomes straightforward:

 1  Result<NetworkPacket> tx_packet = NetworkPacket::Create(allocator);
 2  ASSERT_EQ(tx_packet.status(), OkStatus());
 3  tx_packet->set_src_addr(kNetSrcAddr);
 4  tx_packet->set_dst_addr(kNetDstAddr);
 5  ASSERT_TRUE(tx_packet->AddPayload(std::move(tx_payload)));
 6
 7  Result<LinkFrame> tx_frame = LinkFrame::Create(allocator);
 8  ASSERT_EQ(tx_frame.status(), OkStatus());
 9  tx_frame->set_src_addr(kLinkSrcAddr);
10  tx_frame->set_dst_addr(kLinkDstAddr);
11  ASSERT_TRUE(tx_frame->AddNetworkPacket(std::move(*tx_packet)));
12  tx_frame->Finalize();

For the complete example, see //pw_multibuf/examples/transfer.cc.

In-place modification#

Distinct from the previous example, the other approach to composing and decomposing protocol messages is a “bottom-up” approach. This approach starts with one or more maximally-sized protocol messages, and then proceeding to add Layers to restrict the view of the data to higher and higher protocols in the stack.

This approach is especially useful when the lower protocol fields need to be preserved. For example, an application which modifies only the top-most protocol in-place would benefit from this approach.

As an example of such an in-place modification, consider an “encryptor” that simply XORs the data with a seeded pseudorandom byte stream.

Warning

This example is only an example, and is NOT cryptographically secure! DO NOT use it to protect data!

This example includes the link frames and network packets from before, as well as a transport segment:

1// Protocol DemoTransport has segments up to ~4 GiB spanning multiple packets.
2// Each fragment of a segment includes an 12 byte header that includes a
3// segment ID, offset and length. The first fragment has and additional 4 byte
4// field for the total segment length.
5struct DemoTransportHeader {
6  uint64_t segment_id;
7  uint32_t offset;
8  uint32_t length;
9};

The types to implement this and the other protocols are similar to the previous example. A notable departure is how objects for each layer are created, with callers creating transport segments that have memory reserved for both the payload and lower level protocols:

 1  static Result<TransportSegment> Create(Allocator& allocator, uint64_t id) {
 2    auto packet = NetworkPacket::Create(allocator);
 3    if (!packet.ok()) {
 4      return packet.status();
 5    }
 6    TransportSegment segment = TransportSegment::From(std::move(*packet));
 7    SetHeaderField<uint64_t>(
 8        *segment.mbuf_, offsetof(DemoTransportHeader, segment_id), id);
 9    return segment;
10  }
1  static Result<NetworkPacket> Create(Allocator& allocator) {
2    auto frame = LinkFrame::Create(allocator);
3    if (!frame.ok()) {
4      return frame.status();
5    }
6    return NetworkPacket::From(std::move(*frame));
7  }
 1  static Result<LinkFrame> Create(Allocator& allocator) {
 2    MultiBuf::Instance mbuf(allocator);
 3    if (!mbuf->TryReserveLayers(4)) {
 4      return Status::ResourceExhausted();
 5    }
 6    auto buffer = allocator.MakeUnique<std::byte[]>(kMaxDemoLinkFrameLength);
 7    if (buffer == nullptr) {
 8      return Status::ResourceExhausted();
 9    }
10    mbuf->PushBack(std::move(buffer));
11    PW_CHECK(mbuf->AddLayer(0));
12    return LinkFrame(std::move(*mbuf));
13  }

Additionally, each type has factory methods to consume objects of one protocol layer and produce another, simply by adding and removing layers.

 1LinkFrame LinkFrame::From(NetworkPacket&& packet) {
 2  size_t length = packet.length() + kDemoLinkHeaderLen + kDemoLinkFooterLen;
 3  PW_CHECK_UINT_LE(length, std::numeric_limits<uint16_t>::max());
 4  LinkFrame frame(std::move(*packet.mbuf_));
 5  frame.mbuf_->PopLayer();
 6  frame.mbuf_->TruncateTopLayer(length);
 7  SetHeaderField<uint16_t>(*frame.mbuf_,
 8                           offsetof(DemoLinkHeader, length),
 9                           static_cast<uint16_t>(length));
10  return frame;
11}
12
13NetworkPacket NetworkPacket::From(LinkFrame&& frame) {
14  size_t length = frame.length() - (kDemoLinkHeaderLen + kDemoLinkFooterLen);
15  NetworkPacket packet(std::move(*frame.mbuf_));
16  PW_CHECK(packet.mbuf_->AddLayer(kDemoLinkHeaderLen, length));
17  SetHeaderField<uint32_t>(*packet.mbuf_,
18                           offsetof(DemoNetworkHeader, length),
19                           static_cast<uint32_t>(length));
20  return packet;
21}
22
23NetworkPacket NetworkPacket::From(TransportSegment&& segment) {
24  size_t length = segment.length() + kDemoNetworkHeaderLen;
25  PW_CHECK_UINT_LE(length, std::numeric_limits<uint32_t>::max());
26  NetworkPacket packet(std::move(*segment.mbuf_));
27  packet.mbuf_->PopLayer();
28  packet.mbuf_->TruncateTopLayer(length);
29  SetHeaderField<uint32_t>(*packet.mbuf_,
30                           offsetof(DemoNetworkHeader, length),
31                           static_cast<uint32_t>(length));
32  return packet;
33}
34
35TransportSegment TransportSegment::From(NetworkPacket&& packet) {
36  size_t length = packet.length() - kDemoNetworkHeaderLen;
37  TransportSegment segment(std::move(*packet.mbuf_));
38  PW_CHECK(segment.mbuf_->AddLayer(kDemoNetworkHeaderLen, length));
39  SetHeaderField<uint32_t>(*segment.mbuf_,
40                           offsetof(DemoTransportHeader, length),
41                           static_cast<uint32_t>(length));
42  return segment;
43}

These conversion methods are used to create asynchronous tasks that can pass protocol messages up and down the stack using queues:

 1template <typename FromProtocol, typename ToProtocol>
 2class Relay : public Task {
 3 public:
 4  Relay(pw::log::Token name,
 5        Closeable<FromProtocol>& rx,
 6        Closeable<ToProtocol>& tx)
 7      : Task(name), rx_(rx), tx_(tx) {}
 8
 9 private:
10  Poll<> DoPend(Context& context) override {
11    while (true) {
12      if (pending_.has_value()) {
13        PW_TRY_READY(tx_.PendHasSpace(context));
14        tx_.push(std::move(*pending_));
15        pending_.reset();
16      }
17      PW_TRY_READY_ASSIGN(auto status, rx_.PendNotEmpty(context));
18      if (!status.ok()) {
19        tx_.Close();
20        return Ready();
21      }
22      FromProtocol from(std::move(rx_.front()));
23      rx_.pop();
24      if constexpr (std::is_same_v<FromProtocol, ToProtocol>) {
25        pending_ = std::move(from);
26      } else {
27        pending_ = ToProtocol::From(std::move(from));
28      }
29    }
30  }
31
32  std::optional<ToProtocol> pending_;
33  Closeable<FromProtocol>& rx_;
34  Closeable<ToProtocol>& tx_;
35};
1template <typename NestedProtocol, typename OuterProtocol>
2class Sender : public Relay<NestedProtocol, OuterProtocol> {  //...
1template <typename OuterProtocol, typename NestedProtocol>
2class Receiver : public Relay<OuterProtocol, NestedProtocol> {  //...

The task doing the actual work of this example is the “encryptor”:

 1class Encryptor : public Task {
 2 public:
 3  constexpr explicit Encryptor(pw::log::Token name, uint64_t key)
 4      : Task(name), key_(key), rx_(rx_queue_), tx_(tx_queue_) {}
 5
 6  Closeable<TransportSegment>& rx() { return rx_; }
 7  Closeable<TransportSegment>& tx() { return tx_; }
 8
 9 private:
10  Poll<> DoPend(Context& context) override {
11    std::array<std::byte, sizeof(uint64_t)> pad;
12    while (true) {
13      if (segment_.has_value()) {
14        PW_TRY_READY(tx_.PendHasSpace(context));
15        tx_.push(std::move(*segment_));
16        segment_.reset();
17      }
18
19      PW_TRY_READY_ASSIGN(auto status, rx_.PendNotEmpty(context));
20      if (!status.ok()) {
21        tx_.Close();
22        return Ready();
23      }
24      segment_ = std::move(rx_.front());
25      rx_.pop();
26
27      // "Encrypt" the message. "Encrypting" again with the same key is
28      // equivalent to decrypting.
29      random::XorShiftStarRng64 rng(key_ ^ segment_->id());
30      ByteSpan payload = segment_->payload();
31      for (size_t i = 0; i < payload.size(); ++i) {
32        if ((i % pad.size()) == 0) {
33          rng.Get(pad);
34        }
35        payload[i] ^= pad[i % pad.size()];
36      }
37    }
38  }
39
40  const uint64_t key_;
41  std::optional<TransportSegment> segment_;
42
43  InlineAsyncQueue<TransportSegment, kCapacity> rx_queue_;
44  Closeable<TransportSegment> rx_;
45
46  InlineAsyncQueue<TransportSegment, kCapacity> tx_queue_;
47  Closeable<TransportSegment> tx_;
48};

With the addition of tasks to send and receive messages, all the pieces are in place to send “encrypted” messages from one end to the other:

 1  // Instantiate the sending tasks.
 2  Encryptor encryptor(PW_ASYNC_TASK_NAME("encryptor"), kKey);
 3  Sender<TransportSegment, NetworkPacket> net_sender(
 4      PW_ASYNC_TASK_NAME("net_sender"), encryptor.tx());
 5  Sender<NetworkPacket, LinkFrame> link_sender(
 6      PW_ASYNC_TASK_NAME("link_sender"), net_sender.queue());
 7
 8  // Instantiate the receiving tasks.
 9  Encryptor decryptor(PW_ASYNC_TASK_NAME("decryptor"), kKey);
10  Receiver<NetworkPacket, TransportSegment> net_receiver(
11      PW_ASYNC_TASK_NAME("net_receiver"), decryptor.rx());
12  Receiver<LinkFrame, NetworkPacket> link_receiver(
13      PW_ASYNC_TASK_NAME("link_receiver"), net_receiver.queue());
14
15  // Connect both ends.
16  Link link(link_sender.queue(), link_receiver.queue());
17
18  // Define a task that sends messages.
19  size_t tx_index = 0;
20  uint64_t segment_id = 0x1000;
21  async2::PendFuncTask msg_sender(
22      [&](async2::Context& context) -> async2::Poll<> {
23        auto& queue = encryptor.rx();
24        while (tx_index < kNumLines) {
25          PW_TRY_READY(
26              allocator.PendCanAllocate(context, kMaxDemoLinkFrameLength));
27          PW_TRY_READY(queue.PendHasSpace(context));
28          auto segment = TransportSegment::Create(allocator, segment_id++);
29          PW_CHECK_OK(segment.status());
30          const char* line = kTheAmaranth[tx_index];
31          segment->CopyFrom(line, strlen(line) + 1);
32          queue.push(std::move(*segment));
33          ++tx_index;
34        }
35        queue.Close();
36        return Ready();
37      });
38
39  // Define a task that receives messages.
40  size_t rx_index = 0;
41  async2::PendFuncTask msg_receiver(
42      [&](async2::Context& context) -> async2::Poll<> {
43        auto& queue = decryptor.tx();
44        while (true) {
45          PW_TRY_READY_ASSIGN(auto status, queue.PendNotEmpty(context));
46          if (!status.ok()) {
47            return Ready();
48          }
49          TransportSegment segment(std::move(queue.front()));
50          queue.pop();
51          EXPECT_STREQ(segment.AsCString(), kTheAmaranth[rx_index]);
52          ++rx_index;
53        }
54      });
55
56  // Run all tasks on the dispatcher.
57  dispatcher.Post(msg_sender);
58  dispatcher.Post(encryptor);
59  dispatcher.Post(net_sender);
60  dispatcher.Post(link_sender);
61  dispatcher.Post(link);
62  dispatcher.Post(link_receiver);
63  dispatcher.Post(net_receiver);
64  dispatcher.Post(decryptor);
65  dispatcher.Post(msg_receiver);

For the complete example, see //pw_multibuf/examples/pseudo_encrypt.cc.