pw_rpc Python package#

The pw_rpc Python package makes it possible to call Pigweed RPCs from Python. The package includes a pw_rpc client library, as well as tools for creating a pw_rpc console.

pw_rpc.client#

Provides a pw_rpc client for Python.

class pw_rpc.client.Client(
impl: ClientImpl,
channels: Iterable[Channel],
services: Iterable[Service],
)#

Sends requests and handles responses for a set of channels.

RPC invocations occur through a ChannelClient.

Users may set an optional response_callback that is called before processing every response or server stream RPC packet.

__init__(
impl: ClientImpl,
channels: Iterable[Channel],
services: Iterable[Service],
)#
channel(channel_id: int | None = None) ChannelClient#

Returns a ChannelClient, which is used to call RPCs on a channel.

If no channel is provided, the first channel is used.

channels() Iterable[ChannelClient]#

Accesses the ChannelClients in this client.

method(method_name: str) Method#

Returns a Method matching the given name.

Parameters:

method_name – name as package.Service/Method or package.Service.Method.

Raises:
  • ValueError – the method name is not properly formatted

  • KeyError – the method is not present

methods() Iterator[Method]#

Iterates over all Methods supported by this client.

process_packet(pw_rpc_raw_packet_data: bytes, *impl_args, **impl_kwargs) Status#

Processes an incoming packet.

Parameters:
  • pw_rpc_raw_packet_data – raw binary data for exactly one RPC packet

  • impl_args – optional positional arguments passed to the ClientImpl

  • impl_kwargs – optional keyword arguments passed to the ClientImpl

Returns:

OK - the packet was processed by this client DATA_LOSS - the packet could not be decoded INVALID_ARGUMENT - the packet is for a server, not a client NOT_FOUND - the packet’s channel ID is not known to this client

class pw_rpc.client.ClientImpl#

The internal interface of the RPC client.

This interface defines the semantics for invoking an RPC on a particular client.

__init__() None#
abstract handle_completion(
rpc: PendingRpc,
context: Any,
status: Status,
*,
args: tuple = (),
kwargs: dict | None = None,
) Any#

Handles the successful completion of an RPC.

Parameters:
  • rpc – Information about the pending RPC

  • context – Arbitrary context object associated with the pending RPC

  • status – Status returned from the RPC

  • args – Arbitrary arguments passed to the ClientImpl

  • kwargs – Arbitrary arguments passed to the ClientImpl

abstract handle_error(
rpc: PendingRpc,
context,
status: Status,
*,
args: tuple = (),
kwargs: dict | None = None,
)#

Handles the abnormal termination of an RPC.

Parameters:
  • rpc – Information about the pending RPC

  • context – Arbitrary context object associated with the pending RPC

  • status – which error occurred

  • args – Arbitrary arguments passed to the ClientImpl

  • kwargs – Arbitrary arguments passed to the ClientImpl

abstract handle_response(
rpc: PendingRpc,
context: Any,
payload: Any,
*,
args: tuple = (),
kwargs: dict | None = None,
) Any#

Handles a response from the RPC server.

Parameters:
  • rpc – Information about the pending RPC

  • context – Arbitrary context object associated with the pending RPC

  • payload – A protobuf message

  • args – Arbitrary arguments passed to the ClientImpl

  • kwargs – Arbitrary arguments passed to the ClientImpl

abstract method_client(channel: Channel, method: Method) Any#

Returns an object that invokes a method using the given channel.

pw_rpc.callback_client#

Defines a callback-based RPC ClientImpl to use with pw_rpc.Client.

callback_client.Impl supports invoking RPCs synchronously or asynchronously. Asynchronous invocations use a callback.

Synchronous invocations look like a function call. When invoking a unary or server streaming RPC, the request may be provided as a message object or as keyword arguments for the message fields (but not both).

status, response = client.channel(1).rpcs.MyServer.MyUnary(some_field=123)

# Calls with a server stream return a status and a list of responses.
status, responses = rpcs.MyService.MyServerStreaming(Request(some_field=123))

Synchronous client and bidirectional streaming calls accept an iterable of requests to send.

requests = [Request(a=1), Request(b=2)]
status, response = rpcs.MyService.MyClientStreaming(requests)

requests = [Request(a=1), Request(b=2)]
status, responses = rpcs.MyService.MyBidirectionalStreaming(requests)

Synchronous invocations block until the RPC completes or times out. The calls use the default timeout provided when the callback_client.Impl() is created, or a timeout passed in through the pw_rpc_timeout_s argument. A timeout of None means to wait indefinitely for a response.

Asynchronous invocations immediately return a call object. Callbacks may be provided for the three RPC events:

  • on_next(call_object, response) - called for each response

  • on_completed(call_object, status) - called when the RPC completes

  • on_error(call_object, error) - called if the RPC terminates due to an error

The default callbacks simply log the events. If a user-provided callback throws an exception, that exception is logged and raised when the user calls functions on the call object.

