pw_transfer

Attention

pw_transfer is under construction and so is its documentation.

Usage

C++

The transfer service is defined and registered with an RPC server like any other RPC service.

To know how to read data from or write data to device, a TransferHandler interface is defined (pw_transfer/public/pw_transfer/handler.h). Transfer handlers represent a transferable resource, wrapping a stream reader and/or writer with initialization and completion code. Custom transfer handler implementations should derive from ReadOnlyHandler, WriteOnlyHandler, or ReadWriteHandler as appropriate and override Prepare and Finalize methods if necessary.

A transfer handler should be implemented and instantiated for each unique data transfer to or from a device. These handlers are then registered with the transfer service using their resource IDs.

Example

#include "pw_transfer/transfer.h"

namespace {

// Simple transfer handler which reads data from an in-memory buffer.
class SimpleBufferReadHandler : public pw::transfer::ReadOnlyHandler {
 public:
  SimpleReadTransfer(uint32_t resource_id, pw::ConstByteSpan data)
      : ReadOnlyHandler(resource_id), reader_(data) {
    set_reader(reader_);
  }

 private:
  pw::stream::MemoryReader reader_;
};

// The maximum amount of data that can be sent in a single chunk, excluding
// transport layer overhead.
constexpr size_t kMaxChunkSizeBytes = 256;

// In a write transfer, the maximum number of bytes to receive at one time,
// (potentially across multiple chunks), unless specified otherwise by the
// transfer handler's stream::Writer.
constexpr size_t kDefaultMaxBytesToReceive = 1024;

// Instantiate a static transfer service.
// The service requires a work queue, and a buffer to store data from a chunk.
// The helper class TransferServiceBuffer comes with a builtin buffer.
pw::transfer::TransferServiceBuffer<kMaxChunkSizeBytes> transfer_service(
    GetSystemWorkQueue(), kDefaultMaxBytesToReceive);

// Instantiate a handler for the data to be transferred. The resource ID will
// be used by the transfer client and server to identify the handler.
constexpr uint32_t kBufferResourceId = 1;
char buffer_to_transfer[256] = { /* ... */ };
SimpleBufferReadHandler buffer_handler(kBufferResourceId, buffer_to_transfer);

}  // namespace

void InitTransfer() {
  // Register the handler with the transfer service, then the transfer service
  // with an RPC server.
  transfer_service.RegisterHandler(buffer_handler);
  GetSystemRpcServer().RegisterService(transfer_service);
}

Module Configuration Options

The following configurations can be adjusted via compile-time configuration of this module, see the module documentation for more details.

PW_TRANSFER_DEFAULT_MAX_RETRIES

The default maximum number of times a transfer should retry sending a chunk when no response is received. This can later be configured per-transfer.

PW_TRANSFER_DEFAULT_TIMEOUT_MS

The default amount of time, in milliseconds, to wait for a chunk to arrive before retrying. This can later be configured per-transfer.

PW_TRANSFER_DEFAULT_EXTEND_WINDOW_DIVISOR

The fractional position within a window at which a receive transfer should extend its window size to minimize the amount of time the transmitter spends blocked.

For example, a divisor of 2 will extend the window when half of the requested data has been received, a divisor of three will extend at a third of the window, and so on.

Python

Provides a simple interface for transferring bulk data over pw_rpc.

exception pw_transfer.Error(session_id: int, status: pw_status.Status)

Exception raised when a transfer fails.

Stores the ID of the failed transfer and the error that occurred.

__init__(session_id: int, status: pw_status.Status)
class pw_transfer.Manager(rpc_transfer_service, *, default_response_timeout_s: float = 2.0, initial_response_timeout_s: float = 4.0, max_retries: int = 3)

A manager for transmitting data through an RPC TransferService.

This should be initialized with an active Manager over an RPC channel. Only one instance of this class should exist for a configured RPC TransferService – the Manager supports multiple simultaneous transfers.

When created, a Manager starts a separate thread in which transfer communications and events are handled.

__init__(rpc_transfer_service, *, default_response_timeout_s: float = 2.0, initial_response_timeout_s: float = 4.0, max_retries: int = 3)

Initializes a Manager on top of a TransferService.

