pw_rpc Python package

The pw_rpc Python package makes it possible to call Pigweed RPCs from Python. The package includes a pw_rpc client library, as well as tools for creating a pw_rpc console.

pw_rpc.client

Provides a pw_rpc client for Python.

class pw_rpc.client.Client(impl: pw_rpc.client.ClientImpl, channels: Iterable[pw_rpc.descriptors.Channel], services: Iterable[pw_rpc.descriptors.Service])

Sends requests and handles responses for a set of channels.

RPC invocations occur through a ChannelClient.

__init__(impl: pw_rpc.client.ClientImpl, channels: Iterable[pw_rpc.descriptors.Channel], services: Iterable[pw_rpc.descriptors.Service])

channel(channel_id: Optional[int] = None) pw_rpc.client.ChannelClient

Returns a ChannelClient, which is used to call RPCs on a channel.

If no channel ID is provided, the client's first channel is used.

channels() Iterable[pw_rpc.client.ChannelClient]

Accesses the ChannelClients in this client.

method(method_name: str) pw_rpc.descriptors.Method

Returns a Method matching the given name.

Parameters

method_name – name as package.Service/Method or package.Service.Method.

Raises
  • ValueError – the method name is not properly formatted

  • KeyError – the method is not present
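The two accepted name formats and the two error cases can be illustrated with a small stand-alone sketch. It does not use pw_rpc; `split_method_name`, `find_method`, and the `known` registry are hypothetical names chosen for illustration, and the real lookup may be implemented differently.

```python
from typing import Dict, Tuple


def split_method_name(method_name: str) -> Tuple[str, str]:
    """Splits a full method name into (service, method).

    Accepts package.Service/Method or package.Service.Method.
    """
    if '/' in method_name:
        service, _, method = method_name.partition('/')
    else:
        service, _, method = method_name.rpartition('.')

    # Both parts must be present, and only one '/' separator is allowed.
    if not service or not method or '/' in method:
        raise ValueError(f'{method_name!r} is not a valid method name')
    return service, method


def find_method(methods: Dict[Tuple[str, str], object], method_name: str):
    """Looks up a method; raises KeyError if it is not present."""
    return methods[split_method_name(method_name)]


# Hypothetical registry keyed by (service, method).
known = {('pw.rpc.EchoService', 'Echo'): 'echo-method-descriptor'}
print(find_method(known, 'pw.rpc.EchoService/Echo'))
print(find_method(known, 'pw.rpc.EchoService.Echo'))
```

Both calls resolve to the same entry, since the two spellings differ only in the final separator.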

methods() Iterator[pw_rpc.descriptors.Method]

Iterates over all Methods supported by this client.

process_packet(pw_rpc_raw_packet_data: bytes, *impl_args, **impl_kwargs) pw_status.Status

Processes an incoming packet.

Parameters
  • pw_rpc_raw_packet_data – raw binary data for exactly one RPC packet

  • impl_args – optional positional arguments passed to the ClientImpl

  • impl_kwargs – optional keyword arguments passed to the ClientImpl

Returns

  • OK – the packet was processed by this client

  • DATA_LOSS – the packet could not be decoded

  • INVALID_ARGUMENT – the packet is for a server, not a client

  • NOT_FOUND – the packet’s channel ID is not known to this client

class pw_rpc.client.ClientImpl

The internal interface of the RPC client.

This interface defines the semantics for invoking an RPC on a particular client.

__init__()

abstract handle_completion(rpc: pw_rpc.client.PendingRpc, context: Any, status: pw_status.Status, *, args: tuple = (), kwargs: Optional[dict] = None) Any

Handles the successful completion of an RPC.

Parameters
  • rpc – Information about the pending RPC

  • context – Arbitrary context object associated with the pending RPC

  • status – Status returned from the RPC

  • args – Arbitrary positional arguments passed to the ClientImpl

  • kwargs – Arbitrary keyword arguments passed to the ClientImpl

abstract handle_error(rpc: pw_rpc.client.PendingRpc, context, status: pw_status.Status, *, args: tuple = (), kwargs: Optional[dict] = None)

Handles the abnormal termination of an RPC.

Parameters
  • rpc – Information about the pending RPC

  • context – Arbitrary context object associated with the pending RPC

  • status – Status indicating which error occurred

  • args – Arbitrary positional arguments passed to the ClientImpl

  • kwargs – Arbitrary keyword arguments passed to the ClientImpl

abstract handle_response(rpc: pw_rpc.client.PendingRpc, context: Any, payload: Any, *, args: tuple = (), kwargs: Optional[dict] = None) Any

Handles a response from the RPC server.

Parameters
  • rpc – Information about the pending RPC

  • context – Arbitrary context object associated with the pending RPC

  • payload – A protobuf message

  • args – Arbitrary positional arguments passed to the ClientImpl

  • kwargs – Arbitrary keyword arguments passed to the ClientImpl

abstract method_client(channel: pw_rpc.descriptors.Channel, method: pw_rpc.descriptors.Method) Any

Returns an object that invokes a method using the given channel.
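To show the shape of this interface, here is a minimal stand-alone sketch. `ClientImplSketch` is a stand-in that copies the four abstract method signatures listed above; it is not the real pw_rpc.client.ClientImpl. `RecordingImpl` is a hypothetical subclass that only records events, and plain strings stand in for PendingRpc and Status objects.

```python
import abc
from typing import Any, List, Optional, Tuple


class ClientImplSketch(abc.ABC):
    """Stand-in with the same four abstract methods as the interface above."""

    @abc.abstractmethod
    def method_client(self, channel: Any, method: Any) -> Any:
        """Returns an object that invokes a method using the given channel."""

    @abc.abstractmethod
    def handle_response(self, rpc: Any, context: Any, payload: Any, *,
                        args: tuple = (), kwargs: Optional[dict] = None) -> Any:
        """Handles a response from the RPC server."""

    @abc.abstractmethod
    def handle_completion(self, rpc: Any, context: Any, status: Any, *,
                          args: tuple = (), kwargs: Optional[dict] = None) -> Any:
        """Handles the successful completion of an RPC."""

    @abc.abstractmethod
    def handle_error(self, rpc: Any, context: Any, status: Any, *,
                     args: tuple = (), kwargs: Optional[dict] = None) -> Any:
        """Handles the abnormal termination of an RPC."""


class RecordingImpl(ClientImplSketch):
    """Hypothetical implementation that records events instead of acting."""

    def __init__(self) -> None:
        self.events: List[Tuple[str, Any, Any]] = []

    def method_client(self, channel, method):
        # Placeholder: a callable that just echoes what it was given.
        return lambda **fields: (channel, method, fields)

    def handle_response(self, rpc, context, payload, *, args=(), kwargs=None):
        self.events.append(('response', rpc, payload))

    def handle_completion(self, rpc, context, status, *, args=(), kwargs=None):
        self.events.append(('completion', rpc, status))

    def handle_error(self, rpc, context, status, *, args=(), kwargs=None):
        self.events.append(('error', rpc, status))


impl = RecordingImpl()
impl.handle_response('rpc-1', None, {'some_field': 123})
impl.handle_completion('rpc-1', None, 'OK')
print(impl.events)
```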

pw_rpc.callback_client

Defines a callback-based RPC ClientImpl to use with pw_rpc.Client.

callback_client.Impl supports invoking RPCs synchronously or asynchronously. Asynchronous invocations use a callback.

Synchronous invocations look like a function call. When invoking a unary or server streaming RPC, the request may be provided as a message object or as keyword arguments for the message fields (but not both).

status, response = client.channel(1).rpcs.MyService.MyUnary(some_field=123)

# Calls with a server stream return a status and a list of responses.
status, responses = rpcs.MyService.MyServerStreaming(Request(some_field=123))
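The "message object or keyword arguments, but not both" rule could be enforced roughly as in the sketch below. It is self-contained and does not use pw_rpc or protobuf: `Request` is a dataclass standing in for a generated message class, `build_request` and `my_unary` are hypothetical helpers, and raising TypeError is an assumption of this sketch rather than documented behavior.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional, Type


@dataclass
class Request:
    """Hypothetical stand-in for a generated protobuf message class."""
    some_field: int = 0


def build_request(request_type: Type[Any], proto: Optional[Any],
                  fields: Dict[str, Any]) -> Any:
    """Returns the request, built from exactly one of the two input forms."""
    if proto is not None and fields:
        raise TypeError('Pass a request message or keyword arguments, not both')
    if proto is None:
        # Construct the message from the keyword arguments.
        return request_type(**fields)
    if not isinstance(proto, request_type):
        raise TypeError(f'Expected a {request_type.__name__} message')
    return proto


def my_unary(request: Optional[Request] = None, **fields: Any) -> Request:
    """Hypothetical method stub that only resolves and returns the request."""
    return build_request(Request, request, fields)


print(my_unary(some_field=123))
print(my_unary(Request(some_field=123)))
```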

Synchronous client and bidirectional streaming calls accept an iterable of requests to send.

requests = [Request(a=1), Request(b=2)]
status, response = rpcs.MyService.MyClientStreaming(requests)

requests = [Request(a=1), Request(b=2)]
status, responses = rpcs.MyService.MyBidirectionalStreaming(requests)

Synchronous invocations block until the RPC completes or times out. The calls use the default timeout provided when the callback_client.Impl() is created, or a timeout passed in through the pw_rpc_timeout_s argument. A timeout of None means to wait indefinitely for a response.
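Because None already means "wait indefinitely", a separate sentinel is needed to express "no timeout was passed, use the default". The sketch below shows that pattern with a stand-in `UseDefault` enum modeled on the `UseDefault.VALUE` default that appears in the `finish_and_wait` signatures in this reference; `resolve_timeout` is a hypothetical helper, not part of pw_rpc.

```python
import enum
from typing import Optional, Union


class UseDefault(enum.Enum):
    """Stand-in sentinel: marks a timeout argument that was not provided."""
    VALUE = 0


def resolve_timeout(timeout_s: Union[UseDefault, float, None],
                    default_timeout_s: Optional[float]) -> Optional[float]:
    """Returns the timeout to apply; None means wait indefinitely."""
    if timeout_s is UseDefault.VALUE:
        return default_timeout_s
    return timeout_s


print(resolve_timeout(UseDefault.VALUE, 2.0))  # Falls back to the default.
print(resolve_timeout(0.5, 2.0))               # Explicit per-call timeout.
print(resolve_timeout(None, 2.0))              # Explicitly wait forever.
```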

Asynchronous invocations immediately return a call object. Callbacks may be provided for the three RPC events:

  • on_next(call_object, response) - called for each response

  • on_completed(call_object, status) - called when the RPC completes

  • on_error(call_object, error) - called if the RPC terminates due to an error

The default callbacks simply log the events. If a user-provided callback raises an exception, that exception is logged and re-raised when the user calls functions on the call object.
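The callback behavior described above (logging defaults, and deferring a callback's exception until the user next touches the call object) can be sketched without pw_rpc as follows. `CallSketch` and its `deliver_*` and `completed` methods are hypothetical names for illustration; the real call classes have their own interface.

```python
import logging
from typing import Any, Callable, List, Optional

_LOG = logging.getLogger('call_sketch')

Callback = Callable[['CallSketch', Any], Any]


class CallSketch:
    """Hypothetical call object that dispatches the three RPC events."""

    def __init__(self, on_next: Optional[Callback] = None,
                 on_completed: Optional[Callback] = None,
                 on_error: Optional[Callback] = None) -> None:
        # Default callbacks only log the event.
        self.on_next = on_next or (lambda _, r: _LOG.info('Response: %s', r))
        self.on_completed = on_completed or (
            lambda _, s: _LOG.info('Completed: %s', s))
        self.on_error = on_error or (lambda _, e: _LOG.warning('Error: %s', e))
        self.responses: List[Any] = []
        self.status: Any = None
        self.error: Any = None
        self._callback_exception: Optional[Exception] = None

    def _run(self, callback: Callback, value: Any) -> None:
        try:
            callback(self, value)
        except Exception as exc:  # Log now; raise later to the user.
            _LOG.exception('Exception in user-provided callback')
            self._callback_exception = exc

    # In this sketch, these three methods are driven by incoming packets.
    def deliver_response(self, response: Any) -> None:
        self.responses.append(response)
        self._run(self.on_next, response)

    def deliver_completion(self, status: Any) -> None:
        self.status = status
        self._run(self.on_completed, status)

    def deliver_error(self, error: Any) -> None:
        self.error = error
        self._run(self.on_error, error)

    def completed(self) -> bool:
        """User-facing call; re-raises any exception a callback raised."""
        if self._callback_exception is not None:
            raise self._callback_exception
        return self.status is not None or self.error is not None


seen = []
call = CallSketch(on_next=lambda _, response: seen.append(response))
call.deliver_response('first')
call.deliver_completion('OK')
print(seen, call.completed())
```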

Unary and server streaming RPCs are invoked asynchronously by calling invoke on the method object. invoke takes the callbacks. The request may be provided either as a constructed protobuf or as a dict of proto fields in the request_args parameter.

# Pass the request as a protobuf and provide callbacks for all RPC events.
rpc = client.channel(1).rpcs.MyService.MyServerStreaming
call = rpc.invoke(rpc.request(a=1), on_next_cb, on_completed_cb, on_error_cb)

# Create the request from the provided keyword args. Provide a callback for
# responses, but simply log the other RPC events.
call = client.channel(1).rpcs.MyService.MyUnary.invoke(
    request_args=dict(a=1, b=2), on_next=lambda _, reply: process_this(reply))

For client and bidirectional streaming RPCs, requests are sent with the send method. The finish_and_wait method finishes the client stream. It optionally takes an iterable of requests to send before closing the stream.

# Start the call using callbacks.
call = client.channel(1).rpcs.MyService.MyClientStream.invoke(on_error=err_cb)

# Send a single client stream request.
call.send(some_field=123)

# Send the requests, close the stream, then wait for the RPC to complete.
stream_responses = call.finish_and_wait([RequestType(some_field=123), ...])

class pw_rpc.callback_client.BidirectionalStreamingCall(rpcs: pw_rpc.client.PendingRpcs, rpc: pw_rpc.client.PendingRpc, default_timeout_s: Optional[float], on_next: Optional[Callable[[pw_rpc.callback_client.call.CallType, Any], Any]], on_completed: Optional[Callable[[pw_rpc.callback_client.call.CallType, Any], Any]], on_error: Optional[Callable[[pw_rpc.callback_client.call.CallType, Any], Any]])

Tracks the state of a bidirectional streaming RPC call.

finish_and_wait(requests: Iterable[google.protobuf.message.Message] = (), *, timeout_s: Optional[Union[pw_rpc.callback_client.call.UseDefault, float]] = UseDefault.VALUE) pw_rpc.callback_client.call.StreamResponse

Ends the client stream and waits for the RPC to complete.

send(_rpc_request_proto: Optional[google.protobuf.message.Message] = None, **request_fields) None

Sends a message to the server in the client stream.

class pw_rpc.callback_client.ClientStreamingCall(rpcs: pw_rpc.client.PendingRpcs, rpc: pw_rpc.client.PendingRpc, default_timeout_s: Optional[float], on_next: Optional[Callable[[pw_rpc.callback_client.call.CallType, Any], Any]], on_completed: Optional[Callable[[pw_rpc.callback_client.call.CallType, Any], Any]], on_error: Optional[Callable[[pw_rpc.callback_client.call.CallType, Any], Any]])

Tracks the state of a client streaming RPC call.

finish_and_wait(requests: Iterable[google.protobuf.message.Message] = (), *, timeout_s: Optional[Union[pw_rpc.callback_client.call.UseDefault, float]] = UseDefault.VALUE) pw_rpc.callback_client.call.UnaryResponse

Ends the client stream and waits for the RPC to complete.

send(_rpc_request_proto: Optional[google.protobuf.message.Message] = None, **request_fields) None

Sends a client stream request to the server.

class pw_rpc.callback_client.ServerStreamingCall(rpcs: pw_rpc.client.PendingRpcs, rpc: pw_rpc.client.PendingRpc, default_timeout_s: Optional[float], on_next: Optional[Callable[[pw_rpc.callback_client.call.CallType, Any], Any]], on_completed: Optional[Callable[[pw_rpc.callback_client.call.CallType, Any], Any]], on_error: Optional[Callable[[pw_rpc.callback_client.call.CallType, Any], Any]])

Tracks the state of a server streaming RPC call.

class pw_rpc.callback_client.StreamResponse(status: pw_status.Status, responses: Sequence[Any])

Results from a server or bidirectional streaming RPC.

responses: Sequence[Any]

Alias for field number 1

status: pw_status.Status

Alias for field number 0

class pw_rpc.callback_client.UnaryCall(rpcs: pw_rpc.client.PendingRpcs, rpc: pw_rpc.client.PendingRpc, default_timeout_s: Optional[float], on_next: Optional[Callable[[pw_rpc.callback_client.call.CallType, Any], Any]], on_completed: Optional[Callable[[pw_rpc.callback_client.call.CallType, Any], Any]], on_error: Optional[Callable[[pw_rpc.callback_client.call.CallType, Any], Any]])

Tracks the state of a unary RPC call.

class pw_rpc.callback_client.UnaryResponse(status: pw_status.Status, response: Any)

Result from a unary or client streaming RPC: status and response.

response: Any

Alias for field number 1

status: pw_status.Status

Alias for field number 0
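The "Alias for field number N" entries indicate that these result types are named tuples, which is why synchronous calls can be unpacked as `status, response = ...`. The sketch below uses stand-in classes with the same field order; they are not the pw_rpc classes, and a plain string stands in for pw_status.Status.

```python
from typing import Any, NamedTuple, Sequence


class UnaryResponseSketch(NamedTuple):
    """Stand-in with the same field order as UnaryResponse."""
    status: str
    response: Any


class StreamResponseSketch(NamedTuple):
    """Stand-in with the same field order as StreamResponse."""
    status: str
    responses: Sequence[Any]


result = UnaryResponseSketch('OK', {'some_field': 123})

# Fields can be read by name, by index, or by unpacking.
status, response = result
print(result.status, result[1], status, response)

stream = StreamResponseSketch('OK', [{'a': 1}, {'b': 2}])
stream_status, responses = stream
print(stream_status, len(responses))
```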

pw_rpc.console_tools

Utilities for building tools that interact with pw_rpc.

class pw_rpc.console_tools.ClientInfo(name: str, client: object, rpc_client: pw_rpc.client.Client)

Information about an RPC client as it appears in the console.

client: object

Alias for field number 1

name: str

Alias for field number 0

rpc_client: pw_rpc.client.Client

Alias for field number 2

class pw_rpc.console_tools.Context(client_info: Collection[pw_rpc.console_tools.console.ClientInfo], default_client: Any, protos: pw_protobuf_compiler.python_protos.Library, *, help_header: str = '')

The Context class is used to set up an interactive RPC console.

The Context manages a set of variables that make it easy to access RPCs and protobufs in a REPL.

As an example, this class can be used to set up a console with IPython:

context = console_tools.Context(
    clients, default_client, protos, help_header=WELCOME_MESSAGE)
IPython.terminal.embed.InteractiveShellEmbed().mainloop(
    module=types.SimpleNamespace(**context.variables()))

__init__(client_info: Collection[pw_rpc.console_tools.console.ClientInfo], default_client: Any, protos: pw_protobuf_compiler.python_protos.Library, *, help_header: str = '') None

Creates an RPC console context.

Protos and RPC services are accessible by their proto package and name. The target for these can be set with the set_target function.

Parameters
  • client_info – ClientInfo objects that represent the clients this console uses to communicate with other devices

  • default_client – default client object; must be one of the clients

  • protos – protobufs to use for RPCs for all clients

  • help_header – Message to display for the help command

flattened_rpc_completions()

Creates a flattened list of RPC commands for REPL auto-completion.

set_target(selected_client: object, channel_id: Optional[int] = None) None

Sets the default target for commands.

variables() Dict[str, Any]

Returns a mapping of names to variables for use in an RPC console.

class pw_rpc.console_tools.Watchdog(on_reset: Callable[[], Any], on_expiration: Callable[[], Any], while_expired: Callable[[], Any] = <function Watchdog.<lambda>>, timeout_s: float = 1, expired_timeout_s: Optional[float] = None)

Simple class that times out unless reset.

This class could be used, for example, to track a device’s connection state for devices that send a periodic heartbeat packet.

__init__(on_reset: Callable[[], Any], on_expiration: Callable[[], Any], while_expired: Callable[[], Any] = <function Watchdog.<lambda>>, timeout_s: float = 1, expired_timeout_s: Optional[float] = None)

Creates a watchdog; start() must be called to start it.

Parameters
  • on_reset – Function called when the watchdog is reset after having expired.

  • on_expiration – Function called when the timeout expires.

  • while_expired – Function called repeatedly while the watchdog is expired.

  • timeout_s – If reset() is not called for timeout_s, the watchdog expires and calls the on_expiration callback.

  • expired_timeout_s – While expired, the watchdog calls the while_expired callback every expired_timeout_s.

reset() None

Resets the timeout; calls the on_reset callback if expired.

start() None

Starts the watchdog; must be called for the watchdog to work.
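The heartbeat use case can be sketched with threading.Timer as below. `WatchdogSketch` is a simplified stand-in written for illustration, not the pw_rpc implementation. It assumes that expired_timeout_s falls back to timeout_s when omitted, which the real class may not do.

```python
import threading
from typing import Any, Callable, Optional


class WatchdogSketch:
    """Simplified stand-in for a watchdog that expires unless reset."""

    def __init__(self, on_reset: Callable[[], Any],
                 on_expiration: Callable[[], Any],
                 while_expired: Callable[[], Any] = lambda: None,
                 timeout_s: float = 1,
                 expired_timeout_s: Optional[float] = None) -> None:
        self._on_reset = on_reset
        self._on_expiration = on_expiration
        self._while_expired = while_expired
        self.timeout_s = timeout_s
        # Assumption of this sketch: reuse timeout_s if no interval is given.
        self.expired_timeout_s = (
            timeout_s if expired_timeout_s is None else expired_timeout_s)
        self.expired = False
        self._lock = threading.RLock()
        self._timer: Optional[threading.Timer] = None

    def start(self) -> None:
        with self._lock:
            self._schedule(self.timeout_s)

    def reset(self) -> None:
        with self._lock:
            self._schedule(self.timeout_s)
            if self.expired:
                self.expired = False
                self._on_reset()

    def _schedule(self, delay: float) -> None:
        if self._timer is not None:
            self._timer.cancel()
        self._timer = threading.Timer(delay, self._timed_out)
        self._timer.daemon = True  # Do not keep the interpreter alive.
        self._timer.start()

    def _timed_out(self) -> None:
        with self._lock:
            if threading.current_thread() is not self._timer:
                return  # A reset() replaced this timer while it was firing.
            if self.expired:
                self._while_expired()
            else:
                self.expired = True
                self._on_expiration()
            self._schedule(self.expired_timeout_s)


expired = threading.Event()
resets = []
watchdog = WatchdogSketch(on_reset=lambda: resets.append('reset'),
                          on_expiration=expired.set, timeout_s=0.05)
watchdog.start()
expired.wait(timeout=5)  # No heartbeat arrives, so the watchdog expires.
watchdog.reset()         # A late heartbeat: on_reset runs since it expired.
print(expired.is_set(), resets)
```

In a device console, reset() would typically be called from the handler for each heartbeat packet, so on_expiration fires only when heartbeats stop.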

pw_rpc.console_tools.alias_deprecated_command(variables: Any, old_name: str, new_name: str) None

Adds an alias for an old command that redirects to the new command.

The deprecated command prints a message then invokes the new command.
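A minimal sketch of the aliasing idea follows. It assumes the console variables are a flat dict of callables, which is a simplification; the real function accepts `variables: Any` and may resolve names differently. The `_sketch` suffix and the `reboot` command are hypothetical.

```python
import functools
from typing import Any, Callable, Dict


def alias_deprecated_command_sketch(variables: Dict[str, Callable[..., Any]],
                                    old_name: str, new_name: str) -> None:
    """Adds old_name to variables as a wrapper that forwards to new_name."""
    new_command = variables[new_name]

    @functools.wraps(new_command)
    def deprecated(*args: Any, **kwargs: Any) -> Any:
        # Print a notice, then invoke the new command with the same arguments.
        print(f'WARNING: {old_name} is deprecated; use {new_name} instead')
        return new_command(*args, **kwargs)

    variables[old_name] = deprecated


def reboot(delay_s: int = 0) -> str:
    """Hypothetical console command."""
    return f'rebooting in {delay_s}s'


console_variables = {'reboot': reboot}
alias_deprecated_command_sketch(console_variables, 'restart', 'reboot')
print(console_variables['restart'](delay_s=3))
```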

pw_rpc.console_tools.flattened_rpc_completions(client_info_list: Collection[pw_rpc.console_tools.console.ClientInfo]) Dict[str, str]

Creates a flattened list of RPC commands for REPL auto-completion.

This gathers all RPC commands from a set of ClientInfo variables and produces a flattened list of valid RPC commands to run in an RPC console. This is useful for passing into prompt_toolkit.completion.WordCompleter.

Parameters

client_info_list – List of ClientInfo variables

Returns

Dict of flattened rpc commands as keys, and ‘RPC’ as values. For example:

{
    'device.rpcs.pw.rpc.EchoService.Echo': 'RPC',
    'device.rpcs.pw.rpc.BatteryService.GetBatteryStatus': 'RPC',
}
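The flattening step that yields a dict of this shape might look like the sketch below. It works on plain strings rather than ClientInfo objects: `flatten_rpc_names` and its input mapping of client names to full method names are hypothetical, chosen only to reproduce the format of the example above.

```python
from typing import Dict, Iterable


def flatten_rpc_names(
        methods_by_client: Dict[str, Iterable[str]]) -> Dict[str, str]:
    """Maps '<client>.rpcs.<package.Service.Method>' to the label 'RPC'."""
    return {
        f'{client_name}.rpcs.{method}': 'RPC'
        for client_name, methods in methods_by_client.items()
        for method in methods
    }


completions = flatten_rpc_names({
    'device': [
        'pw.rpc.EchoService.Echo',
        'pw.rpc.BatteryService.GetBatteryStatus',
    ],
})
for command, label in completions.items():
    print(command, label)
```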

pw_rpc.console_tools.help_as_repr(function: Callable) Callable

Wraps a function so that its repr() and docstring provide detailed help.

This is useful for creating commands in an interactive console. In a console, typing a function’s name and hitting Enter shows rich documentation with the full function signature, type annotations, and docstring when the function is wrapped with help_as_repr.
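One way such a wrapper could work is sketched below: a callable object whose repr() is built from the wrapped function's signature and docstring. `help_as_repr_sketch` and `_HelpAsRepr` are hypothetical names, and the output format here is an assumption; the real help_as_repr may render its help text differently.

```python
import functools
import inspect
from typing import Any, Callable


class _HelpAsRepr:
    """Callable wrapper whose repr() shows the signature and docstring."""

    def __init__(self, function: Callable) -> None:
        self._function = function
        # Copy __name__, __doc__, and similar attributes onto the wrapper.
        functools.update_wrapper(self, function)

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        return self._function(*args, **kwargs)

    def __repr__(self) -> str:
        signature = inspect.signature(self._function)
        doc = inspect.getdoc(self._function) or '(no docstring)'
        return f'{self._function.__name__}{signature}\n\n{doc}'


def help_as_repr_sketch(function: Callable) -> Callable:
    return _HelpAsRepr(function)


@help_as_repr_sketch
def echo(msg: str, count: int = 1) -> str:
    """Returns msg repeated count times."""
    return msg * count


print(repr(echo))     # What a REPL shows when only the name is entered.
print(echo('ab', 2))  # The wrapped function still works normally.
```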