Unary and client streaming RPCs are invoked asynchronously by calling invoke on the method object. invoke takes the callbacks. The request may be provided either as a constructed protobuf or as a dict of proto fields in the request_args parameter.

# Pass the request as a protobuf and provide callbacks for all RPC events.
rpc = client.channel(1).call.MyService.MyServerStreaming
call = rpc.invoke(rpc.request(a=1), on_next_cb, on_completed_cb, on_error_cb)

# Create the request from the provided keyword args. Provide a callback for
# responses, but simply log the other RPC events.
call = client.channel(1).call.MyServer.MyUnary.invoke(
    request_args=dict(a=1, b=2), on_next=lambda _, reply: process_this(reply))

For client and bidirectional streaming RPCs, requests are sent with the send method. The finish_and_wait method finishes the client stream. It optionally takes an iterable for responses to send before closing the stream.

# Start the call using callbacks.
call = client.channel(1).rpcs.MyServer.MyClientStream.invoke(on_error=err_cb)

# Send a single client stream request.
call.send(some_field=123)

# Send the requests, close the stream, then wait for the RPC to complete.
stream_responses = call.finish_and_wait([RequestType(some_field=123), ...])
class pw_rpc.callback_client.BidirectionalStreamingCall(
rpcs: PendingRpcs,
rpc: PendingRpc,
default_timeout_s: float | None,
on_next: Callable[[CallTypeT, Any], Any] | None,
on_completed: Callable[[CallTypeT, Any], Any] | None,
on_error: Callable[[CallTypeT, Any], Any] | None,
)#

Tracks the state of a bidirectional streaming RPC call.

finish_and_wait(
requests: Iterable[Message] = (),
*,
timeout_s: UseDefault | float | None = UseDefault.VALUE,
) StreamResponse#

Ends the client stream and waits for the RPC to complete.

send(_rpc_request_proto: Message | None = None, **request_fields) None#

Sends a message to the server in the client stream.

class pw_rpc.callback_client.ClientStreamingCall(
rpcs: PendingRpcs,
rpc: PendingRpc,
default_timeout_s: float | None,
on_next: Callable[[CallTypeT, Any], Any] | None,
on_completed: Callable[[CallTypeT, Any], Any] | None,
on_error: Callable[[CallTypeT, Any], Any] | None,
)#

Tracks the state of a client streaming RPC call.

finish_and_wait(
requests: Iterable[Message] = (),
*,
timeout_s: UseDefault | float | None = UseDefault.VALUE,
) UnaryResponse#

Ends the client stream and waits for the RPC to complete.

send(_rpc_request_proto: Message | None = None, **request_fields) None#

Sends client stream request to the server.

class pw_rpc.callback_client.ServerStreamingCall(
rpcs: PendingRpcs,
rpc: PendingRpc,
default_timeout_s: float | None,
on_next: Callable[[CallTypeT, Any], Any] | None,
on_completed: Callable[[CallTypeT, Any], Any] | None,
on_error: Callable[[CallTypeT, Any], Any] | None,
)#

Tracks the state of a server streaming RPC call.

request_completion() None#

Sends client completion packet to server.

class pw_rpc.callback_client.StreamResponse(status: Status, responses: Sequence[Any])#

Results from a server or bidirectional streaming RPC.

responses: Sequence[Any]#

Alias for field number 1

status: Status#

Alias for field number 0

class pw_rpc.callback_client.UnaryCall(
rpcs: PendingRpcs,
rpc: PendingRpc,
default_timeout_s: float | None,
on_next: Callable[[CallTypeT, Any], Any] | None,
on_completed: Callable[[CallTypeT, Any], Any] | None,
on_error: Callable[[CallTypeT, Any], Any] | None,
)#

Tracks the state of a unary RPC call.

class pw_rpc.callback_client.UnaryResponse(status: Status, response: Any)#

Result from a unary or client streaming RPC: status and response.

response: Any#

Alias for field number 1

status: Status#

Alias for field number 0

pw_rpc.descriptors#

Types representing the basic pw_rpc concepts: channel, service, method.

class pw_rpc.descriptors.Channel(id: 'int', output: 'Callable[[bytes], Any] | None')#
__init__(id: int, output: Callable[[bytes], Any] | None) None#
class pw_rpc.descriptors.ChannelManipulator#

A a pipe interface that may manipulate packets before they’re sent.

ChannelManipulator``s allow application-specific packet handling to be injected into the packet processing pipeline for an ingress or egress channel-like pathway. This is particularly useful for integration testing resilience to things like packet loss on a usually-reliable transport. RPC server integrations (e.g. ``HdlcRpcLocalServerAndClient) may provide an opportunity to inject a ChannelManipulator for this use case.

A ChannelManipulator should not modify send_packet, as the consumer of a ChannelManipulator will use send_packet to insert the provided ChannelManipulator into a packet processing path.

For example:

class PacketLogger(ChannelManipulator):
    def process_and_send(self, packet: bytes) -> None:
        _LOG.debug('Received packet with payload: %s', str(packet))
        self.send_packet(packet)


