// Copyright 2016 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4
5#include "media/remoting/demuxer_stream_adapter.h"
6
7#include <memory>
8#include <vector>
9
Kaido Kert25902c62024-06-17 17:10:28 -070010#include "base/functional/bind.h"
11#include "base/functional/callback_helpers.h"
Kaido Kert08336fa2022-02-08 14:10:50 -080012#include "base/run_loop.h"
Kaido Kert25902c62024-06-17 17:10:28 -070013#include "base/task/single_thread_task_runner.h"
Kaido Kert08336fa2022-02-08 14:10:50 -080014#include "base/test/task_environment.h"
15#include "media/base/decoder_buffer.h"
16#include "media/base/demuxer_stream.h"
17#include "media/remoting/fake_media_resource.h"
18#include "media/remoting/fake_remoter.h"
Kaido Kert08336fa2022-02-08 14:10:50 -080019#include "testing/gmock/include/gmock/gmock.h"
20#include "testing/gtest/include/gtest/gtest.h"
21
22using openscreen::cast::RpcMessenger;
23using testing::_;
24using testing::Invoke;
25using testing::Return;
26
27namespace media {
28namespace remoting {
29
30class MockDemuxerStreamAdapter {
31 public:
32 MockDemuxerStreamAdapter(
33 scoped_refptr<base::SingleThreadTaskRunner> main_task_runner,
34 scoped_refptr<base::SingleThreadTaskRunner> media_task_runner,
35 const std::string& name,
36 DemuxerStream* demuxer_stream,
37 mojo::PendingRemote<mojom::RemotingDataStreamSender> stream_sender_remote,
38 mojo::ScopedDataPipeProducerHandle producer_handle) {
39 rpc_messenger_ = std::make_unique<RpcMessenger>(
40 [cb =
41 base::BindRepeating(&MockDemuxerStreamAdapter::OnSendMessageToSink,
42 weak_factory_.GetWeakPtr())](
43 std::vector<uint8_t> message) { cb.Run(std::move(message)); });
44 demuxer_stream_adapter_ = std::make_unique<DemuxerStreamAdapter>(
45 std::move(main_task_runner), std::move(media_task_runner), name,
46 demuxer_stream, rpc_messenger_->GetWeakPtr(),
47 rpc_messenger_->GetUniqueHandle(), std::move(stream_sender_remote),
48 std::move(producer_handle),
49 base::BindOnce(&MockDemuxerStreamAdapter::OnError,
50 weak_factory_.GetWeakPtr()));
51
52 // Faking initialization with random callback handle to start mojo watcher.
53 demuxer_stream_adapter_->Initialize(3);
54 }
55
56 MockDemuxerStreamAdapter(const MockDemuxerStreamAdapter&) = delete;
57 MockDemuxerStreamAdapter& operator=(const MockDemuxerStreamAdapter&) = delete;
58
59 ~MockDemuxerStreamAdapter() {
60 // Make sure unit tests that did not expect errors did not cause any errors.
61 EXPECT_TRUE(errors_.empty());
62 }
63
64 int rpc_handle() const { return demuxer_stream_adapter_->rpc_handle(); }
65
66 base::WeakPtr<MockDemuxerStreamAdapter> GetWeakPtr() {
67 return weak_factory_.GetWeakPtr();
68 }
69
70 void DoDuplicateInitialize() { demuxer_stream_adapter_->Initialize(999); }
71
72 void TakeErrors(std::vector<StopTrigger>* errors) {
73 errors->swap(errors_);
74 errors_.clear();
75 }
76
77 // Fake to signal that it's in reading state.
78 void FakeReadUntil(int read_until_count, int callback_handle) {
79 std::unique_ptr<openscreen::cast::RpcMessage> rpc(
80 new openscreen::cast::RpcMessage());
81 rpc->set_handle(rpc_handle());
82 rpc->set_proc(openscreen::cast::RpcMessage::RPC_DS_READUNTIL);
83 auto* read_message = rpc->mutable_demuxerstream_readuntil_rpc();
84 read_message->set_callback_handle(
85 callback_handle); // Given an unique callback handle.
86 read_message->set_count(read_until_count); // Request 1 frame
87
88 demuxer_stream_adapter_->OnReceivedRpc(std::move(rpc));
89 }
90 void OnNewBuffer(scoped_refptr<DecoderBuffer> frame) {
91 demuxer_stream_adapter_->OnNewBuffer(DemuxerStream::kOk, frame);
92 }
93
94 void SignalFlush(bool flush) { demuxer_stream_adapter_->SignalFlush(flush); }
95
96 openscreen::cast::RpcMessage* last_received_rpc() const {
97 return last_received_rpc_.get();
98 }
99
100 private:
101 void OnSendMessageToSink(std::vector<uint8_t> message) {
102 last_received_rpc_ = std::make_unique<openscreen::cast::RpcMessage>();
103 CHECK(last_received_rpc_->ParseFromArray(message.data(), message.size()));
104 }
105
106 void OnError(StopTrigger stop_trigger) { errors_.push_back(stop_trigger); }
107
108 std::unique_ptr<RpcMessenger> rpc_messenger_;
109 std::unique_ptr<DemuxerStreamAdapter> demuxer_stream_adapter_;
110 std::unique_ptr<openscreen::cast::RpcMessage> last_received_rpc_;
111
112 std::vector<StopTrigger> errors_;
113
114 base::WeakPtrFactory<MockDemuxerStreamAdapter> weak_factory_{this};
115};
116
117class DemuxerStreamAdapterTest : public ::testing::Test {
118 public:
119 DemuxerStreamAdapterTest() = default;
120
121 DemuxerStreamAdapterTest(const DemuxerStreamAdapterTest&) = delete;
122 DemuxerStreamAdapterTest& operator=(const DemuxerStreamAdapterTest&) = delete;
123
124 ~DemuxerStreamAdapterTest() override = default;
125
126 void SetUpDataPipe() {
127 constexpr size_t kDataPipeCapacity = 256;
128 demuxer_stream_ = std::make_unique<FakeDemuxerStream>(true); // audio.
129 const MojoCreateDataPipeOptions data_pipe_options{
130 sizeof(MojoCreateDataPipeOptions), MOJO_CREATE_DATA_PIPE_FLAG_NONE, 1,
131 kDataPipeCapacity};
132 mojo::PendingRemote<mojom::RemotingDataStreamSender> stream_sender;
133 mojo::ScopedDataPipeProducerHandle producer_end;
134 mojo::ScopedDataPipeConsumerHandle consumer_end;
135 CHECK_EQ(MOJO_RESULT_OK, mojo::CreateDataPipe(&data_pipe_options,
136 producer_end, consumer_end));
137
138 data_stream_sender_ = std::make_unique<FakeRemotingDataStreamSender>(
139 stream_sender.InitWithNewPipeAndPassReceiver(),
140 std::move(consumer_end));
141 demuxer_stream_adapter_ = std::make_unique<MockDemuxerStreamAdapter>(
142 task_environment_.GetMainThreadTaskRunner(),
143 task_environment_.GetMainThreadTaskRunner(), "test",
144 demuxer_stream_.get(), std::move(stream_sender),
145 std::move(producer_end));
146 // DemuxerStreamAdapter constructor posts task to main thread to
147 // register MessageReceiverCallback. Therefore it should call
148 // RunPendingTasks() to make sure task is executed.
149 RunPendingTasks();
150 }
151
152 void TearDown() override { base::RunLoop().RunUntilIdle(); }
153
154 void RunPendingTasks() { base::RunLoop().RunUntilIdle(); }
155
156 protected:
157 void SetUp() override { SetUpDataPipe(); }
158
Kaido Kert08336fa2022-02-08 14:10:50 -0800159 base::test::SingleThreadTaskEnvironment task_environment_;
160 std::unique_ptr<FakeDemuxerStream> demuxer_stream_;
161 std::unique_ptr<FakeRemotingDataStreamSender> data_stream_sender_;
162 std::unique_ptr<MockDemuxerStreamAdapter> demuxer_stream_adapter_;
163};
164
165TEST_F(DemuxerStreamAdapterTest, SingleReadUntil) {
166 // Read will be called once since it doesn't return frame buffer in the dummy
167 // implementation.
Kaido Kert25902c62024-06-17 17:10:28 -0700168 EXPECT_CALL(*demuxer_stream_, Read(_, _)).Times(1);
Kaido Kert08336fa2022-02-08 14:10:50 -0800169
170 demuxer_stream_adapter_->FakeReadUntil(3, 999);
171 RunPendingTasks();
172}
173
174TEST_F(DemuxerStreamAdapterTest, MultiReadUntil) {
175 // Read will be called once since it doesn't return frame buffer in the dummy
176 // implementation, and 2nd one will not proceed when there is ongoing read.
Kaido Kert25902c62024-06-17 17:10:28 -0700177 EXPECT_CALL(*demuxer_stream_, Read(_, _)).Times(1);
Kaido Kert08336fa2022-02-08 14:10:50 -0800178
179 demuxer_stream_adapter_->FakeReadUntil(1, 100);
180 RunPendingTasks();
181
182 demuxer_stream_adapter_->FakeReadUntil(2, 101);
183 RunPendingTasks();
184}
185
186TEST_F(DemuxerStreamAdapterTest, WriteOneFrameSmallerThanCapacity) {
Kaido Kert25902c62024-06-17 17:10:28 -0700187 EXPECT_CALL(*demuxer_stream_, Read(_, _)).Times(1);
Kaido Kert08336fa2022-02-08 14:10:50 -0800188 // Sends a frame with size 50 bytes, pts = 1 and key frame.
189 demuxer_stream_->CreateFakeFrame(50, true, 1 /* pts */);
190 demuxer_stream_adapter_->FakeReadUntil(1, 999);
191 RunPendingTasks();
192
193 // Checks if it's sent to consumer side and data is correct
194 ASSERT_EQ(data_stream_sender_->send_frame_count(), 1U);
195 ASSERT_TRUE(data_stream_sender_->ValidateFrameBuffer(0, 50, true, 1));
196 openscreen::cast::RpcMessage* last_rpc =
197 demuxer_stream_adapter_->last_received_rpc();
198 ASSERT_TRUE(last_rpc);
199 ASSERT_EQ(last_rpc->proc(),
200 openscreen::cast::RpcMessage::RPC_DS_READUNTIL_CALLBACK);
201 ASSERT_EQ(last_rpc->handle(), 999);
202 data_stream_sender_->ResetHistory();
203}
204
205TEST_F(DemuxerStreamAdapterTest, WriteOneFrameLargerThanCapacity) {
Kaido Kert25902c62024-06-17 17:10:28 -0700206 EXPECT_CALL(*demuxer_stream_, Read(_, _)).Times(1);
Kaido Kert08336fa2022-02-08 14:10:50 -0800207 // Sends a frame with size 800 bytes, pts = 1 and key frame.
208 demuxer_stream_->CreateFakeFrame(800, true, 1 /* pts */);
209 demuxer_stream_adapter_->FakeReadUntil(1, 999);
210 RunPendingTasks();
211
212 // Checks if it's sent to consumer side and data is correct
213 ASSERT_EQ(data_stream_sender_->send_frame_count(), 1U);
214 ASSERT_TRUE(data_stream_sender_->ValidateFrameBuffer(0, 800, true, 1));
215 openscreen::cast::RpcMessage* last_rpc =
216 demuxer_stream_adapter_->last_received_rpc();
217 ASSERT_TRUE(last_rpc);
218 ASSERT_EQ(last_rpc->proc(),
219 openscreen::cast::RpcMessage::RPC_DS_READUNTIL_CALLBACK);
220 ASSERT_EQ(last_rpc->handle(), 999);
221 data_stream_sender_->ResetHistory();
222}
223
224TEST_F(DemuxerStreamAdapterTest, SendFrameAndSignalFlushMix) {
Kaido Kert25902c62024-06-17 17:10:28 -0700225 EXPECT_CALL(*demuxer_stream_, Read(_, _)).Times(4);
Kaido Kert08336fa2022-02-08 14:10:50 -0800226 // Sends a frame with size 50 bytes, pts = 1 and key frame.
227 demuxer_stream_->CreateFakeFrame(50, true, 1 /* pts */);
228 // Issues ReadUntil request with frame count up to 1 (fetch #0).
229 demuxer_stream_adapter_->FakeReadUntil(1, 100);
230 RunPendingTasks();
231 ASSERT_EQ(data_stream_sender_->send_frame_count(), 1U);
232 ASSERT_TRUE(data_stream_sender_->ValidateFrameBuffer(0, 50, true, 1));
233 openscreen::cast::RpcMessage* last_rpc =
234 demuxer_stream_adapter_->last_received_rpc();
235 ASSERT_TRUE(last_rpc);
236 ASSERT_EQ(last_rpc->proc(),
237 openscreen::cast::RpcMessage::RPC_DS_READUNTIL_CALLBACK);
238 ASSERT_EQ(last_rpc->handle(), 100);
239 data_stream_sender_->ResetHistory();
240
241 // Sends two frames with size 100 + 150 bytes
242 demuxer_stream_->CreateFakeFrame(100, false, 2 /* pts */);
243 demuxer_stream_->CreateFakeFrame(150, false, 3 /* pts */);
244 // Issues ReadUntil request with frame count up to 3 (fetch #1 and #2).
245 demuxer_stream_adapter_->FakeReadUntil(3, 101);
246 RunPendingTasks();
247 ASSERT_EQ(data_stream_sender_->send_frame_count(), 2U);
248 ASSERT_TRUE(data_stream_sender_->ValidateFrameBuffer(0, 100, false, 2));
249 ASSERT_TRUE(data_stream_sender_->ValidateFrameBuffer(1, 150, false, 3));
250 last_rpc = demuxer_stream_adapter_->last_received_rpc();
251 ASSERT_TRUE(last_rpc);
252 ASSERT_EQ(last_rpc->proc(),
253 openscreen::cast::RpcMessage::RPC_DS_READUNTIL_CALLBACK);
254 ASSERT_EQ(last_rpc->handle(), 101);
255 data_stream_sender_->ResetHistory();
256
257 // Signal flush
258 ASSERT_EQ(data_stream_sender_->cancel_in_flight_count(), 0U);
259 demuxer_stream_adapter_->SignalFlush(true);
260 RunPendingTasks();
261 ASSERT_EQ(data_stream_sender_->cancel_in_flight_count(), 1U);
262
263 // ReadUntil request after flush signaling should be ignored.
264 demuxer_stream_->CreateFakeFrame(100, false, 4 /* pts */);
265 demuxer_stream_->CreateFakeFrame(100, false, 5 /* pts */);
266 // Issues ReadUntil request with frame count up to 5 (fetch #3 and #4).
267 demuxer_stream_adapter_->FakeReadUntil(5, 102);
268 RunPendingTasks();
269 ASSERT_EQ(data_stream_sender_->send_frame_count(), 0U);
270
271 // Signal flush done
272 demuxer_stream_adapter_->SignalFlush(false);
273 RunPendingTasks();
274 ASSERT_EQ(data_stream_sender_->cancel_in_flight_count(), 1U);
275 data_stream_sender_->ResetHistory();
276
277 // Re-issues ReadUntil request with frame count up to 4 (fetch #3).
278 demuxer_stream_adapter_->FakeReadUntil(4, 103);
279 RunPendingTasks();
280 ASSERT_EQ(data_stream_sender_->send_frame_count(), 1U);
281 ASSERT_TRUE(data_stream_sender_->ValidateFrameBuffer(0, 100, false, 4));
282 last_rpc = demuxer_stream_adapter_->last_received_rpc();
283 ASSERT_TRUE(last_rpc);
284 ASSERT_EQ(last_rpc->proc(),
285 openscreen::cast::RpcMessage::RPC_DS_READUNTIL_CALLBACK);
286 ASSERT_EQ(last_rpc->handle(), 103);
287 data_stream_sender_->ResetHistory();
288}
289
290TEST_F(DemuxerStreamAdapterTest, DuplicateInitializeCausesFatalError) {
291 std::vector<StopTrigger> errors;
292 demuxer_stream_adapter_->TakeErrors(&errors);
293 ASSERT_TRUE(errors.empty());
294
295 demuxer_stream_adapter_->DoDuplicateInitialize();
296 demuxer_stream_adapter_->TakeErrors(&errors);
297 ASSERT_EQ(1u, errors.size());
298 EXPECT_EQ(PEERS_OUT_OF_SYNC, errors[0]);
299}
300
Kaido Kert25902c62024-06-17 17:10:28 -0700301TEST_F(DemuxerStreamAdapterTest, ClosingMessagePipeCausesMojoDisconnected) {
Kaido Kert08336fa2022-02-08 14:10:50 -0800302 std::vector<StopTrigger> errors;
303 demuxer_stream_adapter_->TakeErrors(&errors);
304 ASSERT_TRUE(errors.empty());
305
Kaido Kert25902c62024-06-17 17:10:28 -0700306 // Closes one end of mojo message pipes.
Kaido Kert08336fa2022-02-08 14:10:50 -0800307 data_stream_sender_.reset();
308 RunPendingTasks(); // Allow notification from mojo to propagate.
309
310 demuxer_stream_adapter_->TakeErrors(&errors);
311 ASSERT_EQ(1u, errors.size());
Kaido Kert25902c62024-06-17 17:10:28 -0700312 EXPECT_EQ(MOJO_DISCONNECTED, errors[0]);
Kaido Kert08336fa2022-02-08 14:10:50 -0800313}
314
Kaido Kert25902c62024-06-17 17:10:28 -0700315TEST_F(DemuxerStreamAdapterTest, ClosingDataPipeCausesWriteError) {
316 EXPECT_CALL(*demuxer_stream_, Read(_, _)).Times(1);
317
318 std::vector<StopTrigger> errors;
319 demuxer_stream_adapter_->TakeErrors(&errors);
320 ASSERT_TRUE(errors.empty());
321
322 // Closes the consumer end of the data pipe.
323 data_stream_sender_->CloseDataPipe();
324 demuxer_stream_->CreateFakeFrame(100, true /* key frame */, 1 /* pts */);
325 demuxer_stream_adapter_->FakeReadUntil(1, 999);
326 RunPendingTasks(); // Allow notification from mojo to propagate.
327
328 demuxer_stream_adapter_->TakeErrors(&errors);
329 ASSERT_EQ(1u, errors.size());
330 EXPECT_EQ(DATA_PIPE_WRITE_ERROR, errors[0]);
331}
332
333} // namespace remoting
Kaido Kert08336fa2022-02-08 14:10:50 -0800334} // namespace media