21#include "pw_hdlc/decoder.h"
22#include "pw_hdlc/default_addresses.h"
23#include "pw_hdlc/encoded_size.h"
24#include "pw_hdlc/rpc_channel.h"
25#include "pw_rpc/integration_testing.h"
26#include "pw_span/span.h"
27#include "pw_status/try.h"
28#include "pw_stream/socket_stream.h"
30namespace pw::rpc::integration_test {
36template <
size_t kMaxTransmissionUnit>
40 : rpc_dispatch_thread_handle_(std::nullopt),
41 channel_output_(stream_, hdlc::kDefaultRpcAddress,
"socket"),
42 channel_output_with_manipulator_(channel_output_),
44 Channel::Create<kChannelId>(&channel_output_with_manipulator_)),
45 client_(
span(&channel_, 1)) {}
47 Client& client() {
return client_; }
51 Status Start(
const char* host, uint16_t port) {
52 PW_TRY(stream_.Connect(host, port));
53 rpc_dispatch_thread_handle_.emplace(&SocketClientContext::ProcessPackets,
60 PW_ASSERT(rpc_dispatch_thread_handle_.has_value());
61 should_terminate_.test_and_set();
64 rpc_dispatch_thread_handle_->join();
68 int SetSockOpt(
int level,
71 unsigned int optlen) {
72 return stream_.SetSockOpt(level, optname, optval, optlen);
75 void SetEgressChannelManipulator(
77 channel_output_with_manipulator_.set_channel_manipulator(
78 new_channel_manipulator);
81 void SetIngressChannelManipulator(
83 if (new_channel_manipulator !=
nullptr) {
84 new_channel_manipulator->set_send_packet([&](
ConstByteSpan payload) {
85 return client_.ProcessPacket(payload);
88 ingress_channel_manipulator_ = new_channel_manipulator;
92 Status Start(uint16_t port) {
return Start(
"localhost", port); }
// Member function handed to the RPC dispatch thread when
// Start(const char* host, uint16_t port) emplaces
// rpc_dispatch_thread_handle_.
// NOTE(review): the out-of-class definition is only partly visible; it appears
// to HDLC-decode received bytes and pass frames addressed to
// hdlc::kDefaultRpcAddress to client_ -- confirm against the full definition.
void ProcessPackets();
101 actual_output_(actual_output),
102 channel_manipulator_(
nullptr) {}
105 if (new_channel_manipulator !=
nullptr) {
106 new_channel_manipulator->set_send_packet(
107 ChannelManipulator::SendCallback(
109 __attribute__((no_thread_safety_analysis)) {
110 return actual_output_.Send(payload);
113 channel_manipulator_ = new_channel_manipulator;
116 size_t MaximumTransmissionUnit()
override {
117 return actual_output_.MaximumTransmissionUnit();
121 if (channel_manipulator_ !=
nullptr) {
122 return channel_manipulator_->ProcessAndSend(buffer);
125 return actual_output_.Send(buffer);
133 std::atomic_flag should_terminate_ = ATOMIC_FLAG_INIT;
134 std::optional<std::thread> rpc_dispatch_thread_handle_;
136 hdlc::FixedMtuChannelOutput<kMaxTransmissionUnit> channel_output_;
137 ChannelOutputWithManipulator channel_output_with_manipulator_;
143template <
size_t kMaxTransmissionUnit>
145 constexpr size_t kDecoderBufferSize =
146 hdlc::Decoder::RequiredBufferSizeForFrameSize(kMaxTransmissionUnit);
147 std::array<std::byte, kDecoderBufferSize> decode_buffer;
154 if (should_terminate_.test()) {
158 if (!read.ok() || read->empty()) {
162 if (
auto result = decoder.Process(*
byte); result.ok()) {
164 if (frame.address() == hdlc::kDefaultRpcAddress) {
165 if (ingress_channel_manipulator_ !=
nullptr) {
167 ingress_channel_manipulator_->ProcessAndSend(frame.data()).ok());
169 PW_ASSERT(client_.ProcessPacket(frame.data()).ok());
Definition: channel.h:246
Definition: channel.h:110
Definition: integration_testing.h:42
Definition: integration_test_socket_client.h:37
Definition: span_impl.h:235
Definition: socket_stream.h:32
#define PW_TRY(expr)
Returns early if expr is a non-OK Status or Result.
Definition: try.h:27
constexpr Status OkStatus()
Definition: status.h:297
#define PW_EXCLUSIVE_LOCKS_REQUIRED(...)
Definition: lock_annotations.h:146