0124: Interfaces for Retrieving Size Information from Multisink#

Status: Open for Comments Intent Approved Last Call Accepted Rejected

Proposal Date: 2024-01-22

CL: pwrev/188670

Author: Jiacheng Lu

Facilitator: Carlos Chinchilla

Summary#

This SEED proposes adding interfaces to pw_multisink to retrieve the capacity and filled size from its underlying buffer.

Motivation#

Currently, pw_multisink provides Listener to help schedule draining of entries. Interfaces of pw_multisink and its Listener provides no information about capacity or consumed size of the underlying buffer.

By adding interfaces to pw_multisink to provide size information of its underlying buffer, scheduling policies can be implemented to trigger draining based on, for example, the size of unread data in the buffer.

Proposal#

Add new interface UnsafeGetUnreadEntriesSize() to pw_multisink::MultiSink::Drain that reports size of unread data in the underlying buffer.

class MutilSink {
 public:
 ...

   class Drain {
      ...

      // Both two APIs beturn size in bytes of the valid data in the
      // underlying buffer that has not been read by this drain.

      // This is a thread unsafe version. It requires the `lock` of
      // `multisink` being held. For example, it can be used inside
      // Listeners' methods, where lock already held, to check size
      // on handling each new entry.
      size_t UnsafeGetUnreadEntriesSize() PW_NO_LOCK_SAFETY_ANALYSIS;

      // A thread-safe verson.
      size_t GetUnreadEntriesSize() PW_LOCKS_EXCLUDED(multisink_->lock_);

      ...

    protected:
      friend class MultiSink;

      MultiSink* multisink_;
   };

 ...

 private:
   LockType lock_;
};

Problem investigation#

pw_multisink is a popular choice to buffer data from logs for Pigweed based softwares. An example is pw_log_rpc where an instance of pw_multisink is used to buffer logs before they are drained and transmitted. Make sure pw_multisink are drained in time is important to reduce the chances of dropping logs because of running out of space. However, the draining and transmission behavior may be constrained by the property of underlying physical channels. For certain physical channels, there are tradeoffs between frequency of transmission and costs.

PCI is one of the example of physical channels that have the above mentioned tradeoffs. PCI implementation normally have different levels of power states for power efficiency. Transmitting data over PCI continuously may result to it not being able to enter deep sleep states. Also, PCI are normally have a high transmission bandwidth. Buffering data to a certain threshold and then sending them all over in a single transmission fits better with its design.

The OnNewEntryAvailable on the current Listener interfaces does not provide enough information about buffered data size because the current implementation of underlying buffer is pw_ring_buffer, it stores additional, variable lenghted data for its internal for each entry.

Assuming the proposed interface is avaible, the imagined use case looks like:

class DrainThread: public pw::thread::ThreadCore,
                   public pw::multisink::MultiSink::Listener {
 public:

   ... // initialize with drain

   bool NeedFlush(size_t unread_entries_size) {
      ...
   }

   void Flush(pw::multisink::MultiSink::Drain& drain) {
     ...
   }

   void OnNewEntryAvailable() override {
      if (NeedFlush(drain_.UnsafeGetUnreadEntriesSize())) {
         flush_threshold_reached_notification_.release();
      }
   }

   void Run() override {
      multisink_.AttachListner(*this);

      while (true) {
         flush_threshold_reached_notification_.acquire();
         Flush(drain_);
      }
   }


 private:
   pw::multisink::MultiSink& multisink_;
   pw::multisink::MultiSink::Drain& drain_;
   pw::ThreadNotification flush_threshold_reached_notification_;
};

Detailed design#

Implement EntriesSize() in pw_ring_buffer::PrefixedEntryRingBufferMulti::Reader to provide the number of bytes between its reader pointer and ring buffer’s writer pointer.

class PrefixedEntryRingBufferMulti {
  class Reader : public IntrusiveList<Reader>::Item {
   public:

    // Get the size of the unread entries currently in the ring buffer.
    // Return value:
    // Number of bytes
    size_t EntriesSize() const {
      // Case: Not wrapped.
      if (read_idx_ < buffer_->write_idx_) {
        return buffer_->write_idx_ - read_idx_;
      }
      // Case: Wrapped.
      if (read_idx_ > buffer_->write_idx_) {
        return buffer_->buffer_bytes_ - (read_idx_ - buffer_->write_idx_);
      }

      // No entries remaining.
      if (entry_count_ == 0) {
        return 0;
      }

      return buffer_->buffer_bytes_;
    }

