diff options
Diffstat (limited to 'tools/testing/kunit/kunit_parser.py')
-rw-r--r-- | tools/testing/kunit/kunit_parser.py | 217 |
1 files changed, 144 insertions, 73 deletions
diff --git a/tools/testing/kunit/kunit_parser.py b/tools/testing/kunit/kunit_parser.py index e8bcc139702e..c3c524b79db8 100644 --- a/tools/testing/kunit/kunit_parser.py +++ b/tools/testing/kunit/kunit_parser.py @@ -43,26 +43,68 @@ class TestCase(object): class TestStatus(Enum): SUCCESS = auto() FAILURE = auto() + SKIPPED = auto() TEST_CRASHED = auto() NO_TESTS = auto() FAILURE_TO_PARSE_TESTS = auto() +class LineStream: + """Provides a peek()/pop() interface over an iterator of (line#, text).""" + _lines: Iterator[Tuple[int, str]] + _next: Tuple[int, str] + _done: bool + + def __init__(self, lines: Iterator[Tuple[int, str]]): + self._lines = lines + self._done = False + self._next = (0, '') + self._get_next() + + def _get_next(self) -> None: + try: + self._next = next(self._lines) + except StopIteration: + self._done = True + + def peek(self) -> str: + return self._next[1] + + def pop(self) -> str: + n = self._next + self._get_next() + return n[1] + + def __bool__(self) -> bool: + return not self._done + + # Only used by kunit_tool_test.py. + def __iter__(self) -> Iterator[str]: + while bool(self): + yield self.pop() + + def line_number(self) -> int: + return self._next[0] + kunit_start_re = re.compile(r'TAP version [0-9]+$') kunit_end_re = re.compile('(List of all partitions:|' - 'Kernel panic - not syncing: VFS:)') - -def isolate_kunit_output(kernel_output) -> Iterator[str]: - started = False - for line in kernel_output: - line = line.rstrip() # line always has a trailing \n - if kunit_start_re.search(line): - prefix_len = len(line.split('TAP version')[0]) - started = True - yield line[prefix_len:] if prefix_len > 0 else line - elif kunit_end_re.search(line): - break - elif started: - yield line[prefix_len:] if prefix_len > 0 else line + 'Kernel panic - not syncing: VFS:|reboot: System halted)') + +def extract_tap_lines(kernel_output: Iterable[str]) -> LineStream: + def isolate_kunit_output(kernel_output: Iterable[str]) -> Iterator[Tuple[int, str]]: + line_num = 0 + started = False + for line in kernel_output: + line_num += 1 + line = line.rstrip() # line always has a trailing \n + if kunit_start_re.search(line): + prefix_len = len(line.split('TAP version')[0]) + started = True + yield line_num, line[prefix_len:] + elif kunit_end_re.search(line): + break + elif started: + yield line_num, line[prefix_len:] + return LineStream(lines=isolate_kunit_output(kernel_output)) def raw_output(kernel_output) -> None: for line in kernel_output: @@ -97,34 +139,40 @@ def print_log(log) -> None: TAP_ENTRIES = re.compile(r'^(TAP|[\s]*ok|[\s]*not ok|[\s]*[0-9]+\.\.[0-9]+|[\s]*#).*$') -def consume_non_diagnostic(lines: List[str]) -> None: - while lines and not TAP_ENTRIES.match(lines[0]): - lines.pop(0) +def consume_non_diagnostic(lines: LineStream) -> None: + while lines and not TAP_ENTRIES.match(lines.peek()): + lines.pop() -def save_non_diagnostic(lines: List[str], test_case: TestCase) -> None: - while lines and not TAP_ENTRIES.match(lines[0]): - test_case.log.append(lines[0]) - lines.pop(0) +def save_non_diagnostic(lines: LineStream, test_case: TestCase) -> None: + while lines and not TAP_ENTRIES.match(lines.peek()): + test_case.log.append(lines.peek()) + lines.pop() OkNotOkResult = namedtuple('OkNotOkResult', ['is_ok','description', 'text']) +OK_NOT_OK_SKIP = re.compile(r'^[\s]*(ok|not ok) [0-9]+ - (.*) # SKIP(.*)$') + OK_NOT_OK_SUBTEST = re.compile(r'^[\s]+(ok|not ok) [0-9]+ - (.*)$') OK_NOT_OK_MODULE = re.compile(r'^(ok|not ok) ([0-9]+) - (.*)$') -def parse_ok_not_ok_test_case(lines: List[str], test_case: TestCase) -> bool: +def parse_ok_not_ok_test_case(lines: LineStream, test_case: TestCase) -> bool: save_non_diagnostic(lines, test_case) if not lines: test_case.status = TestStatus.TEST_CRASHED return True - line = lines[0] + line = lines.peek() match = OK_NOT_OK_SUBTEST.match(line) while not match and lines: - line = lines.pop(0) + line = lines.pop() match = OK_NOT_OK_SUBTEST.match(line) if match: - test_case.log.append(lines.pop(0)) + test_case.log.append(lines.pop()) test_case.name = match.group(2) + skip_match = OK_NOT_OK_SKIP.match(line) + if skip_match: + test_case.status = TestStatus.SKIPPED + return True if test_case.status == TestStatus.TEST_CRASHED: return True if match.group(1) == 'ok': @@ -138,14 +186,14 @@ def parse_ok_not_ok_test_case(lines: List[str], test_case: TestCase) -> bool: SUBTEST_DIAGNOSTIC = re.compile(r'^[\s]+# (.*)$') DIAGNOSTIC_CRASH_MESSAGE = re.compile(r'^[\s]+# .*?: kunit test case crashed!$') -def parse_diagnostic(lines: List[str], test_case: TestCase) -> bool: +def parse_diagnostic(lines: LineStream, test_case: TestCase) -> bool: save_non_diagnostic(lines, test_case) if not lines: return False - line = lines[0] + line = lines.peek() match = SUBTEST_DIAGNOSTIC.match(line) if match: - test_case.log.append(lines.pop(0)) + test_case.log.append(lines.pop()) crash_match = DIAGNOSTIC_CRASH_MESSAGE.match(line) if crash_match: test_case.status = TestStatus.TEST_CRASHED @@ -153,7 +201,7 @@ def parse_diagnostic(lines: List[str], test_case: TestCase) -> bool: else: return False -def parse_test_case(lines: List[str]) -> Optional[TestCase]: +def parse_test_case(lines: LineStream) -> Optional[TestCase]: test_case = TestCase() save_non_diagnostic(lines, test_case) while parse_diagnostic(lines, test_case): @@ -165,55 +213,58 @@ def parse_test_case(lines: List[str]) -> Optional[TestCase]: SUBTEST_HEADER = re.compile(r'^[\s]+# Subtest: (.*)$') -def parse_subtest_header(lines: List[str]) -> Optional[str]: +def parse_subtest_header(lines: LineStream) -> Optional[str]: consume_non_diagnostic(lines) if not lines: return None - match = SUBTEST_HEADER.match(lines[0]) + match = SUBTEST_HEADER.match(lines.peek()) if match: - lines.pop(0) + lines.pop() return match.group(1) else: return None SUBTEST_PLAN = re.compile(r'[\s]+[0-9]+\.\.([0-9]+)') -def parse_subtest_plan(lines: List[str]) -> Optional[int]: +def parse_subtest_plan(lines: LineStream) -> Optional[int]: consume_non_diagnostic(lines) - match = SUBTEST_PLAN.match(lines[0]) + match = SUBTEST_PLAN.match(lines.peek()) if match: - lines.pop(0) + lines.pop() return int(match.group(1)) else: return None def max_status(left: TestStatus, right: TestStatus) -> TestStatus: - if left == TestStatus.TEST_CRASHED or right == TestStatus.TEST_CRASHED: + if left == right: + return left + elif left == TestStatus.TEST_CRASHED or right == TestStatus.TEST_CRASHED: return TestStatus.TEST_CRASHED elif left == TestStatus.FAILURE or right == TestStatus.FAILURE: return TestStatus.FAILURE - elif left != TestStatus.SUCCESS: - return left - elif right != TestStatus.SUCCESS: + elif left == TestStatus.SKIPPED: return right else: - return TestStatus.SUCCESS + return left -def parse_ok_not_ok_test_suite(lines: List[str], +def parse_ok_not_ok_test_suite(lines: LineStream, test_suite: TestSuite, expected_suite_index: int) -> bool: consume_non_diagnostic(lines) if not lines: test_suite.status = TestStatus.TEST_CRASHED return False - line = lines[0] + line = lines.peek() match = OK_NOT_OK_MODULE.match(line) if match: - lines.pop(0) + lines.pop() if match.group(1) == 'ok': test_suite.status = TestStatus.SUCCESS else: test_suite.status = TestStatus.FAILURE + skip_match = OK_NOT_OK_SKIP.match(line) + if skip_match: + test_suite.status = TestStatus.SKIPPED suite_index = int(match.group(2)) if suite_index != expected_suite_index: print_with_timestamp( @@ -224,14 +275,14 @@ def parse_ok_not_ok_test_suite(lines: List[str], else: return False -def bubble_up_errors(statuses: Iterable[TestStatus]) -> TestStatus: - return reduce(max_status, statuses, TestStatus.SUCCESS) +def bubble_up_errors(status_list: Iterable[TestStatus]) -> TestStatus: + return reduce(max_status, status_list, TestStatus.SKIPPED) def bubble_up_test_case_errors(test_suite: TestSuite) -> TestStatus: max_test_case_status = bubble_up_errors(x.status for x in test_suite.cases) return max_status(max_test_case_status, test_suite.status) -def parse_test_suite(lines: List[str], expected_suite_index: int) -> Optional[TestSuite]: +def parse_test_suite(lines: LineStream, expected_suite_index: int) -> Optional[TestSuite]: if not lines: return None consume_non_diagnostic(lines) @@ -257,26 +308,26 @@ def parse_test_suite(lines: List[str], expected_suite_index: int) -> Optional[Te print_with_timestamp(red('[ERROR] ') + 'ran out of lines before end token') return test_suite else: - print('failed to parse end of suite' + lines[0]) + print(f'failed to parse end of suite "{name}", at line {lines.line_number()}: {lines.peek()}') return None TAP_HEADER = re.compile(r'^TAP version 14$') -def parse_tap_header(lines: List[str]) -> bool: +def parse_tap_header(lines: LineStream) -> bool: consume_non_diagnostic(lines) - if TAP_HEADER.match(lines[0]): - lines.pop(0) + if TAP_HEADER.match(lines.peek()): + lines.pop() return True else: return False TEST_PLAN = re.compile(r'[0-9]+\.\.([0-9]+)') -def parse_test_plan(lines: List[str]) -> Optional[int]: +def parse_test_plan(lines: LineStream) -> Optional[int]: consume_non_diagnostic(lines) - match = TEST_PLAN.match(lines[0]) + match = TEST_PLAN.match(lines.peek()) if match: - lines.pop(0) + lines.pop() return int(match.group(1)) else: return None @@ -284,7 +335,7 @@ def parse_test_plan(lines: List[str]) -> Optional[int]: def bubble_up_suite_errors(test_suites: Iterable[TestSuite]) -> TestStatus: return bubble_up_errors(x.status for x in test_suites) -def parse_test_result(lines: List[str]) -> TestResult: +def parse_test_result(lines: LineStream) -> TestResult: consume_non_diagnostic(lines) if not lines or not parse_tap_header(lines): return TestResult(TestStatus.NO_TESTS, [], lines) @@ -311,49 +362,69 @@ def parse_test_result(lines: List[str]) -> TestResult: else: return TestResult(TestStatus.NO_TESTS, [], lines) -def print_and_count_results(test_result: TestResult) -> Tuple[int, int, int]: - total_tests = 0 - failed_tests = 0 - crashed_tests = 0 +class TestCounts: + passed: int + failed: int + crashed: int + skipped: int + + def __init__(self): + self.passed = 0 + self.failed = 0 + self.crashed = 0 + self.skipped = 0 + + def total(self) -> int: + return self.passed + self.failed + self.crashed + self.skipped + +def print_and_count_results(test_result: TestResult) -> TestCounts: + counts = TestCounts() for test_suite in test_result.suites: if test_suite.status == TestStatus.SUCCESS: print_suite_divider(green('[PASSED] ') + test_suite.name) + elif test_suite.status == TestStatus.SKIPPED: + print_suite_divider(yellow('[SKIPPED] ') + test_suite.name) elif test_suite.status == TestStatus.TEST_CRASHED: print_suite_divider(red('[CRASHED] ' + test_suite.name)) else: print_suite_divider(red('[FAILED] ') + test_suite.name) for test_case in test_suite.cases: - total_tests += 1 if test_case.status == TestStatus.SUCCESS: + counts.passed += 1 print_with_timestamp(green('[PASSED] ') + test_case.name) + elif test_case.status == TestStatus.SKIPPED: + counts.skipped += 1 + print_with_timestamp(yellow('[SKIPPED] ') + test_case.name) elif test_case.status == TestStatus.TEST_CRASHED: - crashed_tests += 1 + counts.crashed += 1 print_with_timestamp(red('[CRASHED] ' + test_case.name)) print_log(map(yellow, test_case.log)) print_with_timestamp('') else: - failed_tests += 1 + counts.failed += 1 print_with_timestamp(red('[FAILED] ') + test_case.name) print_log(map(yellow, test_case.log)) print_with_timestamp('') - return total_tests, failed_tests, crashed_tests + return counts -def parse_run_tests(kernel_output) -> TestResult: - total_tests = 0 - failed_tests = 0 - crashed_tests = 0 - test_result = parse_test_result(list(isolate_kunit_output(kernel_output))) +def parse_run_tests(kernel_output: Iterable[str]) -> TestResult: + counts = TestCounts() + lines = extract_tap_lines(kernel_output) + test_result = parse_test_result(lines) if test_result.status == TestStatus.NO_TESTS: print(red('[ERROR] ') + yellow('no tests run!')) elif test_result.status == TestStatus.FAILURE_TO_PARSE_TESTS: print(red('[ERROR] ') + yellow('could not parse test results!')) else: - (total_tests, - failed_tests, - crashed_tests) = print_and_count_results(test_result) + counts = print_and_count_results(test_result) print_with_timestamp(DIVIDER) - fmt = green if test_result.status == TestStatus.SUCCESS else red + if test_result.status == TestStatus.SUCCESS: + fmt = green + elif test_result.status == TestStatus.SKIPPED: + fmt = yellow + else: + fmt =red print_with_timestamp( - fmt('Testing complete. %d tests run. %d failed. %d crashed.' % - (total_tests, failed_tests, crashed_tests))) + fmt('Testing complete. %d tests run. %d failed. %d crashed. %d skipped.' % + (counts.total(), counts.failed, counts.crashed, counts.skipped))) return test_result |