Python client

pw_rpc: Efficient, low-code-size RPC system for embedded devices

The pw_rpc Python package makes it possible to call Pigweed RPCs from Python. The package includes a pw_rpc client library, as well as tools for creating a pw_rpc console, and a separate benchmarking utility for measuring and debugging RPC performance.

pw_rpc.client

Provides a pw_rpc client for Python.

class pw_rpc.client.Client(
impl: ClientImpl,
channels: Iterable[Channel],
services: Iterable[Service],
)#

Sends requests and handles responses for a set of channels.

RPC invocations occur through a ChannelClient.

Users may set an optional response_callback that is called before processing every response or server stream RPC packet.

__init__(
impl: ClientImpl,
channels: Iterable[Channel],
services: Iterable[Service],
)#
channel(channel_id: int | None = None) ChannelClient#

Returns a ChannelClient, which is used to call RPCs on a channel.

If no channel is provided, the first channel is used.

channels() Iterable[ChannelClient]#

Accesses the ChannelClients in this client.

method(method_name: str) Method#

Returns a Method matching the given name.

Parameters:

method_name – name as package.Service/Method or package.Service.Method.

Raises:
  • ValueError – the method name is not properly formatted

  • KeyError – the method is not present
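The two accepted name formats and the two failure modes can be illustrated with a small standalone sketch. The helpers split_method_name and find_method and the KNOWN table are hypothetical names made up for this example; they are not part of pw_rpc, and the real lookup may be implemented differently.

```python
def split_method_name(method_name: str) -> tuple[str, str]:
    """Splits 'package.Service/Method' or 'package.Service.Method'.

    Hypothetical helper for illustration only; not part of pw_rpc.
    """
    if '/' in method_name:
        service, _, method = method_name.partition('/')
    else:
        service, _, method = method_name.rpartition('.')
    if not service or not method or '/' in method:
        raise ValueError(f'Malformed method name: {method_name!r}')
    return service, method


def find_method(known: dict[tuple[str, str], str], method_name: str) -> str:
    """Looks up a method; raises KeyError if well formed but unknown."""
    return known[split_method_name(method_name)]


# Assumed lookup table standing in for the methods a client knows about.
KNOWN = {('pw.rpc.EchoService', 'Echo'): 'echo-method'}
```

Both 'pw.rpc.EchoService/Echo' and 'pw.rpc.EchoService.Echo' resolve to the same entry in this sketch, while a bare 'Echo' is rejected as malformed.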

methods() Iterator[Method]#

Iterates over all Methods supported by this client.

process_packet(pw_rpc_raw_packet_data: bytes) Status#

Processes an incoming packet.

Parameters:

pw_rpc_raw_packet_data – raw binary data for exactly one RPC packet

Returns:

  • OK – the packet was processed by this client

  • DATA_LOSS – the packet could not be decoded

  • INVALID_ARGUMENT – the packet is for a server, not a client

  • NOT_FOUND – the packet’s channel ID is not known to this client
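One way a receive loop might use these return values is to tally them, so that undecodable or misrouted packets show up in diagnostics. The sketch below is self-contained and makes several assumptions: Status is a stand-in enum (not pw_status.Status), and feed_packets and fake_process_packet are hypothetical names that only imitate the shape of process_packet.

```python
from collections import Counter
from enum import Enum
from typing import Callable, Iterable


class Status(Enum):
    """Stand-in for the status codes listed above (not pw_status.Status)."""
    OK = 0
    INVALID_ARGUMENT = 3
    NOT_FOUND = 5
    DATA_LOSS = 15


def feed_packets(
    process_packet: Callable[[bytes], Status], packets: Iterable[bytes]
) -> Counter:
    """Passes each packet to process_packet and tallies the results."""
    outcomes: Counter = Counter()
    for packet in packets:
        outcomes[process_packet(packet)] += 1
    return outcomes


def fake_process_packet(data: bytes) -> Status:
    """Hypothetical stand-in: treats empty input as undecodable."""
    return Status.OK if data else Status.DATA_LOSS


tally = feed_packets(fake_process_packet, [b'\x01', b'', b'\x02'])
```

With a real client, the bound process_packet method would presumably take the place of fake_process_packet.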

class pw_rpc.client.ClientImpl#

The internal interface of the RPC client.

This interface defines the semantics for invoking an RPC on a particular client.

__init__() None#
abstract handle_completion(rpc: PendingRpc, context: Any, status: Status) Any#

Handles the successful completion of an RPC.

Parameters:
  • rpc – Information about the pending RPC

  • context – Arbitrary context object associated with the pending RPC

  • status – Status returned from the RPC

abstract handle_error(rpc: PendingRpc, context, status: Status)#

Handles the abnormal termination of an RPC.

Parameters:
  • rpc – Information about the pending RPC

  • context – Arbitrary context object associated with the pending RPC

  • status – which error occurred

abstract handle_response(rpc: PendingRpc, context: Any, payload: Any) Any#

Handles a response from the RPC server.

Parameters:
  • rpc – Information about the pending RPC

  • context – Arbitrary context object associated with the pending RPC

  • payload – A protobuf message

abstract method_client(channel: Channel, method: Method) Any#

Returns an object that invokes a method using the given channel.

pw_rpc.client_utils

Utilities for using pw_rpc.Client.

class pw_rpc.client_utils.NoEncodingSingleChannelRpcClient(
reader: CancellableReader,
paths_or_modules: Iterable[str | Path | ModuleType] | Library,
channel: Channel,
client_impl: ClientImpl | None = None,
)#

An RPC client without any frame encoding with a single channel output.

The caveat is that the provided read function must read entire frames.

__init__(
reader: CancellableReader,
paths_or_modules: Iterable[str | Path | ModuleType] | Library,
channel: Channel,
client_impl: ClientImpl | None = None,
)#

Creates an RPC client over a single channel with no frame encoding.

Parameters:
  • reader – Readable object used to receive RPC packets.

  • paths_or_modules – paths to .proto files or proto modules.

  • channel – RPC channel to use for output.

  • client_impl – The RPC Client implementation. Defaults to the callback client implementation if not provided.

class pw_rpc.client_utils.RpcClient(
reader_and_executor: DataReaderAndExecutor,
paths_or_modules: Iterable[str | Path | ModuleType] | Library,
channels: Iterable[Channel],
client_impl: ClientImpl | None = None,
)#

An RPC client with configurable incoming data processing.

__init__(
reader_and_executor: DataReaderAndExecutor,
paths_or_modules: Iterable[str | Path | ModuleType] | Library,
channels: Iterable[Channel],
client_impl: ClientImpl | None = None,
)#

Creates an RPC client.

Parameters:
  • reader_and_executor – DataReaderAndExecutor instance.

  • paths_or_modules – paths to .proto files or proto modules.

  • channels – RPC channels to use for output.

  • client_impl – The RPC client implementation. Defaults to the callback client implementation if not provided.

rpcs(channel_id: int | None = None) Any#

Returns object for accessing services on the specified channel.

This skips some intermediate layers to make it simpler to invoke RPCs from an HdlcRpcClient. If only one channel is in use, the channel ID is not necessary.

pw_rpc.callback_client

Defines a callback-based RPC ClientImpl to use with pw_rpc.Client.

callback_client.Impl supports invoking RPCs synchronously or asynchronously. Asynchronous invocations use a callback.

Synchronous invocations look like a function call. When invoking a unary or server streaming RPC, the request may be provided as a message object or as keyword arguments for the message fields (but not both).

status, response = client.channel(1).rpcs.MyServer.MyUnary(some_field=123)

# Calls with a server stream return a status and a list of responses.
status, responses = rpcs.MyService.MyServerStreaming(Request(some_field=123))

Synchronous client and bidirectional streaming calls accept an iterable of requests to send.

requests = [Request(a=1), Request(b=2)]
status, response = rpcs.MyService.MyClientStreaming(requests)

requests = [Request(a=1), Request(b=2)]
status, responses = rpcs.MyService.MyBidirectionalStreaming(requests)

Synchronous invocations block until the RPC completes or times out. The calls use the default timeout provided when the callback_client.Impl() is created, or a timeout passed in through the pw_rpc_timeout_s argument. A timeout of None means to wait indefinitely for a response.

Asynchronous invocations immediately return a call object. Callbacks may be provided for the three RPC events:

  • on_next(call_object, response) - called for each response

  • on_completed(call_object, status) - called when the RPC completes

  • on_error(call_object, error) - called if the RPC terminates due to an error

The default callbacks simply log the events. If a user-provided callback throws an exception, that exception is logged and raised when the user calls functions on the call object.
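As a rough illustration of the three callback signatures, the sketch below collects events in a small helper object. ResponseCollector is a hypothetical class invented for this example, and the events are simulated by hand rather than delivered by a real call object.

```python
from typing import Any


class ResponseCollector:
    """Gathers RPC events delivered through the three callbacks."""

    def __init__(self) -> None:
        self.responses: list[Any] = []
        self.status: Any = None
        self.error: Any = None

    def on_next(self, call: Any, response: Any) -> None:
        self.responses.append(response)

    def on_completed(self, call: Any, status: Any) -> None:
        self.status = status

    def on_error(self, call: Any, error: Any) -> None:
        self.error = error


collector = ResponseCollector()

# Simulate the events a server streaming call might deliver. The plain
# object and string values are placeholders for a call object, protobuf
# responses, and a Status.
fake_call = object()
collector.on_next(fake_call, 'chunk 1')
collector.on_next(fake_call, 'chunk 2')
collector.on_completed(fake_call, 'OK')
```

With a real client, the bound methods (collector.on_next and so on) would be passed as the corresponding callback arguments when starting the call.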

Unary and server streaming RPCs are invoked asynchronously by calling invoke on the method object. invoke takes the callbacks. The request may be provided either as a constructed protobuf or as a dict of proto fields in the request_args parameter.

# Pass the request as a protobuf and provide callbacks for all RPC events.
rpc = client.channel(1).call.MyService.MyServerStreaming
call = rpc.invoke(rpc.request(a=1), on_next_cb, on_completed_cb, on_error_cb)

# Create the request from the provided keyword args. Provide a callback for
# responses, but simply log the other RPC events.
call = client.channel(1).call.MyServer.MyUnary.invoke(
    request_args=dict(a=1, b=2), on_next=lambda _, reply: process_this(reply))

For client and bidirectional streaming RPCs, requests are sent with the send method. The finish_and_wait method finishes the client stream. It optionally takes an iterable of requests to send before closing the stream.

# Start the call using callbacks.
call = client.channel(1).rpcs.MyServer.MyClientStream.invoke(on_error=err_cb)

# Send a single client stream request.
call.send(some_field=123)

# Send the requests, close the stream, then wait for the RPC to complete.
stream_responses = call.finish_and_wait([RequestType(some_field=123), ...])
class pw_rpc.callback_client.BidirectionalStreamingCall(
rpcs: PendingRpcs,
rpc: PendingRpc,
default_timeout_s: float | None,
on_next: Callable[[CallTypeT, Any], Any] | None,
on_completed: Callable[[CallTypeT, Any], Any] | None,
on_error: Callable[[CallTypeT, Any], Any] | None,
max_responses: int,
)#

Call variant that tracks a bidirectional streaming RPC call.

finish_and_wait(
requests: Iterable[Message] = (),
*,
timeout_s: UseDefault | float | None = UseDefault.VALUE,
) StreamResponse#

Ends the client stream and waits for the RPC to complete.

send(request_proto: Message | None = None, /, **request_fields) None#

Sends a message to the server in the client stream.

class pw_rpc.callback_client.Call(
rpcs: PendingRpcs,
rpc: PendingRpc,
default_timeout_s: float | None,
on_next: Callable[[CallTypeT, Any], Any] | None,
on_completed: Callable[[CallTypeT, Any], Any] | None,
on_error: Callable[[CallTypeT, Any], Any] | None,
max_responses: int,
)#

Represents an in-progress or completed RPC call.

__init__(
rpcs: PendingRpcs,
rpc: PendingRpc,
default_timeout_s: float | None,
on_next: Callable[[CallTypeT, Any], Any] | None,
on_completed: Callable[[CallTypeT, Any], Any] | None,
on_error: Callable[[CallTypeT, Any], Any] | None,
max_responses: int,
) None#
cancel() bool#

Cancels the RPC; returns whether the RPC was active.

completed() bool#

True if the RPC call has completed, successfully or from an error.

class pw_rpc.callback_client.ClientStreamingCall(
rpcs: PendingRpcs,
rpc: PendingRpc,
default_timeout_s: float | None,
on_next: Callable[[CallTypeT, Any], Any] | None,
on_completed: Callable[[CallTypeT, Any], Any] | None,
on_error: Callable[[CallTypeT, Any], Any] | None,
max_responses: int,
)#

Call variant that tracks a client streaming RPC call.

finish_and_wait(
requests: Iterable[Message] = (),
*,
timeout_s: UseDefault | float | None = UseDefault.VALUE,
) UnaryResponse#

Ends the client stream and waits for the RPC to complete.

send(request_proto: Message | None = None, /, **request_fields) None#

Sends client stream request to the server.

class pw_rpc.callback_client.ServerStreamingCall(
rpcs: PendingRpcs,
rpc: PendingRpc,
default_timeout_s: float | None,
on_next: Callable[[CallTypeT, Any], Any] | None,
on_completed: Callable[[CallTypeT, Any], Any] | None,
on_error: Callable[[CallTypeT, Any], Any] | None,
max_responses: int,
)#

Call variant that tracks a server streaming RPC call.

request_completion() None#

Sends client completion packet to server.

class pw_rpc.callback_client.StreamResponse(status: Status, responses: Sequence[Any])#

Results from a server or bidirectional streaming RPC.

responses: Sequence[Any]#

Alias for field number 1

status: Status#

Alias for field number 0

class pw_rpc.callback_client.UnaryCall(
rpcs: PendingRpcs,
rpc: PendingRpc,
default_timeout_s: float | None,
on_next: Callable[[CallTypeT, Any], Any] | None,
on_completed: Callable[[CallTypeT, Any], Any] | None,
on_error: Callable[[CallTypeT, Any], Any] | None,
max_responses: int,
)#

Call variant that tracks a unary RPC call.

class pw_rpc.callback_client.UnaryResponse(status: Status, response: Any)#

Result from a unary or client streaming RPC: status and response.

response: Any#

Alias for field number 1

status: Status#

Alias for field number 0

unwrap_or_raise()#

Returns the response value or raises ValueError if not OK.
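Because the fields are described as "Alias for field number N", the result behaves like a named tuple: it can be unpacked positionally or read by attribute. The sketch below mimics that shape with a stand-in class. FakeUnaryResponse is hypothetical, uses a plain string where the real class holds a Status, and its unwrap_or_raise only approximates the documented behavior.

```python
from typing import Any, NamedTuple


class FakeUnaryResponse(NamedTuple):
    """Stand-in mirroring the documented fields; not the pw_rpc class."""
    status: str  # The real field is a Status.
    response: Any

    def unwrap_or_raise(self) -> Any:
        """Returns the response, or raises ValueError if status is not OK."""
        if self.status != 'OK':
            raise ValueError(f'RPC returned non-OK status: {self.status}')
        return self.response


result = FakeUnaryResponse('OK', {'some_field': 123})

# Tuple unpacking, as in the synchronous examples.
status, response = result

# Alternatively, get the payload directly and let failures raise.
payload = result.unwrap_or_raise()

failed = FakeUnaryResponse('NOT_FOUND', None)
```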

pw_rpc.descriptors

Types representing the basic pw_rpc concepts: channel, service, method.

class pw_rpc.descriptors.Channel(id: int, output: Callable[[bytes], Any])#
__init__(id: int, output: Callable[[bytes], Any]) None#
class pw_rpc.descriptors.ChannelManipulator#

A pipe interface that may manipulate packets before they’re sent.

ChannelManipulators allow application-specific packet handling to be injected into the packet processing pipeline for an ingress or egress channel-like pathway. This is particularly useful for integration testing resilience to things like packet loss on a usually-reliable transport. RPC server integrations (e.g. HdlcRpcLocalServerAndClient) may provide an opportunity to inject a ChannelManipulator for this use case.

A ChannelManipulator should not modify send_packet, as the consumer of a ChannelManipulator will use send_packet to insert the provided ChannelManipulator into a packet processing path.

For example:

class PacketLogger(ChannelManipulator):
    def process_and_send(self, packet: bytes) -> None:
        _LOG.debug('Received packet with payload: %s', str(packet))
        self.send_packet(packet)


packet_logger = PacketLogger()

# Configure actual send command.
packet_logger.send_packet = socket.sendall

# Route the output channel through the PacketLogger channel manipulator.
channels = (Channel(_DEFAULT_CHANNEL, packet_logger),)

# Create a RPC client.
reader = SocketReader(socket)
with reader:
    client = HdlcRpcClient(reader, protos, channels, stdout)
    with client:
        ...  # Do something with client

__init__() None#
abstract process_and_send(packet: bytes) None#

Processes an incoming packet before optionally sending it.

Implementations of this method may send the processed packet, multiple packets, or no packets at all via the registered send_packet() handler.
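To illustrate the "no packets at all" case, here is a minimal sketch of a manipulator that drops every nth packet, which is one way to imitate packet loss in an integration test. FakeChannelManipulator is a stand-in base class written for this example so that the snippet runs without pw_rpc; DropEveryNth is likewise a hypothetical name.

```python
import abc
from typing import Any, Callable


class FakeChannelManipulator(abc.ABC):
    """Stand-in with the interface described above; not the pw_rpc class."""

    def __init__(self) -> None:
        self.send_packet: Callable[[bytes], Any] = lambda _: None

    @abc.abstractmethod
    def process_and_send(self, packet: bytes) -> None:
        """Processes a packet, then sends zero or more packets."""


class DropEveryNth(FakeChannelManipulator):
    """Drops every nth packet to imitate loss on a reliable transport."""

    def __init__(self, n: int) -> None:
        super().__init__()
        self._n = n
        self._count = 0

    def process_and_send(self, packet: bytes) -> None:
        self._count += 1
        if self._count % self._n == 0:
            return  # Drop: send nothing for this packet.
        self.send_packet(packet)


sent: list[bytes] = []
dropper = DropEveryNth(3)
dropper.send_packet = sent.append  # Normally set by the consumer.
for i in range(6):
    dropper.process_and_send(bytes([i]))
```

In this run the third and sixth packets are dropped, so only four of the six packets reach sent.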

pw_rpc.console_tools

Utilities for building tools that interact with pw_rpc.

class pw_rpc.console_tools.ClientInfo(name: str, client: object, rpc_client: pw_rpc.Client)#

Information about an RPC client as it appears in the console.

client: object#

Alias for field number 1

name: str#

Alias for field number 0

rpc_client: Client#

Alias for field number 2

class pw_rpc.console_tools.Context(
client_info: Collection[ClientInfo],
default_client: Any,
protos: Library,
*,
help_header: str = '',
)#

The Context class is used to set up an interactive RPC console.

The Context manages a set of variables that make it easy to access RPCs and protobufs in a REPL.

As an example, this class can be used to set up a console with IPython:

context = console_tools.Context(
    clients, default_client, protos, help_header=WELCOME_MESSAGE)
IPython.start_ipython(argv=[], user_ns=dict(**context.variables()))

Or with ptpython:

from ptpython.repl import embed

context = console_tools.Context(
    clients, default_client, protos, help_header=WELCOME_MESSAGE)
embed(globals(), dict(**context.variables()))
__init__(
client_info: Collection[ClientInfo],
default_client: Any,
protos: Library,
*,
help_header: str = '',
) None#

Creates an RPC console context.

Protos and RPC services are accessible by their proto package and name. The target for these can be set with the set_target function.

Parameters:
  • client_info – ClientInfo objects that represent the clients this console uses to communicate with other devices

  • default_client – default client object; must be one of the clients

  • protos – protobufs to use for RPCs for all clients

  • help_header – Message to display for the help command

flattened_rpc_completions()#

Create a flattened list of rpc commands for repl auto-completion.

set_target(selected_client: object, channel_id: int | None = None) None#

Sets the default target for commands.

variables() dict[str, Any]#

Returns a mapping of names to variables for use in an RPC console.

class pw_rpc.console_tools.Watchdog(
on_reset: Callable[[], Any],
on_expiration: Callable[[], Any],
while_expired: Callable[[], Any] = <function Watchdog.<lambda>>,
timeout_s: float = 1,
expired_timeout_s: float | None = None,
)#

Simple class that times out unless reset.

This class could be used, for example, to track a device’s connection state for devices that send a periodic heartbeat packet.

__init__(
on_reset: Callable[[], Any],
on_expiration: Callable[[], Any],
while_expired: Callable[[], Any] = <function Watchdog.<lambda>>,
timeout_s: float = 1,
expired_timeout_s: float | None = None,
)#

Creates a watchdog; start() must be called to start it.

Parameters:
  • on_reset – Function called when the watchdog is reset after having expired.

  • on_expiration – Function called when the timeout expires.

  • while_expired – Function called repeatedly while the watchdog is expired.

  • timeout_s – If reset() is not called for timeout_s, the watchdog expires and calls the on_expiration callback.

  • expired_timeout_s – While expired, the watchdog calls the while_expired callback every expired_timeout_s.

reset() bool#

Resets the timeout; calls the on_reset callback if expired.

Returns True if the watchdog was expired.

start() None#

Starts the watchdog; must be called for the watchdog to work.

stop() None#

Stops the watchdog.

This will not trigger the execution of any callbacks and will prevent further execution of any callbacks (including while_expired) until start is called again.

pw_rpc.console_tools.alias_deprecated_command(variables: Any, old_name: str, new_name: str) None#

Adds an alias for an old command that redirects to the new command.

The deprecated command prints a message then invokes the new command.

pw_rpc.console_tools.flattened_rpc_completions(client_info_list: Collection[ClientInfo]) dict[str, str]#

Create a flattened list of rpc commands for repl auto-completion.

This gathers all rpc commands from a set of ClientInfo variables and produces a flattened list of valid rpc commands to run in an RPC console. This is useful for passing into prompt_toolkit.completion.WordCompleter.

Parameters:

client_info_list – List of ClientInfo variables

Returns:

Dict of flattened rpc commands as keys, and ‘RPC’ as values. For example:

{
    'device.rpcs.pw.rpc.EchoService.Echo': 'RPC',
    'device.rpcs.pw.rpc.BatteryService.GetBatteryStatus': 'RPC',
}

pw_rpc.console_tools.help_as_repr(function: Callable) Callable#

Wraps a function so that its repr() and docstring provide detailed help.

This is useful for creating commands in an interactive console. In a console, typing a function’s name and hitting Enter shows rich documentation with the full function signature, type annotations, and docstring when the function is wrapped with help_as_repr.
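The general idea can be sketched with a small wrapper whose repr() is built from the wrapped function's signature and docstring. HelpAsRepr below is an illustrative stand-in written for this example; pw_rpc's help_as_repr may be implemented differently and may format its output differently. The reboot command is also hypothetical.

```python
import inspect
from typing import Callable


class HelpAsRepr:
    """Wraps a function so repr() shows its signature and docstring.

    Illustrative stand-in; pw_rpc's help_as_repr may work differently.
    """

    def __init__(self, function: Callable) -> None:
        self._function = function
        self.__doc__ = function.__doc__

    def __call__(self, *args, **kwargs):
        return self._function(*args, **kwargs)

    def __repr__(self) -> str:
        signature = inspect.signature(self._function)
        doc = inspect.getdoc(self._function) or '(no docstring)'
        return f'{self._function.__name__}{signature}\n\n{doc}'


@HelpAsRepr
def reboot(delay_s: float = 0.0) -> str:
    """Reboots the device after delay_s seconds."""
    return f'rebooting in {delay_s}s'


help_text = repr(reboot)
```

In a REPL, typing reboot and pressing Enter displays help_text, while reboot(1.5) still calls the underlying function.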

pw_rpc.benchmark

Pigweed RPC Benchmark tools.

The benchmark folder contains a few utilities:

  1. A host-side client, benchmark.py, containing functions for basic RPC statistics gathering and measurement.

  2. An output definition file, benchmark_results.py, used to record test results and enable extending test data.

The host-side client is intended to be included in a user’s custom project, while the default runner, implemented in pw_system, can be invoked as a Python module in a terminal.

The default runner can serve as a metric of its own or as a starting point for users who want to write their own benchmarking runner.

See the pw_system folder for details on the default runner.

class pw_rpc.benchmark.BaseResult#

Provides a base class for easy formatting of different test results.

The result class is intended to represent the RPC response of a single RPC and provide any relevant calculations/helper functions for that data. Users can inherit this class into their own, customized result classes when creating custom tests.

class pw_rpc.benchmark.Benchmark(
rpcs: Any,
options: BenchmarkOptions = BenchmarkOptions(max_payload_size=64, max_sample_set_size=1000, quantile_divisions=100, use_echo_service=False),
)#

Provides host-side benchmarking capabilities for PW devices

__init__(
rpcs: Any,
options: BenchmarkOptions = BenchmarkOptions(max_payload_size=64, max_sample_set_size=1000, quantile_divisions=100, use_echo_service=False),
) None#

Creates a benchmarking object for the provided RPCs.

Parameters:
  • rpcs – The Pigweed RPCs to test from the target device.

  • options – Test and device-specific options to set. Defaults to a device-specific default if None.

Returns:

Throws exceptions on error.

Return type:

None

measure_rpc_goodput(size: int | None = None) GoodputStatisticsResult#

Performs a ‘goodput’ test on the DUT with a variable size data packet

This test performs a single RPC call and measures its response. Use the measure_rpc_goodput_statistics function for larger sample sizes.

Parameters:

size – The size, in bytes, of the payload. Defaults to max_payload_size if invalid.

Returns:

The test statistics collected.

Return type:

GoodputStatisticsResult

measure_rpc_goodput_statistics(
size: int | None = None,
sample_count: int | None = None,
) GoodputStatisticsResult#

Performs a series of ‘goodput’ calls on the DUT with a variable size data packet.

This test performs a user-specified number of RPC calls and measures the aggregated statistics for the responses.

Parameters:
  • size – The size, in bytes, of the payload. Defaults to max_payload_size.

  • sample_count – The number of samples to collect for the statistics. Defaults to max_sample_set_size.

Returns:

The test statistics collected.

Return type:

GoodputStatisticsResult

class pw_rpc.benchmark.DataStatistics(*, datapoints: list[float] = <factory>)#

Dynamically calculates various statistics on a stored data set.

This class is primarily meant to aggregate data samples from RPC tests and provide calculated statistics when its accessor functions are invoked. For example, a test could record all the RPC roundtrip times and then call average to return the average roundtrip time.

This class is usually instantiated (automatically) in a parent object that represents the test output and parameters for a given test.

Use the store function to add data to the dataset.
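The store-then-query pattern described above might look roughly like the following. FakeDataStatistics is a stand-in written for this example using the standard statistics module; only store and average are named in the text, and the quantiles method, its divisions parameter, and all internals are assumptions rather than the real class's API.

```python
import statistics
from dataclasses import dataclass, field


@dataclass
class FakeDataStatistics:
    """Stand-in: stores samples and derives statistics on demand."""
    datapoints: list[float] = field(default_factory=list)

    def store(self, value: float) -> None:
        """Adds one sample to the data set."""
        self.datapoints.append(value)

    def average(self) -> float:
        """Computes the mean at call time; nothing is cached."""
        return statistics.fmean(self.datapoints)

    def quantiles(self, divisions: int = 100) -> list[float]:
        """Returns divisions - 1 cut points; needs at least two samples."""
        return statistics.quantiles(self.datapoints, n=divisions)


stats = FakeDataStatistics()
for roundtrip_s in (0.010, 0.012, 0.014):
    stats.store(roundtrip_s)

mean_roundtrip_s = stats.average()
quartiles = stats.quantiles(divisions=4)
```

Computing statistics on demand keeps store cheap, at the cost of recomputing over the whole data set on each query.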

__init__(*, datapoints: list[float] = <factory>) None#
class pw_rpc.benchmark.GoodputStatisticsResult(
*,
goodput_in_bytes_per_sec: float = 0.0,
packet_size: int = 1,
quantile_divisions: int = 100,
data_statistics: ~pw_rpc.benchmark.benchmark_results.DataStatistics = <factory>,
statuses: ~collections.Counter[~pw_status.Status] = <factory>,
)#

Encapsulates the data of the measure_rpc_goodput_statistics function.

This class provides helper and accessor functions to calculate the results of a goodput test in the benchmark.py file. Use the update function to add new data to an instantiated object, and print_results to show the calculated test metrics (typically only done at the end of a test).

This class can be used for tests with sample size >= 1.

__init__(
*,
goodput_in_bytes_per_sec: float = 0.0,
packet_size: int = 1,
quantile_divisions: int = 100,
data_statistics: ~pw_rpc.benchmark.benchmark_results.DataStatistics = <factory>,
statuses: ~collections.Counter[~pw_status.Status] = <factory>,
) None#
print_results(file: TextIO | None = None) None#

Calculates and displays the statistics for a given sample set.

Parameters:

file – An optional file to redirect the print statements to. Defaults to the terminal.

Returns:

Raises an exception on an empty data set.

Return type:

None

results() Iterable[str]#

Returns a printable iterable of test results.

Parameters:

None.

Returns:

A printable iterable of test results. Raises an exception on an empty data set.

Return type:

Iterable[str]

update(time: float, status: Status)#

Stores calculated data for a given test and updates specific metrics.

Parameters:
  • time – The RPC goodput time for a given sample.

  • status – The RPC result status code for a given sample.

Returns:

None.