Parameters
  • rpc_transfer_service – the pw_rpc transfer service client

  • default_response_timeout_s – max time to wait between receiving packets

  • initial_response_timeout_s – timeout for the first packet; may be longer to account for transfer handler initialization

  • max_retires – number of times to retry after a timeout

read(session_id: int, progress_callback: Optional[Callable[[pw_transfer.transfer.ProgressStats], Any]] = None) bytes

Receives (“downloads”) data from the server.

Raises

Error – the transfer failed to complete

write(session_id: int, data: Union[bytes, str], progress_callback: Optional[Callable[[pw_transfer.transfer.ProgressStats], Any]] = None) None

Transmits (“uploads”) data to the server.

Parameters
  • session_id – ID of the write transfer

  • data – Data to send to the server.

  • progress_callback – Optional callback periodically invoked throughout the transfer with the transfer state. Can be used to provide user- facing status updates such as progress bars.

Raises

Error – the transfer failed to complete

class pw_transfer.ProgressStats(bytes_sent: int, bytes_confirmed_received: int, total_size_bytes: Optional[int])
__init__(bytes_sent: int, bytes_confirmed_received: int, total_size_bytes: Optional[int]) None

Example

import pw_transfer

# Initialize a Pigweed RPC client; see pw_rpc docs for more info.
rpc_client = CustomRpcClient()
rpcs = rpc_client.channel(1).rpcs

transfer_service = rpcs.pw.transfer.Transfer
transfer_manager = pw_transfer.Manager(transfer_service)

try:
  # Read the transfer resource with ID 3 from the server.
  data = transfer_manager.read(3)
except pw_transfer.Error as err:
  print('Failed to read:', err.status)

try:
  # Send some data to the server. The transfer manager does not have to be
  # reinitialized.
  transfer_manager.write(2, b'hello, world')
except pw_transfer.Error as err:
  print('Failed to write:', err.status)

Typescript

Provides a simple interface for transferring bulk data over pw_rpc.

Example

import {Manager} from '@pigweed/pw_transfer'

const client = new CustomRpcClient();
service = client.channel()!.service('pw.transfer.Transfer')!;

const manager = new Manager(service, DEFAULT_TIMEOUT_S);

manager.read(3, (stats: ProgressStats) => {
  console.log(`Progress Update: ${stats}`);
}).then((data: Uint8Array) => {
  console.log(`Completed read: ${data}`);
}).catch(error => {
  console.log(`Failed to read: ${error.status}`);
});

manager.write(2, textEncoder.encode('hello world'))
  .catch(error => {
    console.log(`Failed to read: ${error.status}`);
  });

Protocol

Protocol buffer definition

syntax = "proto3";

package pw.transfer;

option java_multiple_files = true;
option java_package = "dev.pigweed.pw_transfer";

// The transfer RPC service is used to send data between the client and server.
service Transfer {
  // Transfer data from the server to the client; a "download" from the client's
  // perspective.
  rpc Read(stream Chunk) returns (stream Chunk);

  // Transfer data from the client to the server; an "upload" from the client's
  // perspective.
  rpc Write(stream Chunk) returns (stream Chunk);
}