packet_logger = PacketLogger()

# Configure actual send command.
packet_logger.send_packet = socket.sendall

# Route the output channel through the PacketLogger channel manipulator.
channels = tuple(Channel(_DEFAULT_CHANNEL, packet_logger))

# Create a RPC client.
reader = SocketReader(socket)
with reader:
    client = HdlcRpcClient(reader, protos, channels, stdout)
    with client:
      # Do something with client
__init__() None#
abstract process_and_send(packet: bytes) None#

Processes an incoming packet before optionally sending it.

Implementations of this method may send the processed packet, multiple packets, or no packets at all via the registered send_packet() handler.

pw_rpc.console_tools#

Utilities for building tools that interact with pw_rpc.

class pw_rpc.console_tools.ClientInfo(name: str, client: object, rpc_client: pw_rpc.Client)#

Information about an RPC client as it appears in the console.

client: object#

Alias for field number 1

name: str#

Alias for field number 0

rpc_client: Client#

Alias for field number 2

class pw_rpc.console_tools.Context(
client_info: Collection[ClientInfo],
default_client: Any,
protos: Library,
*,
help_header: str = '',
)#

The Context class is used to set up an interactive RPC console.

The Context manages a set of variables that make it easy to access RPCs and protobufs in a REPL.

As an example, this class can be used to set up a console with IPython:

context = console_tools.Context(
    clients, default_client, protos, help_header=WELCOME_MESSAGE)
IPython.start_ipython(argv=[], user_ns=dict(**context.variables()))
__init__(
client_info: Collection[ClientInfo],
default_client: Any,
protos: Library,
*,
help_header: str = '',
) None#

Creates an RPC console context.

Protos and RPC services are accessible by their proto package and name. The target for these can be set with the set_target function.

Parameters:
  • client_info – ClientInfo objects that represent the clients this console uses to communicate with other devices

  • default_client – default client object; must be one of the clients

  • protos – protobufs to use for RPCs for all clients

  • help_header – Message to display for the help command

flattened_rpc_completions()#

Create a flattened list of rpc commands for repl auto-completion.

set_target(selected_client: object, channel_id: int | None = None) None#

Sets the default target for commands.

variables() dict[str, Any]#

Returns a mapping of names to variables for use in an RPC console.

class pw_rpc.console_tools.Watchdog(on_reset: ~typing.Callable[[], ~typing.Any], on_expiration: ~typing.Callable[[], ~typing.Any], while_expired: ~typing.Callable[[], ~typing.Any] = <function Watchdog.<lambda>>, timeout_s: float = 1, expired_timeout_s: float | None = None)#

Simple class that times out unless reset.

This class could be used, for example, to track a device’s connection state for devices that send a periodic heartbeat packet.

__init__(on_reset: ~typing.Callable[[], ~typing.Any], on_expiration: ~typing.Callable[[], ~typing.Any], while_expired: ~typing.Callable[[], ~typing.Any] = <function Watchdog.<lambda>>, timeout_s: float = 1, expired_timeout_s: float | None = None)#

Creates a watchdog; start() must be called to start it.

Parameters:
  • on_reset – Function called when the watchdog is reset after having expired.

  • on_expiration – Function called when the timeout expires.

  • while_expired – Function called repeatedly while the watchdog is expired.

  • timeout_s – If reset() is not called for timeout_s, the watchdog expires and calls the on_expiration callback.

  • expired_timeout_s – While expired, the watchdog calls the while_expired callback every expired_timeout_s.

reset() bool#

Resets the timeout; calls the on_reset callback if expired.

Returns True if was expired.

start() None#

Starts the watchdog; must be called for the watchdog to work.

pw_rpc.console_tools.alias_deprecated_command(variables: Any, old_name: str, new_name: str) None#

Adds an alias for an old command that redirects to the new command.

The deprecated command prints a message then invokes the new command.

pw_rpc.console_tools.flattened_rpc_completions(client_info_list: Collection[ClientInfo]) dict[str, str]#

Create a flattened list of rpc commands for repl auto-completion.

This gathers all rpc commands from a set of ClientInfo variables and produces a flattened list of valid rpc commands to run in an RPC console. This is useful for passing into prompt_toolkit.completion.WordCompleter.

Parameters:

client_info_list – List of ClientInfo variables

Returns:

Dict of flattened rpc commands as keys, and ‘RPC’ as values. For example:

{
    'device.rpcs.pw.rpc.EchoService.Echo': 'RPC,
    'device.rpcs.pw.rpc.BatteryService.GetBatteryStatus': 'RPC',
}

pw_rpc.console_tools.help_as_repr(function: Callable) Callable#

Wraps a function so that its repr() and docstring provide detailed help.

This is useful for creating commands in an interactive console. In a console, typing a function’s name and hitting Enter shows rich documentation with the full function signature, type annotations, and docstring when the function is wrapped with help_as_repr.