This commit is contained in:
Claudio Maggioni 2023-12-20 14:19:45 +01:00
parent a70deaf845
commit 27438c280f
11 changed files with 183 additions and 124 deletions

1
.gitignore vendored
View file

@ -2,6 +2,7 @@
__pycache__/
*.py[cod]
*$py.class
.mutmut-cache
# C extensions
*.so

View file

@ -16,7 +16,7 @@ Note: Feel free to modify this file according to the project's necessities.
To install the required dependencies make sure `python3` points to a Python 3.10 or 3.11 installation and then run:
```shell
python3 -m venv env
python3.8 -m venv env
source env/bin/activate
pip install -r requirements.txt
```

67
archive.py Normal file
View file

@ -0,0 +1,67 @@
from typing import Optional, Dict, Set
from frozendict import frozendict
import instrument
import operators
class Archive:
true_branches: Dict[int, any]
false_branches: Dict[int, any]
false_score: Dict[int, any]
true_score: Dict[int, any]
f_name: str
def __init__(self, f_name: str) -> None:
self.reset()
self.f_name = f_name
def reset(self):
self.true_branches = {}
self.false_branches = {}
self.true_score = {}
self.false_score = {}
def branches_covered(self) -> int:
return len(self.true_branches.keys()) + len(self.false_branches.keys())
def branches_str(self) -> str:
branch_ids = sorted([f"{branch:2d}T" for branch in self.true_branches.keys()] +
[f"{branch:2d}F" for branch in self.false_branches.keys()])
return ' '.join([branch.strip() for branch in branch_ids])
def build_suite(self) -> Set[instrument.Params]:
return set(list(self.true_branches.values()) + list(self.false_branches.values()))
def consider_test(self, test_case: frozendict):
instrument.invoke(self.f_name, test_case)
range_start, range_end = instrument.n_of_branches[self.f_name]
for branch in range(range_start, range_end):
if (branch in operators.distances_true and
operators.distances_true[branch] == 0 and branch not in self.true_branches):
self.true_branches[branch] = test_case
if (branch in operators.distances_false and
operators.distances_false[branch] == 0 and branch not in self.false_branches):
self.false_branches[branch] = test_case
def satisfies_branch(self, test_case: frozendict) -> Optional[str]:
try:
instrument.invoke(self.f_name, test_case)
except AssertionError:
return None
range_start, range_end = instrument.n_of_branches[self.f_name]
for branch in range(range_start, range_end):
if (branch in operators.distances_true and
operators.distances_true[branch] == 0 and
branch not in self.true_branches):
return f"{branch}T"
if (branch in operators.distances_false and
operators.distances_false[branch] == 0 and
branch not in self.false_branches):
return f"{branch}F"
return None

View file

