basic functionallity

This commit is contained in:
2024-02-04 09:28:07 +01:00
parent 0bdd094e17
commit e677575530
6 changed files with 279 additions and 0 deletions

119
src/testing.py Normal file
View File

@@ -0,0 +1,119 @@
import functools
import multiprocessing
from typing import Callable, Iterator
class TimeoutException(Exception):
pass
def get_subtasks_for(cls: type) -> Iterator[type]:
"""get all tasks in a tasks aka subtasks"""
return [e for e in cls.__dict__.values() if hasattr(e, 'is_task')]
def get_tests_for(cls: type) -> Iterator[Callable[[], None]]:
"""get all functions that are declared as tests"""
return [e for e in cls.__dict__.values() if hasattr(e, 'is_test')]
def test_wrapper(test: Callable[[], None]):
try:
test()
except Exception as e:
test.has_failed = True
test.cause = e
def run_test(test: Callable[[], None]):
"""run a test and catch any unexpected behavior and mark it as failed with the exception as cause"""
if not hasattr(test, 'is_test'):
return
timeout = test.timeout
p = multiprocessing.Process(target=test_wrapper, args=[test])
p.start()
p.join(timeout)
if p.is_alive():
test.has_failed = True
test.cause = TimeoutException(f"test failed after {timeout} seconds")
def run_tests_for_task(task: type) -> None:
for test in task.tests:
run_test(test)
for task in task.tasks:
run_tests_for_task(task)
def points_to_detuct(e: type | Callable) -> int:
match e:
case type() if hasattr(e, 'is_task'):
to_detuct = 0
for test in e.tests:
if hasattr(test, 'is_test'):
to_detuct += test.to_deduct()
for task in e.tasks:
to_detuct += points_to_detuct(task)
return e.max_points if to_detuct > e.max_points else to_detuct
case Callable(test):
return test.to_detuct()
case _:
return 0
class Exercise(object):
def __init__(self, id: str) -> None:
self.__tasks: list[type] = []
self.__id = id
self.__max_points = 0
self.__points = 0
def register(self, cls: type) -> None:
if hasattr(cls, 'is_task'):
self.__tasks.append(cls)
self.__max_points += cls.max_points
return cls
def run(self) -> str:
self.run_tests()
self.deduct_points()
def run_tests(self):
for task in self.__tasks:
run_tests_for_task(task)
def deduct_points(self):
for task in self.__tasks:
to_detuct = points_to_detuct(task)
task.points = 0 if to_detuct > task.max_points else task.max_points - to_detuct
self.__points = functools.reduce(
lambda a, b: a + b, map(lambda t: t.points, self.__tasks), 0.0)
def get_points(self) -> float:
return self.__points
def get_max_points(self) -> float:
return self.__max_points
def eip_task(header: str, max_points: float, ex: Exercise) -> Callable[[type], type]:
def wrapper(cls: type) -> type:
cls.is_task = True
cls.header = header
cls.max_points = max_points
cls.tasks = get_subtasks_for(cls)
cls.tests = get_tests_for(cls)
ex.register(cls)
return cls
return wrapper
def eip_test(msg: str, to_deduct: float, timeout = 10) -> Callable[[], None]:
def wrapper(test: Callable[[], None]) -> Callable[[], None]:
test.is_test = True
test.msg = msg
test.has_failed = False
test.to_deduct = lambda: to_deduct if test.has_failed else 0
test.timeout = timeout
test.cause = None
return test
return wrapper