// Represents a chunk of data sent by the transfer service. Includes fields for
// configuring the transfer parameters.
//
// Notation: (Read|Write) (→|←)
//   X → Means client sending data to the server.
//   X ← Means server sending data to the client.
message Chunk {
  // Represents the source or destination of the data. May be ephemeral or
  // stable depending on the implementation. Sent in every request to identify
  // the transfer target.
  //
  //  Read → ID of transfer
  //  Read ← ID of transfer
  // Write → ID of transfer
  // Write ← ID of transfer
  uint32 session_id = 1;

  // Used by the receiver to indicate how many bytes it can accept. The
  // transmitter sends this much data, divided into chunks no larger than
  // max_chunk_size_bytes. The receiver then starts another window by sending
  // request_bytes again with a new offset.
  //
  //  Read → The client requests this many bytes to be sent.
  //  Read ← N/A
  // Write → N/A
  // Write ← The server requests this many bytes to be sent.
  optional uint32 pending_bytes = 2;

  // Maximum size of an individual chunk. The transmitter may send smaller
  // chunks if required.
  //
  //  Read → Set maximum size for subsequent chunks.
  //  Read ← N/A
  // Write → N/A
  // Write ← Set maximum size for subsequent chunks.
  optional uint32 max_chunk_size_bytes = 3;

  // Minimum required delay between chunks. The transmitter may delay longer if
  // desired.
  //
  //  Read → Set minimum delay for subsequent chunks.
  //  Read ← N/A
  // Write → N/A
  // Write ← Set minimum delay for subsequent chunks.
  optional uint32 min_delay_microseconds = 4;

  // On writes, the offset of the data. On reads, the offset at which to read.
  //
  //  Read → Read data starting at this offset.
  //  Read ← Offset of the data.
  // Write → Offset of the data.
  // Write ← Write data starting at this offset.
  uint64 offset = 5;

  // The data that was read or the data to write.
  //
  //  Read → N/A
  //  Read ← Data read
  // Write → Data to write
  // Write ← N/A
  bytes data = 6;

  // Estimated bytes remaining to read/write. Optional except for the last data
  // chunk, for which remaining_bytes must be set to 0.
  //
  // The sender can set remaining_bytes at the beginning of a read/write so that
  // the receiver can track progress or cancel the transaction if the value is
  // too large.
  //
  //  Read → N/A
  //  Read ← Remaining bytes to read, excluding any data in this chunk. Set to
  //         0 for the last chunk.
  // Write → Remaining bytes to write, excluding any data in is chunk. Set to
  //         0 for the last chunk.
  // Write ← N/A
  optional uint64 remaining_bytes = 7;

  // Pigweed status code indicating the completion of a transfer. This is only
  // present in the final packet sent by either the transmitter or receiver.
  //
  // The possible status codes and their meanings are listed below:
  //
  //   OK: Transfer completed successfully.
  //   DATA_LOSS: Transfer data could not be read/written (e.g. corruption).
  //   INVALID_ARGUMENT: Received malformed chunk.
  //   NOT_FOUND: The requested resource ID is not registered (read/write).
  //   OUT_OF_RANGE: The requested offset is larger than the data (read/write).
  //   RESOURCE_EXHAUSTED: Concurrent transfer limit reached.
  //   UNIMPLEMENTED: Resource ID does not support requested operation (e.g.
  //       trying to write to a read-only transfer).
  //
  //  Read → Transfer complete.
  //  Read ← Transfer complete.
  // Write → Transfer complete.
  // Write ← Transfer complete.
  optional uint32 status = 8;

  // The offset up to which the transmitter can send data before waiting for the
  // receiver to acknowledge.
  //
  //  Read → Offset up to which the server can send without blocking.
  //  Read ← N/A
  // Write → N/A
  // Write ← Offset up to which the client can send without blocking.
  //
  // TODO(frolv): This will replace the pending_bytes field. Once all uses of
  // transfer are migrated, that field should be removed.
  uint32 window_end_offset = 9;

  enum Type {
    // Chunk containing transfer data.
    TRANSFER_DATA = 0;

    // First chunk of a transfer (only sent by the client).
    TRANSFER_START = 1;

    // Transfer parameters indicating that the transmitter should retransmit
    // from the specified offset.
    PARAMETERS_RETRANSMIT = 2;

    // Transfer parameters telling the transmitter to continue sending up to
    // index `offset + pending_bytes` of data. If the transmitter is already
    // beyond `offset`, it does not have to rewind.
    PARAMETERS_CONTINUE = 3;

    // Sender of the chunk is terminating the transfer.
    TRANSFER_COMPLETION = 4;

    // Acknowledge the completion of a transfer. Currently unused.
    // TODO(konkers): Implement this behavior.
    TRANSFER_COMPLETION_ACK = 5;
  };

  // The type of this chunk. This field should only be processed when present.
  // TODO(frolv): Update all users of pw_transfer and remove the optional
  // semantics from this field.
  //
  //  Read → Chunk type (start/parameters).
  //  Read ← Chunk type (data).
  // Write → Chunk type (data).
  // Write ← Chunk type (start/parameters).
  optional Type type = 10;

  // Not currently used. Will inherit session_id's behavior in the future.
  uint32 resource_id = 11;
}