@ -4,9 +4,12 @@ from random import randrange, choice, random, sample
from frozendict import frozendict
import operators
from archive import Archive
from instrument import Arg, Params, invoke, call_statement, BranchTransformer, module_of
Range = tuple[int, int]
from typing import Tuple, Dict, List, Set
Range = Tuple[int, int]
INT_RANGE: Range = (-1000, 1000)
STRING_LEN_RANGE: Range = (0, 10)
@ -30,7 +33,7 @@ def random_str() -> str:
return "".join([random_chr() for _ in range(length)])
def max_cases(args: list[Arg]) -> int:
def max_cases(args: List[Arg]) -> int:
num = 1
for _, arg_type in args:
if arg_type == 'int':
@ -73,8 +76,8 @@ def random_mutate(arg_type: str, arg_value: any) -> any:
raise ValueError(f"Arg type '{arg_type}' not supported")
def random_params(arguments: list[Arg]) -> Params:
test_input: dict[str, any] = {}
def random_params(arguments: List[Arg]) -> Params:
test_input: Dict[str, any] = {}
for arg_name, arg_type in arguments:
test_input[arg_name] = random_arg(arg_type)
@ -82,10 +85,10 @@ def random_params(arguments: list[Arg]) -> Params:
return frozendict(test_input)
pools: dict[tuple, set[tuple]] = {}
pools: Dict[tuple, Set[tuple]] = {}
def get_pool(arguments: list[Arg]) -> list[Params]:
def get_pool(arguments: List[Arg]) -> List[Params]:
arg_types = tuple([arg_type for _, arg_type in arguments])
arg_names = [arg_name for arg_name, _ in arguments]
@ -94,7 +97,7 @@ def get_pool(arguments: list[Arg]) -> list[Params]:
if arg_types not in pools:
new_pool = set()
for _ in range(POOL_SIZE):
param_list: list[any] = [None] * len(arg_names)
param_list: List[any] = [None] * len(arg_names)
params = random_params(arguments)
for i, name in enumerate(arg_names):
@ -107,16 +110,16 @@ def get_pool(arguments: list[Arg]) -> list[Params]:
return [frozendict({arg_names[i]: p for i, p in enumerate(param)}) for param in pools[arg_types]]
def mutate(test_case: Params, arguments: list[Arg]) -> Params:
def mutate(test_case: Params, arguments: List[Arg]) -> Params:
arg_name = choice(list(test_case.keys())) # choose name to mutate
types: dict[str, str] = {arg_name: arg_type for arg_name, arg_type in arguments}
types: Dict[str, str] = {arg_name: arg_type for arg_name, arg_type in arguments}
return test_case.set(arg_name, random_mutate(types[arg_name], test_case[arg_name]))
def crossover(chosen_test: Params, other_chosen_test: Params, arguments: list[Arg]) -> tuple[Params, Params]:
def crossover(chosen_test: Params, other_chosen_test: Params, arguments: List[Arg]) -> Tuple[Params, Params]:
# Select a property at random and swap properties
arg_name = choice(list(chosen_test.keys()))
types: dict[str, str] = {arg_name: arg_type for arg_name, arg_type in arguments}
types: Dict[str, str] = {arg_name: arg_type for arg_name, arg_type in arguments}
if types[arg_name] == 'str':
# Crossover for strings intermingles the strings of the two chosen tests
s1, s2 = str_crossover(chosen_test[arg_name], other_chosen_test[arg_name])
@ -132,11 +135,18 @@ def crossover(chosen_test: Params, other_chosen_test: Params, arguments: list[Ar
return t1, t2
def generate_test_case(f_name: str, arguments: list[Arg]) -> Params:
pool: list[Params] = get_pool(arguments)
def generate_test_case(f_name: str, arguments: List[Arg], archive: Archive) -> Params:
pool: List[Params] = get_pool(arguments)
while True:
test = sample(pool, 1)[0]
is_new = archive.satisfies_branch(test)
if is_new is None:
# print(f"Not new: {test}")
continue
print(f"New {is_new}!: {test}")
try:
invoke(f_name, test)
@ -172,7 +182,7 @@ def get_test_case_source(f_name: str, test_case: Params, i: int, indent: int):
{space}{single_indent}assert {call_statement(f_name_orig, test_case)} == {repr(output)}"""
def get_test_import_stmt(names: list[str]):
def get_test_import_stmt(names: List[str]):
imports = ["from unittest import TestCase"]
for orig_f_name in names:
@ -182,7 +192,7 @@ def get_test_import_stmt(names: list[str]):
return "\n".join(imports) + "\n"
def get_test_class(orig_f_name: str, cases: set[Params]) -> str:
def get_test_class(orig_f_name: str, cases: Set[Params]) -> str:
f_name = BranchTransformer.to_instrumented_name(orig_f_name)
return (f"class Test_{orig_f_name}(TestCase):\n" +
"\n\n".join([get_test_case_source(f_name, case, i + 1, 1) for i, case in enumerate(cases)]) +

View file

@ -2,6 +2,7 @@ import argparse
import os
import random
from functools import partial
from typing import Tuple, List, Set
import frozendict
import tqdm
@ -11,6 +12,7 @@ import fuzzer
import instrument
import operators
from fuzzer import generate_test_case, get_test_class
from archive import Archive
INDMUPROB = 0.05
MUPROB = 0.33
@ -23,33 +25,6 @@ REPS = 10
OUT_DIR = os.path.join(os.path.dirname(__file__), "tests")
class Archive:
true_branches: dict[int, any]
false_branches: dict[int, any]
false_score: dict[int, any]
true_score: dict[int, any]
def __init__(self):
self.reset()
def reset(self):
self.true_branches = {}
self.false_branches = {}
self.true_score = {}
self.false_score = {}
def branches_covered(self) -> int:
return len(self.true_branches.keys()) + len(self.false_branches.keys())
def branches_str(self) -> str:
branch_ids = sorted([f"{branch:2d}T" for branch in self.true_branches.keys()] +
[f"{branch:2d}F" for branch in self.false_branches.keys()])
return ' '.join([branch.strip() for branch in branch_ids])
def build_suite(self) -> set[instrument.Params]:
return set(list(self.true_branches.values()) + list(self.false_branches.values()))
def normalize(x):
return x / (1.0 + x)
@ -59,16 +34,16 @@ def init_deap():
creator.create("Individual", list, fitness=creator.FitnessMin)
def generate(orig_name: str) -> set[instrument.Params]:
def generate(orig_name: str) -> Set[instrument.Params]:
f_name = instrument.BranchTransformer.to_instrumented_name(orig_name)
args = instrument.functions[f_name]
range_start, range_end = instrument.n_of_branches[f_name]
total_branches = (range_end - range_start) * 2 # *2 because of True and False
archive = Archive()
archive = Archive(f_name)
toolbox = base.Toolbox()
toolbox.register("attr_test_case", lambda: list(generate_test_case(f_name, args).items()))
toolbox.register("attr_test_case", lambda: list(generate_test_case(f_name, args, archive).items()))
toolbox.register("individual", tools.initIterate, creator.Individual, lambda: toolbox.attr_test_case())
toolbox.register("population", tools.initRepeat, list, toolbox.individual)
toolbox.register("evaluate", partial(compute_fitness, f_name, archive))
@ -102,20 +77,15 @@ def generate(orig_name: str) -> set[instrument.Params]:
population, logbook = algorithms.eaSimple(population, toolbox, CXPROB, MUPROB, NGEN, verbose=False, stats=stats)
print("population:\n" + "\n".join([str(p) for p in population]) + "\n")
print("population:\n" +
"\n".join([f"{str(p)} {compute_fitness(f_name, archive, p)[0]}" for p in population]) +
"\n")
for member in population:
m = frozendict.frozendict(member)
for branch in range(range_start, range_end):
if (branch in operators.distances_true and
operators.distances_true[branch] == 0 and branch not in archive.true_branches):
archive.true_branches[branch] = m
if (branch in operators.distances_false and
operators.distances_false[branch] == 0 and branch not in archive.false_branches):
archive.false_branches[branch] = m
archive.consider_test(frozendict.frozendict(member))
# for gen, record in enumerate(logbook):
# print(f"Generation {gen}: min={record['min']} max={record['max']}")
for gen, record in enumerate(logbook):
print(f"Generation {gen}: min={record['min']} max={record['max']}")
tot_covered = archive.branches_covered()
@ -135,19 +105,10 @@ def generate(orig_name: str) -> set[instrument.Params]:
return top_result
def compute_fitness(f_name: str, archive: Archive, individual: list) -> tuple[float]:
def compute_fitness(f_name: str, archive: Archive, individual: list) -> Tuple[float]:
x = frozendict.frozendict(individual)
range_start, range_end = instrument.n_of_branches[f_name]
# Reset any distance values from previous executions
operators.distances_true = {}
operators.distances_false = {}
# the archive_true_branches and archive_false_branches are reset after
# each generation. This is intentional as they are used to archive branches that
# have already been covered, and their presence increases the fitness value of
# test cases that would re-cover them
# Run the function under test
try:
out = instrument.invoke(f_name, x)
@ -156,29 +117,27 @@ def compute_fitness(f_name: str, archive: Archive, individual: list) -> tuple[fl
return 100.0,
fitness = 0.0
#branches = False
# Sum up branch distances
for branch in range(range_start, range_end):
if branch in operators.distances_true:
if branch not in archive.true_branches:
fitness += normalize(operators.distances_true[branch])
#branches = True
else:
fitness += 10
for branch in range(range_start, range_end):
if branch in operators.distances_false:
if branch not in archive.false_branches:
fitness += normalize(operators.distances_false[branch])
#branches = True
#if not branches:
# return 100.0,
else:
fitness += 10
# print(f_name, x, "=", out, "fitness =", fitness)
return fitness,
def build_suite(filename: str, f_names: list[str]):
def build_suite(filename: str, f_names: List[str]):
suite = [(name, generate(name)) for name in f_names]
with open(os.path.join(OUT_DIR, f"test_{filename}.py"), "w") as f:
@ -187,7 +146,7 @@ def build_suite(filename: str, f_names: list[str]):
f.write("\n\n".join([get_test_class(name, cases) for name, cases in suite]))
def run_genetic(files: list[str], seed: int):
def run_genetic(files: List[str], seed: int):
instrument.load_benchmark(save_instrumented=False, files=files)
random.seed(seed) # init random seed
init_deap()

View file

@ -2,12 +2,13 @@ import ast
import os.path
import sys
from collections import defaultdict
from typing import Optional
from typing import Optional, Dict, DefaultDict, Tuple, List
import astunparse
import frozendict
import tqdm
from frozendict import frozendict
import operators
from operators import evaluate_condition
ROOT_DIR: str = os.path.dirname(__file__)
@ -17,7 +18,7 @@ SUFFIX: str = "_instrumented"
class BranchTransformer(ast.NodeTransformer):
branches_range: dict[str, tuple[int, int]]
branches_range: Dict[str, Tuple[int, int]]
branch_num: int
instrumented_name: Optional[str]
in_assert: bool
@ -81,13 +82,13 @@ class BranchTransformer(ast.NodeTransformer):
ArgType = str
Arg = tuple[str, ArgType]
Params = frozendict[str, any]
SignatureDict = dict[str, list[Arg]]
Arg = Tuple[str, ArgType]
Params = frozendict.frozendict[str, any]
SignatureDict = Dict[str, List[Arg]]
n_of_branches: dict[str, tuple[int, int]] = {}
n_of_branches: Dict[str, Tuple[int, int]] = {}
functions: SignatureDict = {}
module_of: dict[str, list[str]] = {}
module_of: Dict[str, List[str]] = {}
def instrument(source_path: str, target_path: str, save_instrumented=True):
@ -116,10 +117,10 @@ def instrument(source_path: str, target_path: str, save_instrumented=True):
# Figure out the top level function definitions
assert isinstance(node, ast.Module)
top_level_f_ast: list[ast.FunctionDef] = [f for f in node.body if isinstance(f, ast.FunctionDef)]
top_level_f_ast: List[ast.FunctionDef] = [f for f in node.body if isinstance(f, ast.FunctionDef)]
for f in top_level_f_ast:
arg_types: list[Arg] = []
arg_types: List[Arg] = []
for arg in f.args.args:
# fetch annotation type if found else fetch none
@ -137,6 +138,9 @@ def invoke(f_name: str, f_args: Params) -> any:
current_module = sys.modules[__name__]
operators.distances_true = {}
operators.distances_false = {}
if f_name not in functions:
raise ValueError(f"Function '{f_name}' not loaded")
@ -155,7 +159,7 @@ def find_py_files(search_dir: str):
yield os.path.join(cwd, file)
def load_benchmark(save_instrumented=True, files: list[str] = ()):
def load_benchmark(save_instrumented=True, files: List[str] = ()):
to_load = set([os.path.splitext(os.path.basename(file))[0] + ".py" for file in files])
do_all = len(to_load) == 0
@ -166,20 +170,20 @@ def load_benchmark(save_instrumented=True, files: list[str] = ()):
def call_statement(f_name: str, f_args: Params) -> str:
arg_list: list[str] = []
arg_list: List[str] = []
for k, v in f_args.items():
arg_list.append(f"{k}={repr(v)}") # quote strings
return f"{f_name}({', '.join(arg_list)})"
def get_benchmark() -> dict[str, list[str]]:
def get_benchmark() -> Dict[str, List[str]]:
"""
Returns a dictionary associated each source code file name loaded (without extension) with the list of
(non-instrumented) function names defined within it
"""
names: defaultdict[str, list[str]] = defaultdict(list)
names: DefaultDict[str, List[str]] = defaultdict(list)
for f in functions:
names[module_of[f][-1]].append(BranchTransformer.to_original_name(f))

View file

@ -7,6 +7,8 @@ import sys
import instrument
from genetic import run_genetic
from mutpy import commandline
ROOT_DIR = os.path.dirname(__file__)
IN_SOURCE_DIR = os.path.join(ROOT_DIR, "benchmark")
IN_TEST_DIR = os.path.join(ROOT_DIR, "tests")
@ -14,13 +16,20 @@ OUT_DIR = os.path.join(ROOT_DIR, "tests")
def run_mutpy(test_path: str, source_path: str):
run_genetic([source_path], random.randint(0, 500))
# run_genetic([source_path], random.randint(0, 500))
stream = os.popen(f'mut.py --target \'{source_path}\' --unit-test \'{test_path}\' -m')
output = stream.read()
score = re.search('Mutation score \\[.*\\]: (\d+\.\d+)\%', output).group(1)
print(output, file=sys.stderr)
print(f"Score is: {score}")
argv = ['-t', source_path, '-u', test_path, '-m']
argv = ['-t', source_path, '-u', test_path, '-m']
cfg = commandline.build_parser().parse_args(args=argv)
mutation_controller = commandline.build_controller(cfg)
mutation_controller.run()
# stream = os.popen(f'mut.py --target \'{source_path}\' --unit-test \'{test_path}\' -m | tee log.txt')
# output = stream.read()
# score = re.search('Mutation score \\[.*\\]: (\d+\.\d+)\%', output).group(1)
# print(output, file=sys.stderr)
# print(f"Score is: {score}")
def main():
@ -41,6 +50,8 @@ def main():
source_path = os.path.join(IN_SOURCE_DIR, f"{filename}.py")
test_path = os.path.join(IN_TEST_DIR, f"test_{filename}.py")
run_mutpy(test_path, source_path)
break
break
if __name__ == "__main__":

View file

@ -1,15 +1,14 @@
import sys
from dataclasses import dataclass
from typing import Generic
from typing import TypeVar, Callable
from typing import Generic, Dict, List, TypeVar, Callable, Union, Tuple
from nltk import edit_distance
distances_true: dict[int, int] = {}
distances_false: dict[int, int] = {}
distances_true: Dict[int, int] = {}
distances_false: Dict[int, int] = {}
distances_true_all: dict[int, list[int]] = {}
distances_false_all: dict[int, list[int]] = {}
distances_true_all: Dict[int, List[int]] = {}
distances_false_all: Dict[int, List[int]] = {}
T = TypeVar('T')
U = TypeVar('U')
@ -33,7 +32,7 @@ class CmpOp(Generic[T]):
# Operands for these must both be integers or strings of length 1
int_str_ops: list[CmpOp[int | str]] = [
int_str_ops: List[CmpOp[Union[int, str]]] = [
CmpOp(operator='<',
name='Lt',
test=lambda lhs, rhs: lhs < rhs,
@ -66,7 +65,7 @@ int_str_ops: list[CmpOp[int | str]] = [
false_dist=lambda lhs, rhs: abs(lhs - rhs)),
]
int_str_by_name: dict[str, CmpOp[int | str]] = {c.name: c for c in int_str_ops}
int_str_by_name: Dict[str, CmpOp[Union[int, str]]] = {c.name: c for c in int_str_ops}
def int_str_check(a: any, b: any) -> bool:
@ -77,7 +76,7 @@ def int_str_check(a: any, b: any) -> bool:
return len(a) == 1 and len(b) == 1
def int_str_convert(x: int | str) -> int:
def int_str_convert(x: Union[int, str]) -> int:
if type(x) == int:
return x
if len(x) == 1:
@ -87,7 +86,7 @@ def int_str_convert(x: int | str) -> int:
# Operands for these must both be strings
str_ops: list[CmpOp[str]] = [
str_ops: List[CmpOp[str]] = [
CmpOp(operator='==',
name='Eq',
test=lambda lhs, rhs: lhs == rhs,
@ -100,14 +99,14 @@ str_ops: list[CmpOp[str]] = [
false_dist=lambda lhs, rhs: edit_distance(lhs, rhs)),
]
str_by_name: dict[str, CmpOp[int | str]] = {c.name: c for c in str_ops}
str_by_name: Dict[str, CmpOp[Union[int, str]]] = {c.name: c for c in str_ops}
def str_check(a: any, b: any) -> bool:
return type(a) == str and type(b) == str
def compute_distances(name: str, lhs: any, rhs: any) -> tuple[int, int, bool]:
def compute_distances(name: str, lhs: any, rhs: any) -> Tuple[int, int, bool]:
if int_str_check(lhs, rhs):
lhs_int = int_str_convert(lhs)
rhs_int = int_str_convert(rhs)
@ -128,7 +127,7 @@ def compute_distances(name: str, lhs: any, rhs: any) -> tuple[int, int, bool]:
raise ValueError(f"'{lhs}' and '{rhs}' are not suitable for both 'int_str' and 'str' operators")
def update_map(the_map: dict[int, int], condition_num: int, distance: int):
def update_map(the_map: Dict[int, int], condition_num: int, distance: int):
if condition_num in the_map:
the_map[condition_num] = min(the_map[condition_num], distance)
else:

5
setup.cfg Normal file
View file

@ -0,0 +1,5 @@
[mutmut]
paths_to_mutate=benchmark/anagram_check.py
backup=False
runner=python -m unittest
tests_dir=tests/

View file

@ -1,19 +1,22 @@
from unittest import TestCase
from benchmark.anagram_check import anagram_check
import inspect
class Test_anagram_check(TestCase):
# distances_true = {1: [0], 2: [1], 3: [0]}
# distances_false = {1: [1], 2: [0], 3: [1]}
def test_anagram_check_1(self):
assert anagram_check(s1='Z', s2='') == False
# distances_true = {1: [0], 2: [0]}
# distances_false = {1: [1], 2: [1]}
def test_anagram_check_2(self):
assert anagram_check(s1='w', s2='t') == False
def test_anagram_check_1(self):
print(f"aaaaa {inspect.getsource(anagram_check)}")
self.assertTrue(anagram_check(s1='e', s2='4') == False)
# distances_true = {1: [1], 3: [0]}
# distances_false = {1: [0], 3: [1]}
def test_anagram_check_2(self):
print(f"aaaaa {inspect.getsource(anagram_check)}")
self.assertTrue(anagram_check(s1='|:', s2=',=U') == False)
# distances_true = {1: [1], 3: [1], 4: [0]}
# distances_false = {1: [0], 3: [0], 4: [1]}
def test_anagram_check_3(self):
assert anagram_check(s1='', s2='f') == False
print(f"aaaaa {inspect.getsource(anagram_check)}")
self.assertTrue(anagram_check(s1='', s2='') == True)

View file

@ -4,14 +4,14 @@ from benchmark.caesar_cipher import decrypt
class Test_encrypt(TestCase):
# distances_true = {}
# distances_false = {}
# distances_true = {1: [16, 0, 17, 0, 0, 41, 31]}
# distances_false = {1: [0, 42, 0, 13, 52, 0, 0]}
def test_encrypt_1(self):
assert encrypt(strng='', key=45) == ''
assert encrypt(strng=';t:W~",', key=52) == 'oIn,SV`'
class Test_decrypt(TestCase):
# distances_true = {2: [0, 211, 211, 0, 0, 196, 15, 221, 189]}
# distances_false = {2: [13, 0, 0, 9, 24, 0, 0, 0, 0]}
# distances_true = {2: [215, 0, 6, 0, 25, 0, 0, 223]}
# distances_false = {2: [0, 6, 0, 18, 0, 19, 27, 0]}
def test_decrypt_1(self):
assert decrypt(strng=']<<aR-xF&', key=74) == 'ròòvgã.üÜ'
assert decrypt(strng="'KV?i>6/", key=49) == 'öy%m8ldþ'