blob: 62b32f1cce0dc627e80cff8d66b766bc12bceae8 [file] [log] [blame]
# Copyright 2021 The Cobalt Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" Performance tests of playback operation latency."""
import argparse
import logging
import time
import _env # pylint: disable=unused-import
from cobalt.media_integration_tests.test_app import AdsState
from cobalt.media_integration_tests.test_case import TestCase
from cobalt.media_integration_tests.test_util import MimeStrings, PlaybackUrls
class StartLatencyTest(TestCase):
"""
Test cases for playback operation latency.
"""
def __init__(self, *args, **kwargs):
super(StartLatencyTest, self).__init__(*args, **kwargs)
parser = argparse.ArgumentParser()
parser.add_argument('--test_times', default=5, type=int)
args, _ = parser.parse_known_args()
self.test_times = args.test_times
def run_first_start_latency_test(self, test_name, url, mime=None):
test_results = []
while len(test_results) < self.test_times:
app = self.CreateCobaltApp(url)
with app:
# Skip the test if the mime is not supported.
if mime and not app.IsMediaTypeSupported(mime):
logging.info(' ***** First start latency test result of %s : [n/a].',
test_name)
return
app.WaitUntilPlayerStart()
start_time = time.time()
start_media_time = app.CurrentMediaTime()
app.WaitUntilReachState(lambda _app: _app.PlayerState(
).pipeline_state.first_written_video_timestamp != _app.PlayerState().
pipeline_state.last_written_video_timestamp)
# Record inputs received time to know the preliminary impact of network.
inputs_received_time = time.time()
inputs_received_latency = inputs_received_time - start_time
# Let the playback play for 1 second.
app.WaitUntilMediaTimeReached(app.CurrentMediaTime() + 1)
# If it's playing ads, wait until ads end and run the test again.
if app.GetAdsState() != AdsState.NONE:
app.WaitUntilAdsEnd()
continue
media_time = app.CurrentMediaTime() - start_media_time
end_time = time.time()
start_latency = end_time - start_time - media_time
test_results.append([
start_latency, inputs_received_latency, start_time,
inputs_received_time, media_time, end_time
])
logging.info(' ***** First start latency test result of %s : %r.',
test_name, test_results)
def run_play_pause_latency_test(self, test_name, url, mime=None):
app = self.CreateCobaltApp(url)
with app:
# Skip the test if the mime is not supported.
if mime and not app.IsMediaTypeSupported(mime):
logging.info(' ***** Play/pause latency test result of %s : [n/a].',
test_name)
return
test_results = []
# Wait until the playback starts.
app.WaitUntilPlayerStart()
app.WaitUntilAdsEnd()
while len(test_results) < self.test_times:
# Let the playback play for 1 second.
app.WaitUntilMediaTimeReached(app.CurrentMediaTime() + 1)
start_time = time.time()
# Pause the playback.
app.PlayOrPause()
app.WaitUntilReachState(
lambda _app: _app.PlayerState().video_element_state.paused)
pause_latency = time.time() - start_time
start_time = time.time()
# Resume the playback.
app.PlayOrPause()
app.WaitUntilReachState(
lambda _app: not _app.PlayerState().video_element_state.paused)
play_latency = time.time() - start_time
test_results.append([play_latency, pause_latency])
logging.info(' ***** Play/pause latency test result of %s : %r.',
test_name, test_results)
def run_fastforward_latency_test(self, test_name, url, mime=None):
app = self.CreateCobaltApp(url)
with app:
# Skip the test if the mime is not supported.
if mime and not app.IsMediaTypeSupported(mime):
logging.info(' ***** Fastforward latency test result of %s : [n/a].',
test_name)
return
test_results = []
# Wait until the playback starts.
app.WaitUntilPlayerStart()
app.WaitUntilAdsEnd()
# Let the playback play for 1 second.
app.WaitUntilMediaTimeReached(app.CurrentMediaTime() + 1)
while len(test_results) < self.test_times:
old_media_time = app.CurrentMediaTime()
start_time = time.time()
app.Fastforward()
app.WaitUntilReachState(lambda _app: _app.PlayerState(
).pipeline_state.first_written_video_timestamp != _app.PlayerState().
pipeline_state.last_written_video_timestamp)
# Record inputs received time to know the preliminary impact of network.
inputs_received_latency = time.time() - start_time
app.WaitUntilReachState(
lambda _app: _app.CurrentMediaTime() > old_media_time + 10)
fastforward_latency = time.time() - start_time
test_results.append([fastforward_latency, inputs_received_latency])
logging.info(' ***** Fastforward latency test result of %s : %r.',
test_name, test_results)
TEST_PARAMETERS = [
('H264', PlaybackUrls.H264_ONLY, MimeStrings.H264),
('VP9', PlaybackUrls.VP9, MimeStrings.VP9),
('AV1', PlaybackUrls.AV1, MimeStrings.AV1),
('VP9_HDR_HLG', PlaybackUrls.VP9_HDR_HLG, MimeStrings.VP9_HDR_HLG),
('VP9_HDR_PQ', PlaybackUrls.VP9_HDR_PQ, MimeStrings.VP9_HDR_PQ),
]
for name, playback_url, mime_str in TEST_PARAMETERS:
TestCase.CreateTest(StartLatencyTest, 'first_start_latency_%s' % (name),
StartLatencyTest.run_first_start_latency_test, name,
playback_url, mime_str)
TestCase.CreateTest(StartLatencyTest, 'play_pause_latency_%s' % (name),
StartLatencyTest.run_play_pause_latency_test, name,
playback_url, mime_str)
TestCase.CreateTest(StartLatencyTest, 'fastforward_latency_%s' % (name),
StartLatencyTest.run_fastforward_latency_test, name,
playback_url, mime_str)