pw_transfer

Attention

pw_transfer is under construction and so is its documentation.

Usage

C++

The transfer service is defined and registered with an RPC server like any other RPC service.

To know how to read data from or write data to device, a TransferHandler interface is defined (pw_transfer/public/pw_transfer/handler.h). Transfer handlers wrap a stream reader and/or writer with initialization and completion code. Custom transfer handler implementations should derive from ReadOnlyHandler, WriteOnlyHandler, or ReadWriteHandler as appropriate and override Prepare and Finalize methods if necessary.

A transfer handler should be implemented and instantiated for each unique data transfer to or from a device. These handlers are then registered with the transfer service using their transfer IDs.

Example

#include "pw_transfer/transfer.h"

namespace {

// Simple transfer handler which reads data from an in-memory buffer.
class SimpleBufferReadHandler : public pw::transfer::ReadOnlyHandler {
 public:
  SimpleReadTransfer(uint32_t transfer_id, pw::ConstByteSpan data)
      : ReadOnlyHandler(transfer_id), reader_(data) {
    set_reader(reader_);
  }

 private:
  pw::stream::MemoryReader reader_;
};

// The maximum amount of data that can be sent in a single chunk, excluding
// transport layer overhead.
constexpr size_t kMaxChunkSizeBytes = 256;

// In a write transfer, the maximum number of bytes to receive at one time,
// (potentially across multiple chunks), unless specified otherwise by the
// transfer handler's stream::Writer.
constexpr size_t kDefaultMaxBytesToReceive = 1024;

// Instantiate a static transfer service.
pw::transfer::TransferService transfer_service(
    kMaxChunkSizeBytes, kDefaultMaxBytesToReceive);

// Instantiate a handler for the the data to be transferred.
constexpr uint32_t kBufferTransferId = 1;
char buffer_to_transfer[256] = { /* ... */ };
SimpleBufferReadHandler buffer_handler(kBufferTransferId, buffer_to_transfer);

}  // namespace

void InitTransfer() {
  // Register the handler with the transfer service, then the transfer service
  // with an RPC server.
  transfer_service.RegisterHandler(buffer_handler);
  GetSystemRpcServer().RegisterService(transfer_service);
}

Python

Provides a simple interface for transferring bulk data over pw_rpc.

exception pw_transfer.transfer.Error(transfer_id: int, status: pw_status.Status)

Exception raised when a transfer fails.

Stores the ID of the failed transfer and the error that occurred.

__init__(transfer_id: int, status: pw_status.Status)
class pw_transfer.transfer.Manager(rpc_transfer_service, default_response_timeout_s: float = 2.0)

A manager for transmitting data through an RPC TransferService.

This should be initialized with an active Manager over an RPC channel. Only one instance of this class should exist for a configured RPC TransferService – the Manager supports multiple simultaneous transfers.

When created, a Manager starts a separate thread in which transfer communications and events are handled.

__init__(rpc_transfer_service, default_response_timeout_s: float = 2.0)

Initializes a Manager on top of a TransferService.

read(transfer_id: int, progress_callback: Optional[Callable[[pw_transfer.transfer.ProgressStats], Any]] = None) bytes

Receives (“downloads”) data from the server.

Raises

Error – the transfer failed to complete

write(transfer_id: int, data: Union[bytes, str], progress_callback: Optional[Callable[[pw_transfer.transfer.ProgressStats], Any]] = None) None

Transmits (“uploads”) data to the server.

Parameters
  • transfer_id – ID of the write transfer

  • data – Data to send to the server.

  • progress_callback – Optional callback periodically invoked throughout the transfer with the transfer state. Can be used to provide user- facing status updates such as progress bars.

Raises

Error – the transfer failed to complete

class pw_transfer.transfer.ProgressStats(total_size_bytes: Optional[int], bytes_confirmed_received: int, bytes_sent: int)
__init__(total_size_bytes: Optional[int], bytes_confirmed_received: int, bytes_sent: int) None

Example

from pw_transfer import transfer

# Initialize a Pigweed RPC client; see pw_rpc docs for more info.
rpc_client = CustomRpcClient()
rpcs = rpc_client.channel(1).rpcs

transfer_service = rpcs.pw.transfer.Transfer
transfer_manager = transfer.Manager(transfer_service)

try:
  # Read transfer_id 3 from the server.
  data = transfer_manager.read(3)
except transfer.Error as err:
  print('Failed to read:', err.status)

try:
  # Send some data to the server. The transfer manager does not have to be
  # reinitialized.
  transfer_manager.write(2, b'hello, world')
except transfer.Error as err:
  print('Failed to write:', err.status)

Protocol

Protocol buffer definition

syntax = "proto3";

package pw.transfer;

// The transfer RPC service is used to send data between the client and server.
service Transfer {
  // Transfer data from the server to the client; a "download" from the client's
  // perspective.
  rpc Read(stream Chunk) returns (stream Chunk);

  // Transfer data from the client to the server; an "upload" from the client's
  // perspective.
  rpc Write(stream Chunk) returns (stream Chunk);
}

