Pigweed
 
Loading...
Searching...
No Matches
stream_channel.h
1// Copyright 2024 The Pigweed Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may not
4// use this file except in compliance with the License. You may obtain a copy of
5// the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12// License for the specific language governing permissions and limitations under
13// the License.
14#pragma once
15
16#include "pw_async2/dispatcher_base.h"
17#include "pw_channel/channel.h"
18#include "pw_multibuf/allocator.h"
19#include "pw_multibuf/allocator_async.h"
20#include "pw_multibuf/multibuf.h"
21#include "pw_status/status.h"
22#include "pw_stream/stream.h"
23#include "pw_sync/interrupt_spin_lock.h"
24#include "pw_sync/thread_notification.h"
25#include "pw_thread/thread.h"
26
27namespace pw::channel {
28namespace internal {
29
32 public:
33 StreamChannelReadState() = default;
35 StreamChannelReadState& operator=(const StreamChannelReadState&) = delete;
38
42
45
51 async2::Context& cx);
52
58 void ReadLoop(stream::Reader& reader);
59
60 private:
61 multibuf::OwnedChunk WaitForBufferToFillAndTakeFrontChunk();
62 void ProvideFilledBuffer(multibuf::MultiBuf&& filled_buffer);
63 void SetReadError(Status status);
64
65 sync::ThreadNotification buffer_to_fill_available_;
66 async2::Waker on_buffer_filled_;
67 sync::InterruptSpinLock buffer_lock_;
68 multibuf::MultiBuf buffer_to_fill_ PW_GUARDED_BY(buffer_lock_);
69 multibuf::MultiBuf filled_buffer_ PW_GUARDED_BY(buffer_lock_);
70 Status status_ PW_GUARDED_BY(buffer_lock_);
71};
72
75 public:
76 StreamChannelWriteState() = default;
78 StreamChannelWriteState& operator=(const StreamChannelWriteState&) = delete;
81
87
92
93 private:
94 sync::ThreadNotification data_available_;
95 sync::InterruptSpinLock buffer_lock_;
96 multibuf::MultiBuf buffer_to_write_ PW_GUARDED_BY(buffer_lock_);
97 Status status_;
98};
99
100} // namespace internal
101
104
115class StreamChannel final
116 : public channel::Implement<channel::ByteReaderWriter> {
117 public:
119 const thread::Options& read_thread_options,
120 multibuf::MultiBufAllocator& read_allocator,
121 stream::Writer& writer,
122 const thread::Options& write_thread_option,
123 multibuf::MultiBufAllocator& write_allocator);
124
125 // Deprecated: prefer the two-allocator constructor, which prevents reads and
126 // writes from blocking each other while waiting for buffer space.
128 stream::Reader& reader,
129 const thread::Options& read_thread_options,
130 stream::Writer& writer,
131 const thread::Options& write_thread_options)
132 : StreamChannel(reader,
133 read_thread_options,
134 allocator,
135 writer,
136 write_thread_options,
137 allocator) {}
138
139 // StreamChannel is referenced from other threads and is therefore not movable
140 // or copyable.
141 StreamChannel(const StreamChannel&) = delete;
142 StreamChannel& operator=(const StreamChannel&) = delete;
143 StreamChannel(StreamChannel&&) = delete;
144 StreamChannel& operator=(StreamChannel&&) = delete;
145
146 private:
147 // StreamChannel must live forever, as its state is referenced by other
148 // threads.
149 ~StreamChannel() final = default;
150
151 Status ProvideBufferIfAvailable(async2::Context& cx);
152
154 async2::Context& cx) override;
155
156 async2::Poll<Status> DoPendReadyToWrite(async2::Context& cx) override;
157
158 async2::Poll<std::optional<multibuf::MultiBuf>> DoPendAllocateWriteBuffer(
159 async2::Context& cx, size_t min_bytes) override {
160 write_allocation_future_.SetDesiredSize(min_bytes);
161 return write_allocation_future_.Pend(cx);
162 }
163
164 Status DoStageWrite(multibuf::MultiBuf&& data) override;
165
166 async2::Poll<Status> DoPendWrite(async2::Context&) override {
167 return OkStatus();
168 }
169
170 async2::Poll<Status> DoPendClose(async2::Context&) override {
171 return async2::Ready(OkStatus());
172 }
173
174 stream::Reader& reader_;
175 stream::Writer& writer_;
178 multibuf::MultiBufAllocationFuture read_allocation_future_;
179 multibuf::MultiBufAllocationFuture write_allocation_future_;
180};
181
183
184} // namespace pw::channel
Definition: status.h:85
Definition: dispatcher_base.h:52
Definition: poll.h:54
Definition: dispatcher_base.h:318
Definition: channel.h:583
Definition: stream_channel.h:116
State for the stream-reading thread.
Definition: stream_channel.h:31
void ReadLoop(stream::Reader &reader)
void ProvideBufferToFill(multibuf::MultiBuf &&buf)
Provide a buffer for ReadLoop to read data into.
async2::Poll< Result< multibuf::MultiBuf > > PendFilledBuffer(async2::Context &cx)
State for the stream-writing thread.
Definition: stream_channel.h:74
Status SendData(multibuf::MultiBuf &&buf)
void WriteLoop(stream::Writer &writer)
Definition: allocator_async.h:90
Definition: allocator.h:54
Definition: multibuf.h:245
Definition: chunk.h:323
Definition: stream.h:345
Definition: stream.h:430
Definition: interrupt_spin_lock.h:48
Definition: thread_notification.h:36
Definition: options.h:40
constexpr Status OkStatus()
Definition: status.h:234