C/C++ API Reference
Loading...
Searching...
No Matches
integration_test_socket_client.h
1// Copyright 2021 The Pigweed Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may not
4// use this file except in compliance with the License. You may obtain a copy of
5// the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12// License for the specific language governing permissions and limitations under
13// the License.
14#pragma once
15
16#include <atomic>
17#include <cstdint>
18#include <optional>
19#include <thread>
20
21#include "pw_hdlc/decoder.h"
22#include "pw_hdlc/default_addresses.h"
23#include "pw_hdlc/encoded_size.h"
24#include "pw_hdlc/rpc_channel.h"
25#include "pw_rpc/integration_testing.h"
26#include "pw_span/span.h"
27#include "pw_status/try.h"
28#include "pw_stream/socket_stream.h"
29
30namespace pw::rpc::integration_test {
31
33
34// Wraps an RPC client with a socket stream and a channel configured to use it.
35// Useful for integration tests that run across a socket.
36template <size_t kMaxTransmissionUnit>
38 public:
39 constexpr SocketClientContext()
40 : rpc_dispatch_thread_handle_(std::nullopt),
41 channel_output_(stream_, hdlc::kDefaultRpcAddress, "socket"),
42 channel_output_with_manipulator_(channel_output_),
43 channel_(
44 Channel::Create<kChannelId>(&channel_output_with_manipulator_)),
45 client_(span(&channel_, 1)) {}
46
47 Client& client() { return client_; }
48
49 // Connects to the specified host:port and starts a background thread to read
50 // packets from the socket.
51 Status Start(const char* host, uint16_t port) {
52 PW_TRY(stream_.Connect(host, port));
53 rpc_dispatch_thread_handle_.emplace(&SocketClientContext::ProcessPackets,
54 this);
55 return OkStatus();
56 }
57
58 // Terminates the client, joining the RPC dispatch thread.
59 void Terminate() {
60 PW_ASSERT(rpc_dispatch_thread_handle_.has_value());
61 should_terminate_.test_and_set();
62 // Close the stream to avoid blocking forever on a socket read.
63 stream_.Close();
64 rpc_dispatch_thread_handle_->join();
65 }
66
67 // Configure options for the socket associated with the client.
68 int SetSockOpt(int level,
69 int optname,
70 const void* optval,
71 unsigned int optlen) {
72 return stream_.SetSockOpt(level, optname, optval, optlen);
73 }
74
75 void SetEgressChannelManipulator(
76 ChannelManipulator* new_channel_manipulator) {
77 channel_output_with_manipulator_.set_channel_manipulator(
78 new_channel_manipulator);
79 }
80
81 void SetIngressChannelManipulator(
82 ChannelManipulator* new_channel_manipulator) {
83 if (new_channel_manipulator != nullptr) {
84 new_channel_manipulator->set_send_packet([&](ConstByteSpan payload) {
85 return client_.ProcessPacket(payload);
86 });
87 }
88 ingress_channel_manipulator_ = new_channel_manipulator;
89 }
90
91 // Calls Start for localhost.
92 Status Start(uint16_t port) { return Start("localhost", port); }
93
94 private:
95 void ProcessPackets();
96
97 class ChannelOutputWithManipulator : public ChannelOutput {
98 public:
99 ChannelOutputWithManipulator(ChannelOutput& actual_output)
100 : ChannelOutput(actual_output.name()),
101 actual_output_(actual_output),
102 channel_manipulator_(nullptr) {}
103
104 void set_channel_manipulator(ChannelManipulator* new_channel_manipulator) {
105 if (new_channel_manipulator != nullptr) {
106 new_channel_manipulator->set_send_packet(
107 ChannelManipulator::SendCallback(
108 [&](ConstByteSpan payload)
109 __attribute__((no_thread_safety_analysis)) {
110 return actual_output_.Send(payload);
111 }));
112 }
113 channel_manipulator_ = new_channel_manipulator;
114 }
115
116 size_t MaximumTransmissionUnit() override {
117 return actual_output_.MaximumTransmissionUnit();
118 }
119 Status Send(span<const std::byte> buffer) override
120 PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock()) {
121 if (channel_manipulator_ != nullptr) {
122 return channel_manipulator_->ProcessAndSend(buffer);
123 }
124
125 return actual_output_.Send(buffer);
126 }
127
128 private:
129 ChannelOutput& actual_output_;
130 ChannelManipulator* channel_manipulator_;
131 };
132
133 std::atomic_flag should_terminate_ = ATOMIC_FLAG_INIT;
134 std::optional<std::thread> rpc_dispatch_thread_handle_;
135 stream::SocketStream stream_;
136 hdlc::FixedMtuChannelOutput<kMaxTransmissionUnit> channel_output_;
137 ChannelOutputWithManipulator channel_output_with_manipulator_;
138 ChannelManipulator* ingress_channel_manipulator_;
139 Channel channel_;
140 Client client_;
141};
142
143template <size_t kMaxTransmissionUnit>
145 constexpr size_t kDecoderBufferSize =
146 hdlc::Decoder::RequiredBufferSizeForFrameSize(kMaxTransmissionUnit);
147 std::array<std::byte, kDecoderBufferSize> decode_buffer;
148 hdlc::Decoder decoder(decode_buffer);
149
150 while (true) {
151 std::byte byte[1];
152 Result<ByteSpan> read = stream_.Read(byte);
153
154 if (should_terminate_.test()) {
155 return;
156 }
157
158 if (!read.ok() || read->empty()) {
159 continue;
160 }
161
162 if (auto result = decoder.Process(*byte); result.ok()) {
163 hdlc::Frame& frame = result.value();
164 if (frame.address() == hdlc::kDefaultRpcAddress) {
165 if (ingress_channel_manipulator_ != nullptr) {
166 PW_ASSERT(
167 ingress_channel_manipulator_->ProcessAndSend(frame.data()).ok());
168 } else {
169 PW_ASSERT(client_.ProcessPacket(frame.data()).ok());
170 }
171 }
172 }
173 }
174}
175
177
178} // namespace pw::rpc::integration_test
Definition: poll.h:25
Definition: status.h:109
Definition: decoder.h:68
Definition: decoder.h:36
Definition: channel.h:246
Definition: channel.h:110
Definition: client.h:28
Definition: integration_testing.h:42
Definition: integration_test_socket_client.h:37
Definition: span_impl.h:235
Definition: socket_stream.h:32
#define PW_TRY(expr)
Returns early if expr is a non-OK Status or Result.
Definition: try.h:27
constexpr Status OkStatus()
Definition: status.h:297
#define PW_EXCLUSIVE_LOCKS_REQUIRED(...)
Definition: lock_annotations.h:146