   private:
    PrefixedEntryRingBufferMulti* buffer_;
    size_t read_idx_;
  };

 private:
  size_t write_idx_;
  size_t buffer_bytes_;
};

The unread data size of Drain is directly fetched from ring buffer’s Reader. Because pw_multisink uses lock_ to protect accesses to all listeners’ methods already, in order to support calling the proposed interfaces from listeners, this design introduces two versions of API, one thread-safe version that is intended to be used outside of listeners, and one thread-unsafe version requires that lock_ of pw_multisink being held when invoking.

namespace pw {
namespace multisink {

class MutilSink {
 public:
  ...

   class Drain {
    public:

      // Both two APIs beturn size in bytes of the valid data in the
      // underlying buffer that has not been read by this drain.

      // Ideally it should use annotation
      //     PW_EXCLUSIVE_LOCKS_REQUIRED(multisink_->lock_)
      // however, Listener interfaces, where it is intended to be called,
      // cannot be annotated using multisink's lock. Static analysis is not
      // doable.
      size_t UnsafeGetUnreadEntriesSize() PW_NO_LOCK_SAFETY_ANALYSIS {
         return reader_.EntriesSize();
      }

      size_t GetUnreadEntriesSize() PW_LOCKS_EXCLUDED(multisink_->lock_) {
         std::lock_guard lock(multisink_->lock_);
         return UnsafeGetUnreadEntriesSize();
      }

    protected:
      friend class MultiSink;

      MultiSink* multisink_;
      ring_buffer::PrefixedEntryRingBufferMulti::Reader reader_;
   };

 ...

 private:
   LockType lock_;
   ring_buffer::PrefixedEntryRingBufferMulti ring_buffer_
      PW_GUARDED_BY(lock_);
};

}  // namespace multisink
}  // namespace pw

Alternatives#

Add on buffer size change interface to listener#

Add OnBufferSizeChange interface to pw_multisink::MultiSinkListener. The new interface gets invoked when the available size of the underlying buffer changes.

Interface example:

class MultiSink {
 public:
  class Listener {
   public:

    ...

    virtual void OnNewEntryAvailable() = 0;

    virtual void OnBufferSizeChange(size_t total_size, size_t used_sized);
  };

  ...
}

Imagined implementation of OnBufferSizeChange being invoked after an entry push or pop. It uses existing interfaces of the underlying pw_ring_buffer. In reality, this implementation does not work well, explained in the problems sections below.

void MutilSink::HandleEntry(ConstByteSpan entry) {
  std::lock_guard lock(lock_);
  ...
  ring_buffer_.PushBack(entry);
  NotifyListenersBufferSizeChanged(
    ring_buffer_.TotalSizeBytes(),
    ring_buffer_.TotalUsedBytes());
  ...
}
void MutilSink::PopEntry(Drain& drain, ConstByteSpan entry) {
  std::lock_guard lock(lock_);
  ...
  const size_t used_size_before_pop = ring_buffer_.TotalUsedBytes();
  drain.reader_.PopFront();
  const size_t used_size_after_pop = ring_buffer_.TotalUsedBytes();
  if (used_size_before_pop != used_size_after_pop) {
    NotifyListenersBufferSizeChanged(
      ring_buffer_.TotalSizeBytes(),
      used_size_after_pop);
  }
  ...
}

Problem 1. Find the slowest drain#

When there are multiple drains attached to pw_multisink, only the slowest drain(s) frees space from the underlying pw_ring_buffer when pops.

pw_multisink supports Late Drain Attach which attaches an internal drain that never pops. The TotalUsedBytes() reported by underlying pw_ring_buffer counts from the slowest drain and always reports the full capacity instead of the real used size.

Problem 2. Push a entry when buffer is full may decrease used size#

When the pushing of a new entry exceeds the remaining free space of the underlying buffer, the push can still succeed, by silent dropping entries from the slowest drain. However, depending on the size of dropped entries and the size of the new entry, the used buffer size may increase, decrease or stay the same.

If the user of the proposed OnBufferSizeChange interface is comparing the reported used bytes with a threshold value, it is likely that the moment of underlying buffer being full may not be catched.

Although it is possible to also trigger OnBufferSizeChange with used_size == total_size when the above situation happens, the implementation may also require exposing internal states of pw_ring_buffer.

Open questions#