Server to client transfer (read)

../_images/read.svg

Client to server transfer (write)

../_images/write.svg

Errors

Protocol errors

At any point, either the client or server may terminate the transfer with a status code. The transfer chunk with the status is the final chunk of the transfer.

The following table describes the meaning of each status code when sent by the sender or the receiver (see Transfer roles).

Status

Sent by sender

Sent by receiver

OK

(not sent)

All data was received and handled successfully.

ABORTED

The service aborted the transfer because the client restarted it. This status is passed to the transfer handler, but not sent to the client because it restarted the transfer.

CANCELLED

The client cancelled the transfer.

DATA_LOSS

Failed to read the data to send. The Reader returned an error.

Failed to write the received data. The Writer returned an error.

FAILED_PRECONDITION

Received chunk for transfer that is not active.

INVALID_ARGUMENT

Received a malformed packet.

INTERNAL

An assumption of the protocol was violated. Encountering INTERNAL indicates that there is a bug in the service or client implementation.

PERMISSION_DENIED

The transfer does not support the requested operation (either reading or writing).

RESOURCE_EXHAUSTED

The receiver requested zero bytes, indicating their storage is full, but there is still data to send.

Storage is full.

UNAVAILABLE

The service is busy with other transfers and cannot begin a new transfer at this time.

UNIMPLEMENTED

Out-of-order chunk was requested, but seeking is not supported.

(not sent)

Client errors

pw_transfer clients may immediately return certain errors if they cannot start a transfer.

Status

Reason

ALREADY_EXISTS

A transfer with the requested ID is already pending on this client.

DATA_LOSS

Sending the initial transfer chunk failed.

RESOURCE_EXHAUSTED

The client has insufficient resources to start an additional transfer at this time.

Transfer roles

Every transfer has two participants: the sender and the receiver. The sender transmits data to the receiver. The receiver controls how the data is transferred and sends the final status when the transfer is complete.

In read transfers, the client is the receiver and the service is the sender. In write transfers, the client is the sender and the service is the receiver.

Sender flow

graph TD start([Client initiates<br>transfer]) -->data_request data_request[Receive transfer<br>parameters]-->send_chunk send_chunk[Send chunk]-->sent_all sent_all{Sent final<br>chunk?} -->|yes|wait sent_all-->|no|sent_requested sent_requested{Sent all<br>pending?}-->|yes|data_request sent_requested-->|no|send_chunk wait[Wait for receiver]-->is_done is_done{Received<br>final chunk?}-->|yes|done is_done-->|no|data_request done([Transfer complete])

Receiver flow

graph TD start([Client initiates<br>transfer]) -->request_bytes request_bytes[Set transfer<br>parameters]-->wait wait[Wait for chunk]-->received_chunk received_chunk{Received<br>chunk by<br>deadline?}-->|no|request_bytes received_chunk-->|yes|check_chunk check_chunk{Correct<br>offset?} -->|yes|process_chunk check_chunk --> |no|request_bytes process_chunk[Process chunk]-->final_chunk final_chunk{Final<br>chunk?}-->|yes|signal_completion final_chunk{Final<br>chunk?}-->|no|received_requested received_requested{Received all<br>pending?}-->|yes|request_bytes received_requested-->|no|wait signal_completion[Signal completion]-->done done([Transfer complete])

Integration tests

The pw_transfer module has a set of integration tests that verify the correctness of implementations in different languages. Test source code.

To run the tests on your machine, run

$ bazel run pw_transfer/integration_test:cross_language_integration_test

The integration tests permit injection of client/server/proxy binaries to use when running the tests. This allows manual testing of older versions of pw_transfer against newer versions.

# Test a newer version of pw_transfer against an old C++ client that was
# backed up to another directory.
$ bazel run pw_transfer/integration_test:cross_language_integration_test -- \
    --cpp-client ../old_pw_transfer_version/cpp_client

CI/CQ integration

Current status of the test in CI.

By default, these tests are not run in CQ (on presubmit) because they are too slow. However, you can request that the tests be run in presubmit on your change by adding to following line to the commit message footer:

Cq-Include-Trybots: luci.pigweed.try:pigweed-integration-transfer