// Represents a chunk of data sent by the transfer service. Includes fields for
// configuring the transfer parameters.
//
// Notation: (Read|Write) (→|←)
//   X → Means client sending data to the server.
//   X ← Means server sending data to the client.
message Chunk {
  // Represents the source or destination of the data. May be ephemeral or
  // stable depending on the implementation. Sent in every request to identify
  // the transfer target.
  //
  //  Read → ID of transfer
  //  Read ← ID of transfer
  // Write → ID of transfer
  // Write ← ID of transfer
  uint32 transfer_id = 1;

  // Used by the receiver to indicate how many bytes it can accept. The
  // transmitter sends this much data, divided into chunks no larger than
  // max_chunk_size_bytes. The receiver then starts another window by sending
  // request_bytes again with a new offset.
  //
  //  Read → The client requests this many bytes to be sent.
  //  Read ← N/A
  // Write → N/A
  // Write ← The server requests this many bytes to be sent.
  optional uint32 pending_bytes = 2;

  // Maximum size of an individual chunk. The transmitter may send smaller
  // chunks if required.
  //
  //  Read → Set maximum size for subsequent chunks.
  //  Read ← N/A
  // Write → N/A
  // Write ← Set maximum size for subsequent chunks.
  optional uint32 max_chunk_size_bytes = 3;

  // Minimum required delay between chunks. The transmitter may delay longer if
  // desired.
  //
  //  Read → Set minimum delay for subsequent chunks.
  //  Read ← N/A
  // Write → N/A
  // Write ← Set minimum delay for subsequent chunks.
  optional uint32 min_delay_microseconds = 4;

  // On writes, the offset of the data. On reads, the offset at which to read.
  //
  //  Read → Read data starting at this offset.
  //  Read ← Offset of the data.
  // Write → Offset of the data.
  // Write ← Write data starting at this offset.
  uint64 offset = 5;

  // The data that was read or the data to write.
  //
  //  Read → N/A
  //  Read ← Data read
  // Write → Data to write
  // Write ← N/A
  bytes data = 6;

  // Estimated bytes remaining to read/write. Optional except for the last data
  // chunk, for which remaining_bytes must be set to 0.
  //
  // The sender can set remaining_bytes at the beginning of a read/write so that
  // the receiver can track progress or cancel the transaction if the value is
  // too large.
  //
  //  Read → N/A
  //  Read ← Remaining bytes to read, excluding any data in this chunk. Set to
  //         0 for the last chunk.
  // Write → Reamining bytes to write, excluding any data in is chunk. Set to
  //         0 for the last chunk.
  // Write ← N/A
  optional uint64 remaining_bytes = 7;

  // Pigweed status code indicating the completion of a transfer. This is only
  // present in the final packet sent by either the transmitter or receiver.
  //
  // The possible status codes and their meanings are listed below:
  //
  //   OK: Transfer completed successfully.
  //   DATA_LOSS: Transfer data could not be read/written (e.g. corruption).
  //   INVALID_ARGUMENT: Received malformed chunk.
  //   NOT_FOUND: The requested transfer ID is not registered (read/write).
  //   OUT_OF_RANGE: The requested offset is larger than the data (read/write).
  //   RESOURCE_EXHAUSTED: Concurrent transfer limit reached.
  //   UNIMPLEMENTED: Transfer ID does not support requested operation (e.g.
  //       trying to write to a read-only transfer).
  //
  //  Read → Transfer complete.
  //  Read ← Transfer complete.
  // Write → Transfer complete.
  // Write ← Transfer complete.
  optional uint32 status = 8;
}

Server to client transfer (read)

../_images/read.svg

Client to server transfer (write)

../_images/write.svg

Errors

At any point, either the client or server may terminate the transfer by sending an error chunk with the transfer ID and a non-OK status.

Transmitter flow

graph TD start([Client initiates<br>transfer]) -->data_request data_request[Receive transfer<br>parameters]-->send_chunk send_chunk[Send chunk]-->sent_all sent_all{Sent final<br>chunk?} -->|yes|wait sent_all-->|no|sent_requested sent_requested{Sent all<br>pending?}-->|yes|data_request sent_requested-->|no|send_chunk wait[Wait for receiver]-->is_done is_done{Received<br>final chunk?}-->|yes|done is_done-->|no|data_request done([Transfer complete])

Receiver flow

graph TD start([Client initiates<br>transfer]) -->request_bytes request_bytes[Set transfer<br>parameters]-->wait wait[Wait for chunk]-->received_chunk received_chunk{Received<br>chunk by<br>deadline?}-->|no|request_bytes received_chunk-->|yes|check_chunk check_chunk{Correct<br>offset?} -->|yes|process_chunk check_chunk --> |no|request_bytes process_chunk[Process chunk]-->final_chunk final_chunk{Final<br>chunk?}-->|yes|signal_completion final_chunk{Final<br>chunk?}-->|no|received_requested received_requested{Received all<br>pending?}-->|yes|request_bytes received_requested-->|no|wait signal_completion[Signal completion]-->done done([Transfer complete])