|  | # Copyright 2020 The Chromium Authors. All rights reserved. | 
|  | # Use of this source code is governed by a BSD-style license that can be | 
|  | # found in the LICENSE file. | 
|  | from __future__ import absolute_import | 
|  | import base64 | 
|  | import cgi | 
|  | import json | 
|  | import os | 
|  |  | 
|  | import six | 
|  | if not six.PY2: | 
|  | import html  # pylint: disable=import-error | 
|  |  | 
|  | from pylib.base import base_test_result | 
|  | import requests  # pylint: disable=import-error | 
|  |  | 
|  | # Comes from luci/resultdb/pbutil/test_result.go | 
|  | MAX_REPORT_LEN = 4 * 1024 | 
|  |  | 
|  | # Maps base_test_results to the luci test-result.proto. | 
|  | # https://godoc.org/go.chromium.org/luci/resultdb/proto/v1#TestStatus | 
|  | RESULT_MAP = { | 
|  | base_test_result.ResultType.UNKNOWN: 'ABORT', | 
|  | base_test_result.ResultType.PASS: 'PASS', | 
|  | base_test_result.ResultType.FAIL: 'FAIL', | 
|  | base_test_result.ResultType.CRASH: 'CRASH', | 
|  | base_test_result.ResultType.TIMEOUT: 'ABORT', | 
|  | base_test_result.ResultType.SKIP: 'SKIP', | 
|  | base_test_result.ResultType.NOTRUN: 'SKIP', | 
|  | } | 
|  |  | 
|  |  | 
|  | def TryInitClient(): | 
|  | """Tries to initialize a result_sink_client object. | 
|  |  | 
|  | Assumes that rdb stream is already running. | 
|  |  | 
|  | Returns: | 
|  | A ResultSinkClient for the result_sink server else returns None. | 
|  | """ | 
|  | try: | 
|  | with open(os.environ['LUCI_CONTEXT']) as f: | 
|  | sink = json.load(f)['result_sink'] | 
|  | return ResultSinkClient(sink) | 
|  | except KeyError: | 
|  | return None | 
|  |  | 
|  |  | 
|  | class ResultSinkClient(object): | 
|  | """A class to store the sink's post configurations and make post requests. | 
|  |  | 
|  | This assumes that the rdb stream has been called already and that the | 
|  | server is listening. | 
|  | """ | 
|  | def __init__(self, context): | 
|  | base_url = 'http://%s/prpc/luci.resultsink.v1.Sink' % context['address'] | 
|  | self.test_results_url = base_url + '/ReportTestResults' | 
|  | self.report_artifacts_url = base_url + '/ReportInvocationLevelArtifacts' | 
|  |  | 
|  | self.headers = { | 
|  | 'Content-Type': 'application/json', | 
|  | 'Accept': 'application/json', | 
|  | 'Authorization': 'ResultSink %s' % context['auth_token'], | 
|  | } | 
|  |  | 
|  | def Post(self, test_id, status, duration, test_log, test_file, | 
|  | artifacts=None): | 
|  | """Uploads the test result to the ResultSink server. | 
|  |  | 
|  | This assumes that the rdb stream has been called already and that | 
|  | server is ready listening. | 
|  |  | 
|  | Args: | 
|  | test_id: A string representing the test's name. | 
|  | status: A string representing if the test passed, failed, etc... | 
|  | duration: An int representing time in ms. | 
|  | test_log: A string representing the test's output. | 
|  | test_file: A string representing the file location of the test. | 
|  | artifacts: An optional dict of artifacts to attach to the test. | 
|  |  | 
|  | Returns: | 
|  | N/A | 
|  | """ | 
|  | assert status in RESULT_MAP | 
|  | expected = status in (base_test_result.ResultType.PASS, | 
|  | base_test_result.ResultType.SKIP) | 
|  | result_db_status = RESULT_MAP[status] | 
|  |  | 
|  | # Slightly smaller to allow addition of <pre> tags and message. | 
|  | report_check_size = MAX_REPORT_LEN - 45 | 
|  | if six.PY2: | 
|  | test_log_escaped = cgi.escape(test_log) | 
|  | else: | 
|  | test_log_escaped = html.escape(test_log) | 
|  | if len(test_log_escaped) > report_check_size: | 
|  | test_log_formatted = ('<pre>' + test_log_escaped[:report_check_size] + | 
|  | '...Full output in Artifact.</pre>') | 
|  | else: | 
|  | test_log_formatted = '<pre>' + test_log_escaped + '</pre>' | 
|  |  | 
|  | tr = { | 
|  | 'expected': | 
|  | expected, | 
|  | 'status': | 
|  | result_db_status, | 
|  | 'summaryHtml': | 
|  | test_log_formatted, | 
|  | 'tags': [ | 
|  | { | 
|  | 'key': 'test_name', | 
|  | 'value': test_id, | 
|  | }, | 
|  | { | 
|  | # Status before getting mapped to result_db statuses. | 
|  | 'key': 'android_test_runner_status', | 
|  | 'value': status, | 
|  | } | 
|  | ], | 
|  | 'testId': | 
|  | test_id, | 
|  | } | 
|  | artifacts = artifacts or {} | 
|  | if len(test_log_escaped) > report_check_size: | 
|  | # Upload the original log without any modifications. | 
|  | b64_log = six.ensure_str(base64.b64encode(six.ensure_binary(test_log))) | 
|  | artifacts.update({'Test Log': {'contents': b64_log}}) | 
|  | if artifacts: | 
|  | tr['artifacts'] = artifacts | 
|  |  | 
|  | if duration is not None: | 
|  | # Duration must be formatted to avoid scientific notation in case | 
|  | # number is too small or too large. Result_db takes seconds, not ms. | 
|  | # Need to use float() otherwise it does substitution first then divides. | 
|  | tr['duration'] = '%.9fs' % float(duration / 1000.0) | 
|  |  | 
|  | if test_file and str(test_file).startswith('//'): | 
|  | tr['testMetadata'] = { | 
|  | 'name': test_id, | 
|  | 'location': { | 
|  | 'file_name': test_file, | 
|  | 'repo': 'https://chromium.googlesource.com/chromium/src', | 
|  | } | 
|  | } | 
|  |  | 
|  | res = requests.post(url=self.test_results_url, | 
|  | headers=self.headers, | 
|  | data=json.dumps({'testResults': [tr]})) | 
|  | res.raise_for_status() | 
|  |  | 
|  | def ReportInvocationLevelArtifacts(self, artifacts): | 
|  | """Uploads invocation-level artifacts to the ResultSink server. | 
|  |  | 
|  | This is for artifacts that don't apply to a single test but to the test | 
|  | invocation as a whole (eg: system logs). | 
|  |  | 
|  | Args: | 
|  | artifacts: A dict of artifacts to attach to the invocation. | 
|  | """ | 
|  | req = {'artifacts': artifacts} | 
|  | res = requests.post(url=self.report_artifacts_url, | 
|  | headers=self.headers, | 
|  | data=json.dumps(req)) | 
|  | res.raise_for_status() |