C/C++ API Reference
Loading...
Searching...
No Matches
stream_channel.h
1// Copyright 2024 The Pigweed Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may not
4// use this file except in compliance with the License. You may obtain a copy of
5// the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12// License for the specific language governing permissions and limitations under
13// the License.
14#pragma once
15
16#include "pw_async2/dispatcher_base.h"
17#include "pw_channel/channel.h"
18#include "pw_multibuf/allocator.h"
19#include "pw_multibuf/allocator_async.h"
20#include "pw_multibuf/multibuf.h"
21#include "pw_status/status.h"
22#include "pw_stream/stream.h"
23#include "pw_sync/interrupt_spin_lock.h"
24#include "pw_sync/thread_notification.h"
25#include "pw_thread/thread.h"
26
27namespace pw::channel {
28namespace internal {
29
// State for the stream-reading thread.
//
// NOTE(review): this listing carries the original file's line numbers as line
// prefixes and has gaps in that numbering. The class header (presumably
// `class StreamChannelReadState {` at original line 31) and several public
// member declarations are not present here -- confirm against the real header
// before editing.
32 public:
// Default-constructible; copy assignment is explicitly deleted.
33 StreamChannelReadState() = default;
35 StreamChannelReadState& operator=(const StreamChannelReadState&) = delete;
38
42
45
51
// Read loop that takes the stream::Reader to read from.
// NOTE(review): presumably intended to run on its own thread, given the
// thread notification and spin lock below -- confirm with the implementation.
57 void ReadLoop(stream::Reader& reader);
58
59 private:
// Private helpers; their definitions are not part of this listing.
60 multibuf::OwnedChunk WaitForBufferToFillAndTakeFrontChunk();
61 void ProvideFilledBuffer(multibuf::MultiBuf&& filled_buffer);
62 void SetReadError(Status status);
63
// NOTE(review): presumably signalled when a buffer to fill has been provided
// -- confirm.
64 sync::ThreadNotification buffer_to_fill_available_;
// NOTE(review): presumably woken when a filled buffer becomes available --
// confirm.
65 async2::Waker on_buffer_filled_;
// Lock named by the PW_GUARDED_BY annotations on the three members below.
66 sync::InterruptSpinLock buffer_lock_;
67 multibuf::MultiBuf buffer_to_fill_ PW_GUARDED_BY(buffer_lock_);
68 multibuf::MultiBuf filled_buffer_ PW_GUARDED_BY(buffer_lock_);
69 Status status_ PW_GUARDED_BY(buffer_lock_);
70};
71
// State for the stream-writing thread.
//
// NOTE(review): this listing carries the original file's line numbers as line
// prefixes and has gaps in that numbering. The class header (presumably
// `class StreamChannelWriteState {` at original line 73) and the public member
// declarations between original lines 77 and 92 are not present here --
// confirm against the real header before editing.
74 public:
// Default-constructible; copy assignment is explicitly deleted.
75 StreamChannelWriteState() = default;
77 StreamChannelWriteState& operator=(const StreamChannelWriteState&) = delete;
80
86
91
92 private:
// NOTE(review): presumably signalled when there is data to write -- confirm.
93 sync::ThreadNotification data_available_;
// Lock named by the PW_GUARDED_BY annotation on buffer_to_write_.
94 sync::InterruptSpinLock buffer_lock_;
95 multibuf::MultiBuf buffer_to_write_ PW_GUARDED_BY(buffer_lock_);
// NOTE(review): unlike buffer_to_write_, this member carries no PW_GUARDED_BY
// annotation. Confirm whether it is only ever touched from a single thread or
// whether it should also be guarded by buffer_lock_.
96 Status status_;
97};
98
99} // namespace internal
100
102
105
// Channel implementation built on channel::Implement<channel::ByteReaderWriter>
// that holds references to a stream::Reader and a stream::Writer.
//
// NOTE(review): this listing carries the original file's line numbers as line
// prefixes and omits original lines 119, 128, 158 and 181-182, so a few
// declarations below start mid-signature. Confirm against the real header
// before editing.
116class StreamChannel final
117 : public channel::Implement<channel::ByteReaderWriter> {
118 public:
// Parameter list taking separate thread options and MultiBuf allocators for the
// read side and the write side. The line naming the function (original line
// 119) is missing; presumably this is the two-allocator constructor.
// NOTE(review): `write_thread_option` is spelled without the trailing "s" that
// the other overload uses (`write_thread_options`) -- presumably a typo.
120 const thread::Options& read_thread_options,
121 multibuf::MultiBufAllocator& read_allocator,
122 stream::Writer& writer,
123 const thread::Options& write_thread_option,
124 multibuf::MultiBufAllocator& write_allocator);
125
126 // Deprecated: prefer the two-allocator constructor so that reads and writes
127 // do not block each other while waiting for buffer space.
// Delegates to the six-argument constructor, passing the same `allocator` for
// both allocator arguments. The line declaring `allocator` (original line 128)
// is missing from this listing.
129 stream::Reader& reader,
130 const thread::Options& read_thread_options,
131 stream::Writer& writer,
132 const thread::Options& write_thread_options)
133 : StreamChannel(reader,
134 read_thread_options,
135 allocator,
136 writer,
137 write_thread_options,
138 allocator) {}
139
140 // StreamChannel is referenced from other threads and is therefore not movable
141 // or copyable.
142 StreamChannel(const StreamChannel&) = delete;
143 StreamChannel& operator=(const StreamChannel&) = delete;
144 StreamChannel(StreamChannel&&) = delete;
145 StreamChannel& operator=(StreamChannel&&) = delete;
146
147 private:
148 // StreamChannel must live forever, as its state is referenced by other
149 // threads.
150 ~StreamChannel() final = default;
151
152 // Even though the destructor is never called, classes with private
153 // destructors must friend `pw::NoDestructor`.
// NOTE(review): no header for pw::NoDestructor appears in this file's visible
// include block -- confirm it is provided transitively or add the include.
154 friend class pw::NoDestructor<StreamChannel>;
155
156 Status ProvideBufferIfAvailable(async2::Context& cx);
157
// Tail of an overriding declaration whose first line (original line 158) is
// missing from this listing.
159 async2::Context& cx) override;
160
161 async2::Poll<Status> DoPendReadyToWrite(async2::Context& cx) override;
162
// Sets `min_bytes` as the desired size on the write allocation future, then
// returns the result of pending that future with `cx`.
163 async2::PollOptional<multibuf::MultiBuf> DoPendAllocateWriteBuffer(
164 async2::Context& cx, size_t min_bytes) override {
165 write_allocation_future_.SetDesiredSize(min_bytes);
166 return write_allocation_future_.Pend(cx);
167 }
168
169 Status DoStageWrite(multibuf::MultiBuf&& data) override;
170
// Always returns OkStatus(); the context argument is unused.
// NOTE(review): DoPendClose below spells the same result as
// async2::Ready(OkStatus()); consider using one form for both.
171 async2::Poll<Status> DoPendWrite(async2::Context&) override {
172 return OkStatus();
173 }
174
// Always returns a ready OkStatus(); the context argument is unused.
175 async2::Poll<Status> DoPendClose(async2::Context&) override {
176 return async2::Ready(OkStatus());
177 }
178
// Held by reference: this class does not own the reader or the writer.
179 stream::Reader& reader_;
180 stream::Writer& writer_;
// NOTE(review): original lines 181-182 are missing from this listing;
// presumably they declare the read/write state members -- confirm.
// Separate allocation futures for the read side and the write side.
183 multibuf::MultiBufAllocationFuture read_allocation_future_;
184 multibuf::MultiBufAllocationFuture write_allocation_future_;
185};
186
188
189} // namespace pw::channel
Definition: no_destructor.h:77
Definition: status.h:109
Definition: context.h:55
Definition: poll.h:60
Definition: waker.h:160
Definition: channel.h:583
Definition: stream_channel.h:117
State for the stream-reading thread.
Definition: stream_channel.h:31
State for the stream-writing thread.
Definition: stream_channel.h:73
Definition: allocator_async.h:93
Definition: allocator.h:57
Definition: multibuf_v1.h:248
Definition: chunk.h:326
Definition: stream.h:351
Definition: stream.h:440
Definition: interrupt_spin_lock.h:50
Definition: thread_notification.h:38
Definition: options.h:42
constexpr Poll Ready()
Returns a value indicating completion.
Definition: poll.h:255
void ReadLoop(stream::Reader &reader)
void ProvideBufferToFill(multibuf::MultiBuf &&buf)
Provide a buffer for ReadLoop to read data into.
Status SendData(multibuf::MultiBuf &&buf)
void WriteLoop(stream::Writer &writer)
async2::PollResult< multibuf::MultiBuf > PendFilledBuffer(async2::Context &cx)
constexpr Status OkStatus()
Definition: status.h:297
#define PW_GUARDED_BY(x)
Definition: lock_annotations.h:60