Python client#
pw_rpc: Efficient, low-code-size RPC system for embedded devices
The pw_rpc Python package makes it possible to call Pigweed RPCs from Python. The package includes a pw_rpc client library, as well as tools for creating a pw_rpc console.
pw_rpc.client#
Provides a pw_rpc client for Python.
- class pw_rpc.client.Client(
- impl: ClientImpl,
- channels: Iterable[Channel],
- services: Iterable[Service],
Sends requests and handles responses for a set of channels.
RPC invocations occur through a ChannelClient.
Users may set an optional response_callback that is called before processing every response or server stream RPC packet.
- __init__(
- impl: ClientImpl,
- channels: Iterable[Channel],
- services: Iterable[Service],
- channel(channel_id: int | None = None) ChannelClient #
Returns a ChannelClient, which is used to call RPCs on a channel.
If no channel is provided, the first channel is used.
- channels() Iterable[ChannelClient] #
Accesses the ChannelClients in this client.
- method(method_name: str) Method #
Returns a Method matching the given name.
- Parameters:
method_name – name as package.Service/Method or package.Service.Method.
- Raises:
ValueError – the method name is not properly formatted
KeyError – the method is not present
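The two accepted name formats can be illustrated with a small stand-alone parser. This is only a sketch: `split_method_name` is a hypothetical helper that is not part of pw_rpc, and the real client's validation rules may differ.

```python
def split_method_name(method_name: str) -> tuple[str, str]:
    """Splits 'package.Service/Method' or 'package.Service.Method'.

    Hypothetical helper for illustration only; not part of pw_rpc.
    """
    if '/' in method_name:
        # Slash form: everything before the slash names the service.
        service, _, method = method_name.partition('/')
    else:
        # Dotted form: the last component is the method.
        service, _, method = method_name.rpartition('.')
    if not service or not method or '/' in method:
        raise ValueError(f'Invalid method name: {method_name!r}')
    return service, method


print(split_method_name('pw.rpc.EchoService/Echo'))
print(split_method_name('pw.rpc.EchoService.Echo'))
```

Both calls yield the same service and method pair, which is why either spelling can identify one method.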
- methods() Iterator[Method] #
Iterates over all Methods supported by this client.
- process_packet(pw_rpc_raw_packet_data: bytes) Status #
Processes an incoming packet.
- Parameters:
pw_rpc_raw_packet_data – raw binary data for exactly one RPC packet
- Returns:
OK - the packet was processed by this client
DATA_LOSS - the packet could not be decoded
INVALID_ARGUMENT - the packet is for a server, not a client
NOT_FOUND - the packet’s channel ID is not known to this client
- class pw_rpc.client.ClientImpl#
The internal interface of the RPC client.
This interface defines the semantics for invoking an RPC on a particular client.
- __init__() None #
- abstract handle_completion(rpc: PendingRpc, context: Any, status: Status) Any #
Handles the successful completion of an RPC.
- Parameters:
rpc – Information about the pending RPC
context – Arbitrary context object associated with the pending RPC
status – Status returned from the RPC
- abstract handle_error(rpc: PendingRpc, context, status: Status)#
Handles the abnormal termination of an RPC.
- Parameters:
rpc – Information about the pending RPC
context – Arbitrary context object associated with the pending RPC
status – which error occurred
- abstract handle_response(rpc: PendingRpc, context: Any, payload: Any) Any #
Handles a response from the RPC server.
- Parameters:
rpc – Information about the pending RPC
context – Arbitrary context object associated with the pending RPC
payload – A protobuf message
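To show the shape of this interface, the sketch below implements the three documented handlers on a local stand-in base class. `ClientImplSketch` and `RecordingImpl` are hypothetical names; the stand-in is used so the example runs without pw_rpc installed, and the strings passed for `rpc` and `status` are placeholders for the real `PendingRpc` and `Status` objects.

```python
from abc import ABC, abstractmethod
from typing import Any


class ClientImplSketch(ABC):
    """Local stand-in mirroring the three documented abstract methods."""

    @abstractmethod
    def handle_response(self, rpc: Any, context: Any, payload: Any) -> Any:
        ...

    @abstractmethod
    def handle_completion(self, rpc: Any, context: Any, status: Any) -> Any:
        ...

    @abstractmethod
    def handle_error(self, rpc: Any, context: Any, status: Any) -> Any:
        ...


class RecordingImpl(ClientImplSketch):
    """Appends every event to the list supplied as the RPC's context."""

    def handle_response(self, rpc: Any, context: Any, payload: Any) -> None:
        context.append(('response', payload))

    def handle_completion(self, rpc: Any, context: Any, status: Any) -> None:
        context.append(('completed', status))

    def handle_error(self, rpc: Any, context: Any, status: Any) -> None:
        context.append(('error', status))


# Simulate the events of one successful RPC (placeholder values).
events: list = []
impl = RecordingImpl()
impl.handle_response('pending-rpc', events, {'value': 1})
impl.handle_completion('pending-rpc', events, 'OK')
print(events)
```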
pw_rpc.client_utils#
Utilities for using pw_rpc.Client.
- class pw_rpc.client_utils.NoEncodingSingleChannelRpcClient(
- reader: CancellableReader,
- paths_or_modules: Iterable[str | Path | ModuleType] | Library,
- channel: Channel,
- client_impl: ClientImpl | None = None,
An RPC client without any frame encoding with a single channel output.
The caveat is that the provided read function must read entire frames.
- __init__(
- reader: CancellableReader,
- paths_or_modules: Iterable[str | Path | ModuleType] | Library,
- channel: Channel,
- client_impl: ClientImpl | None = None,
Creates an RPC client over a single channel with no frame encoding.
- Parameters:
reader – Readable object used to receive RPC packets.
paths_or_modules – paths to .proto files or proto modules.
channel – RPC channel to use for output.
client_impl – The RPC Client implementation. Defaults to the callback client implementation if not provided.
- class pw_rpc.client_utils.RpcClient(
- reader_and_executor: DataReaderAndExecutor,
- paths_or_modules: Iterable[str | Path | ModuleType] | Library,
- channels: Iterable[Channel],
- client_impl: ClientImpl | None = None,
An RPC client with configurable incoming data processing.
- __init__(
- reader_and_executor: DataReaderAndExecutor,
- paths_or_modules: Iterable[str | Path | ModuleType] | Library,
- channels: Iterable[Channel],
- client_impl: ClientImpl | None = None,
Creates an RPC client.
- Parameters:
reader_and_executor – DataReaderAndExecutor instance.
paths_or_modules – paths to .proto files or proto modules.
channels – RPC channels to use for output.
client_impl – The RPC client implementation. Defaults to the callback client implementation if not provided.
- rpcs(channel_id: int | None = None) Any #
Returns object for accessing services on the specified channel.
This skips some intermediate layers to make it simpler to invoke RPCs from an HdlcRpcClient. If only one channel is in use, the channel ID is not necessary.
pw_rpc.callback_client#
Defines a callback-based RPC ClientImpl to use with pw_rpc.Client.
callback_client.Impl supports invoking RPCs synchronously or asynchronously. Asynchronous invocations use a callback.
Synchronous invocations look like a function call. When invoking a unary or server streaming RPC, the request may be provided as a message object or as keyword arguments for the message fields (but not both).
status, response = client.channel(1).rpcs.MyServer.MyUnary(some_field=123)
# Calls with a server stream return a status and a list of responses.
status, responses = rpcs.MyService.MyServerStreaming(Request(some_field=123))
Synchronous client and bidirectional streaming calls accept an iterable of requests to send.
requests = [Request(a=1), Request(b=2)]
status, response = rpcs.MyService.MyClientStreaming(requests)
requests = [Request(a=1), Request(b=2)]
status, responses = rpcs.MyService.MyBidirectionalStreaming(requests)
Synchronous invocations block until the RPC completes or times out. The calls use the default timeout provided when the callback_client.Impl() is created, or a timeout passed in through the pw_rpc_timeout_s argument. A timeout of None means to wait indefinitely for a response.
Asynchronous invocations immediately return a call object. Callbacks may be provided for the three RPC events:
on_next(call_object, response) - called for each response
on_completed(call_object, status) - called when the RPC completes
on_error(call_object, error) - called if the RPC terminates due to an error
The default callbacks simply log the events. If a user-provided callback raises an exception, that exception is logged and raised when the user calls functions on the call object.
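As a rough illustration of the three callback signatures, the sketch below collects responses and signals completion through a `threading.Event`. It drives the callbacks by hand with a placeholder call object instead of a real RPC, so everything other than the callback names and argument order is an assumption.

```python
import threading
from typing import Any

responses: list[Any] = []
outcome: dict[str, Any] = {}
done = threading.Event()


def on_next(call: Any, response: Any) -> None:
    """Called for each response."""
    responses.append(response)


def on_completed(call: Any, status: Any) -> None:
    """Called when the RPC completes."""
    outcome['status'] = status
    done.set()


def on_error(call: Any, error: Any) -> None:
    """Called if the RPC terminates due to an error."""
    outcome['error'] = error
    done.set()


# Stand-in for the events a server streaming RPC might deliver.
fake_call = object()
for reply in ('first', 'second'):
    on_next(fake_call, reply)
on_completed(fake_call, 'OK')

done.wait(timeout=1)
print(responses, outcome)
```

Setting an event in both terminal callbacks lets other code wait for the call to finish regardless of how it ended.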
Unary and server streaming RPCs are invoked asynchronously by calling invoke on the method object. invoke takes the callbacks. The request may be provided either as a constructed protobuf or as a dict of proto fields in the request_args parameter.
# Pass the request as a protobuf and provide callbacks for all RPC events.
rpc = client.channel(1).rpcs.MyService.MyServerStreaming
call = rpc.invoke(rpc.request(a=1), on_next_cb, on_completed_cb, on_error_cb)
# Create the request from the provided keyword args. Provide a callback for
# responses, but simply log the other RPC events.
call = client.channel(1).rpcs.MyServer.MyUnary.invoke(
    request_args=dict(a=1, b=2), on_next=lambda _, reply: process_this(reply))
For client and bidirectional streaming RPCs, requests are sent with the send method. The finish_and_wait method finishes the client stream. It optionally takes an iterable of requests to send before closing the stream.
# Start the call using callbacks.
call = client.channel(1).rpcs.MyServer.MyClientStream.invoke(on_error=err_cb)
# Send a single client stream request.
call.send(some_field=123)
# Send the requests, close the stream, then wait for the RPC to complete.
stream_responses = call.finish_and_wait([RequestType(some_field=123), ...])
- class pw_rpc.callback_client.BidirectionalStreamingCall(
- rpcs: PendingRpcs,
- rpc: PendingRpc,
- default_timeout_s: float | None,
- on_next: Callable[[CallTypeT, Any], Any] | None,
- on_completed: Callable[[CallTypeT, Any], Any] | None,
- on_error: Callable[[CallTypeT, Any], Any] | None,
- max_responses: int,
Tracks the state of a bidirectional streaming RPC call.
- finish_and_wait(
- requests: Iterable[Message] = (),
- *,
- timeout_s: UseDefault | float | None = UseDefault.VALUE,
Ends the client stream and waits for the RPC to complete.
- send(request_proto: Message | None = None, /, **request_fields) None #
Sends a message to the server in the client stream.
- class pw_rpc.callback_client.ClientStreamingCall(
- rpcs: PendingRpcs,
- rpc: PendingRpc,
- default_timeout_s: float | None,
- on_next: Callable[[CallTypeT, Any], Any] | None,
- on_completed: Callable[[CallTypeT, Any], Any] | None,
- on_error: Callable[[CallTypeT, Any], Any] | None,
- max_responses: int,
Tracks the state of a client streaming RPC call.
- finish_and_wait(
- requests: Iterable[Message] = (),
- *,
- timeout_s: UseDefault | float | None = UseDefault.VALUE,
Ends the client stream and waits for the RPC to complete.
- send(request_proto: Message | None = None, /, **request_fields) None #
Sends client stream request to the server.
- class pw_rpc.callback_client.ServerStreamingCall(
- rpcs: PendingRpcs,
- rpc: PendingRpc,
- default_timeout_s: float | None,
- on_next: Callable[[CallTypeT, Any], Any] | None,
- on_completed: Callable[[CallTypeT, Any], Any] | None,
- on_error: Callable[[CallTypeT, Any], Any] | None,
- max_responses: int,
Tracks the state of a server streaming RPC call.
- request_completion() None #
Sends client completion packet to server.
- class pw_rpc.callback_client.StreamResponse(status: Status, responses: Sequence[Any])#
Results from a server or bidirectional streaming RPC.
- responses: Sequence[Any]#
Alias for field number 1
- status: Status#
Alias for field number 0
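The "field number" wording indicates a named tuple, so a result can be read by attribute, by index, or by unpacking. The sketch below uses a local stand-in, `StreamResponseSketch` (a hypothetical name), with the same two fields and placeholder values.

```python
from typing import Any, NamedTuple, Sequence


class StreamResponseSketch(NamedTuple):
    """Local stand-in with the same two documented fields."""

    status: Any
    responses: Sequence[Any]


result = StreamResponseSketch(status='OK', responses=['a', 'b'])

# Positional unpacking follows field order: field 0, then field 1.
status, responses = result

print(result.status, result[0], responses)
```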
- class pw_rpc.callback_client.UnaryCall(
- rpcs: PendingRpcs,
- rpc: PendingRpc,
- default_timeout_s: float | None,
- on_next: Callable[[CallTypeT, Any], Any] | None,
- on_completed: Callable[[CallTypeT, Any], Any] | None,
- on_error: Callable[[CallTypeT, Any], Any] | None,
- max_responses: int,
Tracks the state of a unary RPC call.
pw_rpc.descriptors#
Types representing the basic pw_rpc concepts: channel, service, method.
- class pw_rpc.descriptors.Channel(id: 'int', output: 'Callable[[bytes], Any]')#
- __init__(id: int, output: Callable[[bytes], Any]) None #
- class pw_rpc.descriptors.ChannelManipulator#
A pipe interface that may manipulate packets before they’re sent.
ChannelManipulators allow application-specific packet handling to be injected into the packet processing pipeline for an ingress or egress channel-like pathway. This is particularly useful for integration testing resilience to things like packet loss on a usually-reliable transport. RPC server integrations (e.g. HdlcRpcLocalServerAndClient) may provide an opportunity to inject a ChannelManipulator for this use case.
A ChannelManipulator should not modify send_packet, as the consumer of a ChannelManipulator will use send_packet to insert the provided ChannelManipulator into a packet processing path.
For example:
class PacketLogger(ChannelManipulator):
    def process_and_send(self, packet: bytes) -> None:
        _LOG.debug('Received packet with payload: %s', str(packet))
        self.send_packet(packet)

packet_logger = PacketLogger()

# Configure actual send command.
packet_logger.send_packet = socket.sendall

# Route the output channel through the PacketLogger channel manipulator.
channels = (Channel(_DEFAULT_CHANNEL, packet_logger),)

# Create an RPC client.
reader = SocketReader(socket)
with reader:
    client = HdlcRpcClient(reader, protos, channels, stdout)
    with client:
        # Do something with client
- __init__() None #
- abstract process_and_send(packet: bytes) None #
Processes an incoming packet before optionally sending it.
Implementations of this method may send the processed packet, multiple packets, or no packets at all via the registered send_packet() handler.
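Because an implementation may forward no packets at all, a manipulator can simulate a lossy link. The sketch below is one possible packet dropper. It subclasses a local stand-in (`ChannelManipulatorSketch`, a hypothetical name) so it runs without pw_rpc, and the `drop_rate` and `seed` parameters are assumptions made for this illustration.

```python
import random
from abc import ABC, abstractmethod
from typing import Callable


class ChannelManipulatorSketch(ABC):
    """Local stand-in for the documented interface (not the pw_rpc class)."""

    def __init__(self) -> None:
        # The consumer of the manipulator assigns the real send function.
        self.send_packet: Callable[[bytes], object] = lambda _: None

    @abstractmethod
    def process_and_send(self, packet: bytes) -> None:
        ...


class PacketDropper(ChannelManipulatorSketch):
    """Drops each packet with probability drop_rate; forwards the rest."""

    def __init__(self, drop_rate: float, seed: int = 0) -> None:
        super().__init__()
        self._drop_rate = drop_rate
        # A seeded generator keeps test runs repeatable.
        self._random = random.Random(seed)

    def process_and_send(self, packet: bytes) -> None:
        if self._random.random() >= self._drop_rate:
            self.send_packet(packet)


sent: list[bytes] = []
dropper = PacketDropper(drop_rate=0.5)
dropper.send_packet = sent.append
for i in range(100):
    dropper.process_and_send(bytes([i]))
print(f'{len(sent)} of 100 packets forwarded')
```

In line with the guidance above, the subclass only calls send_packet and leaves assigning it to the consumer.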
pw_rpc.console_tools#
Utilities for building tools that interact with pw_rpc.
- class pw_rpc.console_tools.ClientInfo(name: str, client: object, rpc_client: pw_rpc.Client)#
Information about an RPC client as it appears in the console.
- client: object#
Alias for field number 1
- name: str#
Alias for field number 0
- class pw_rpc.console_tools.Context(
- client_info: Collection[ClientInfo],
- default_client: Any,
- protos: Library,
- *,
- help_header: str = '',
The Context class is used to set up an interactive RPC console.
The Context manages a set of variables that make it easy to access RPCs and protobufs in a REPL.
As an example, this class can be used to set up a console with IPython:
context = console_tools.Context(
    clients, default_client, protos, help_header=WELCOME_MESSAGE)
IPython.start_ipython(argv=[], user_ns=dict(**context.variables()))
- __init__(
- client_info: Collection[ClientInfo],
- default_client: Any,
- protos: Library,
- *,
- help_header: str = '',
Creates an RPC console context.
Protos and RPC services are accessible by their proto package and name. The target for these can be set with the set_target function.
- Parameters:
client_info – ClientInfo objects that represent the clients this console uses to communicate with other devices
default_client – default client object; must be one of the clients
protos – protobufs to use for RPCs for all clients
help_header – Message to display for the help command
- flattened_rpc_completions()#
Create a flattened list of rpc commands for repl auto-completion.
- set_target(selected_client: object, channel_id: int | None = None) None #
Sets the default target for commands.
- variables() dict[str, Any] #
Returns a mapping of names to variables for use in an RPC console.
- class pw_rpc.console_tools.Watchdog(on_reset: Callable[[], Any], on_expiration: Callable[[], Any], while_expired: Callable[[], Any] = <function Watchdog.<lambda>>, timeout_s: float = 1, expired_timeout_s: float | None = None)#
Simple class that times out unless reset.
This class could be used, for example, to track a device’s connection state for devices that send a periodic heartbeat packet.
- __init__(on_reset: Callable[[], Any], on_expiration: Callable[[], Any], while_expired: Callable[[], Any] = <function Watchdog.<lambda>>, timeout_s: float = 1, expired_timeout_s: float | None = None)#
Creates a watchdog; start() must be called to start it.
- Parameters:
on_reset – Function called when the watchdog is reset after having expired.
on_expiration – Function called when the timeout expires.
while_expired – Function called repeatedly while the watchdog is expired.
timeout_s – If reset() is not called for timeout_s, the watchdog expires and calls the on_expiration callback.
expired_timeout_s – While expired, the watchdog calls the while_expired callback every expired_timeout_s.
- reset() bool #
Resets the timeout; calls the on_reset callback if expired.
Returns True if the watchdog was expired.
- start() None #
Starts the watchdog; must be called for the watchdog to work.
- stop() None #
Stops the watchdog.
This will not trigger the execution of any callbacks and will prevent further execution of any callbacks (including while_expired) until start is called again.
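The start, reset, and stop semantics described above can be approximated with `threading.Timer`. The following is a simplified sketch and not the pw_rpc implementation: `WatchdogSketch` is a hypothetical name, and the `while_expired` callback and `expired_timeout_s` are left out for brevity.

```python
import threading
from typing import Any, Callable, Optional


class WatchdogSketch:
    """Minimal illustration of the documented reset and expire behavior."""

    def __init__(self, on_reset: Callable[[], Any],
                 on_expiration: Callable[[], Any],
                 timeout_s: float = 1) -> None:
        self._on_reset = on_reset
        self._on_expiration = on_expiration
        self._timeout_s = timeout_s
        self._timer: Optional[threading.Timer] = None
        self._expired = False

    def start(self) -> None:
        """(Re)starts the countdown."""
        self.stop()
        self._timer = threading.Timer(self._timeout_s, self._expire)
        self._timer.daemon = True
        self._timer.start()

    def stop(self) -> None:
        """Cancels the countdown without invoking any callback."""
        if self._timer is not None:
            self._timer.cancel()
            self._timer = None

    def reset(self) -> bool:
        """Restarts the countdown; returns True if it had expired."""
        was_expired = self._expired
        self._expired = False
        if was_expired:
            self._on_reset()
        self.start()
        return was_expired

    def _expire(self) -> None:
        self._expired = True
        self._on_expiration()


events: list[str] = []
expired = threading.Event()


def on_expiration() -> None:
    events.append('expired')
    expired.set()


watchdog = WatchdogSketch(lambda: events.append('reset'), on_expiration,
                          timeout_s=0.05)
watchdog.start()
expired.wait(timeout=5)  # No heartbeat arrives, so the watchdog expires.
was_expired = watchdog.reset()  # A late heartbeat resets it.
watchdog.stop()
print(events, was_expired)
```

In a heartbeat scenario, the packet handler would call `reset()` each time a heartbeat arrives.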
- pw_rpc.console_tools.alias_deprecated_command(variables: Any, old_name: str, new_name: str) None #
Adds an alias for an old command that redirects to the new command.
The deprecated command prints a message then invokes the new command.
- pw_rpc.console_tools.flattened_rpc_completions(client_info_list: Collection[ClientInfo]) dict[str, str] #
Create a flattened list of rpc commands for repl auto-completion.
This gathers all rpc commands from a set of ClientInfo variables and produces a flattened list of valid rpc commands to run in an RPC console. This is useful for passing into prompt_toolkit.completion.WordCompleter.
- Parameters:
client_info_list – List of ClientInfo variables
- Returns:
Dict of flattened rpc commands as keys, and ‘RPC’ as values. For example:
{
    'device.rpcs.pw.rpc.EchoService.Echo': 'RPC',
    'device.rpcs.pw.rpc.BatteryService.GetBatteryStatus': 'RPC',
}
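The flattening idea can be sketched on a plain nested dict. `flatten_rpc_names` is a hypothetical helper, and the nested-dict input with `None` marking a method is an assumption for this illustration; the real function walks ClientInfo objects instead.

```python
from typing import Any


def flatten_rpc_names(prefix: str, tree: dict[str, Any]) -> dict[str, str]:
    """Flattens nested {name: subtree} dicts into dotted names mapped to 'RPC'.

    Hypothetical helper; a value of None marks a method (leaf).
    """
    flat: dict[str, str] = {}
    for name, subtree in tree.items():
        path = f'{prefix}.{name}'
        if subtree is None:
            flat[path] = 'RPC'
        else:
            flat.update(flatten_rpc_names(path, subtree))
    return flat


services = {
    'pw': {
        'rpc': {
            'EchoService': {'Echo': None},
            'BatteryService': {'GetBatteryStatus': None},
        },
    },
}
completions = flatten_rpc_names('device.rpcs', services)
for name in completions:
    print(name)
```

A flat dict of this shape is the kind of mapping that could be handed to a word completer.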
- pw_rpc.console_tools.help_as_repr(function: Callable) Callable #
Wraps a function so that its repr() and docstring provide detailed help.
This is useful for creating commands in an interactive console. In a console, typing a function’s name and hitting Enter shows rich documentation with the full function signature, type annotations, and docstring when the function is wrapped with help_as_repr.
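One way such a wrapper could work is sketched below with `inspect` and `functools`. This is an assumption about the general technique and not the pw_rpc implementation, and `help_as_repr_sketch` is a hypothetical name.

```python
import functools
import inspect
from typing import Any, Callable


class _HelpfulFunction:
    """Callable wrapper whose repr() shows the signature and docstring."""

    def __init__(self, function: Callable) -> None:
        self._function = function
        # Copy __name__, __doc__, and similar attributes onto the wrapper.
        functools.update_wrapper(self, function)

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        return self._function(*args, **kwargs)

    def __repr__(self) -> str:
        signature = inspect.signature(self._function)
        doc = inspect.getdoc(self._function) or '(no docstring)'
        return f'{self._function.__name__}{signature}\n\n{doc}'


def help_as_repr_sketch(function: Callable) -> Callable:
    """Wraps a function so a REPL displays help when its name is entered."""
    return _HelpfulFunction(function)


def echo(message: str, count: int = 1) -> str:
    """Returns the message repeated count times."""
    return message * count


echo_cmd = help_as_repr_sketch(echo)
print(repr(echo_cmd))
print(echo_cmd('ab', count=2))
```

Interactive consoles print the repr of a bare expression, so entering `echo_cmd` without parentheses displays the help text while calling it still runs the original function.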