diff --git a/fuzzer.py b/fuzzer.py index 6991bee..5888e8a 100644 --- a/fuzzer.py +++ b/fuzzer.py @@ -84,7 +84,7 @@ def random_params(arguments: list[Arg]) -> Params: pools: dict[tuple, set[tuple]] = {} -def get_pool(arguments: list[Arg]) -> set[Params]: +def get_pool(arguments: list[Arg]) -> list[Params]: arg_types = tuple([arg_type for _, arg_type in arguments]) arg_names = [arg_name for arg_name, _ in arguments] @@ -103,7 +103,7 @@ def get_pool(arguments: list[Arg]) -> set[Params]: pools[arg_types] = new_pool - return set([frozendict({arg_names[i]: p for i, p in enumerate(param)}) for param in pools[arg_types]]) + return [frozendict({arg_names[i]: p for i, p in enumerate(param)}) for param in pools[arg_types]] def mutate(test_case: Params, arguments: list[Arg]) -> Params: @@ -132,7 +132,7 @@ def crossover(chosen_test: Params, other_chosen_test: Params, arguments: list[Ar def generate_test_case(f_name: str, arguments: list[Arg]) -> Params: - pool: set[Params] = get_pool(arguments) + pool: list[Params] = get_pool(arguments) while True: test = sample(pool, 1)[0] diff --git a/genetic.py b/genetic.py index 8cd2a97..886edc1 100644 --- a/genetic.py +++ b/genetic.py @@ -1,6 +1,8 @@ +import argparse import os import random import sys +from functools import partial import frozendict import tqdm @@ -8,6 +10,7 @@ from deap import creator, base, tools, algorithms import fuzzer import instrument +import operators from fuzzer import generate_test_case, get_test_class INDMUPROB = 0.05 @@ -18,11 +21,32 @@ NPOP = 1000 NGEN = 200 REPS = 10 -to_test: str = "" - OUT_DIR = os.path.join(os.path.dirname(__file__), "tests") +class Archive: + true_branches: dict[int, any] + false_branches: dict[int, any] + + def __init__(self): + self.reset() + + def reset(self): + self.true_branches = {} + self.false_branches = {} + + def branches_covered(self) -> int: + return len(self.true_branches.keys()) + len(self.false_branches.keys()) + + def branches_str(self) -> str: + branch_ids = sorted([f"{branch:2d}T" for branch in self.true_branches.keys()] + + [f"{branch:2d}F" for branch in self.false_branches.keys()]) + return ' '.join([branch.strip() for branch in branch_ids]) + + def build_suite(self) -> set[instrument.Params]: + return set(list(self.true_branches.values()) + list(self.false_branches.values())) + + def normalize(x): return x / (1.0 + x) @@ -32,25 +56,19 @@ def init_deap(): creator.create("Individual", list, fitness=creator.Fitness) -def taken_branches_descriptor() -> str: - branch_ids = sorted([f"{branch:2d}T" for branch in instrument.archive_true_branches.keys()] + - [f"{branch:2d}F" for branch in instrument.archive_false_branches.keys()]) - return ' '.join([branch.strip() for branch in branch_ids]) - - def generate(f_name: str): - global to_test - to_test = f_name - orig_name = instrument.BranchTransformer.to_original_name(f_name) - args = instrument.functions[f_name] + range_start, range_end = instrument.n_of_branches[f_name] + total_branches = (range_end - range_start) * 2 # *2 because of True and False + archive = Archive() + toolbox = base.Toolbox() toolbox.register("attr_test_case", lambda: list(generate_test_case(f_name, args).items())) toolbox.register("individual", tools.initIterate, creator.Individual, lambda: toolbox.attr_test_case()) toolbox.register("population", tools.initRepeat, list, toolbox.individual) - toolbox.register("evaluate", compute_fitness) + toolbox.register("evaluate", partial(compute_fitness, f_name, archive)) def mate(tc1, tc2): t1, t2 = frozendict.frozendict(tc1), frozendict.frozendict(tc2) @@ -71,44 +89,33 @@ def generate(f_name: str): top_result = set() top_coverage = 0 - range_start, range_end = instrument.n_of_branches[to_test] - total_branches = (range_end - range_start) * 2 # *2 because of True and False - - coverage = [] for i in range(REPS): - instrument.archive_true_branches = {} - instrument.archive_false_branches = {} + archive.reset() population = toolbox.population(n=NPOP) algorithms.eaSimple(population, toolbox, CXPROB, MUPROB, NGEN, verbose=False) - true_covered = len(instrument.archive_true_branches.keys()) - false_covered = len(instrument.archive_false_branches.keys()) - tot_covered = true_covered + false_covered + tot_covered = archive.branches_covered() cov: float = (tot_covered / total_branches) * 100 - coverage.append(cov) - branches = taken_branches_descriptor() + branches = archive.branches_str() print(f"{orig_name}: rep #{i:02d}: Cov: {cov:02.02f}% ({tot_covered}/{total_branches} branches): {branches}") if cov > top_coverage: - top_result = set(list(instrument.archive_true_branches.values()) + - list(instrument.archive_false_branches.values())) + top_result = archive.build_suite() top_coverage = cov - print(coverage) - return top_result -def compute_fitness(individual: list) -> tuple[float]: +def compute_fitness(f_name: str, archive: Archive, individual: list) -> tuple[float]: x = frozendict.frozendict(individual) - range_start, range_end = instrument.n_of_branches[to_test] + range_start, range_end = instrument.n_of_branches[f_name] # Reset any distance values from previous executions - instrument.distances_true = {} - instrument.distances_false = {} + operators.distances_true = {} + operators.distances_false = {} # the archive_true_branches and archive_false_branches are reset after # each generation. This is intentional as they are used to archive branches that @@ -117,7 +124,7 @@ def compute_fitness(individual: list) -> tuple[float]: # Run the function under test try: - out = instrument.invoke(to_test, x) + instrument.invoke(f_name, x) except AssertionError: # print(to_test, x, "=", "[FAILS] fitness = 100.0") return 100.0, @@ -126,18 +133,18 @@ def compute_fitness(individual: list) -> tuple[float]: # Sum up branch distances for branch in range(range_start, range_end): - if branch in instrument.distances_true: - if instrument.distances_true[branch] == 0 and branch not in instrument.archive_true_branches: - instrument.archive_true_branches[branch] = x - if branch not in instrument.archive_true_branches: - fitness += normalize(instrument.distances_true[branch]) + if branch in operators.distances_true: + if operators.distances_true[branch] == 0 and branch not in archive.true_branches: + archive.true_branches[branch] = x + if branch not in archive.true_branches: + fitness += normalize(operators.distances_true[branch]) for branch in range(range_start, range_end): - if branch in instrument.distances_false: - if instrument.distances_false[branch] == 0 and branch not in instrument.archive_false_branches: - instrument.archive_false_branches[branch] = x - if branch not in instrument.archive_false_branches: - fitness += normalize(instrument.distances_false[branch]) + if branch in operators.distances_false: + if operators.distances_false[branch] == 0 and branch not in archive.false_branches: + archive.false_branches[branch] = x + if branch not in archive.false_branches: + fitness += normalize(operators.distances_false[branch]) # print(to_test, x, "=", out, "fitness =", fitness) return fitness, @@ -146,13 +153,20 @@ def compute_fitness(individual: list) -> tuple[float]: def build_suite(f_name: str): instr_name = instrument.BranchTransformer.to_instrumented_name(f_name) cases = generate(instr_name) - with open(os.path.join(OUT_DIR, f_name + ".py"), "w") as f: + with open(os.path.join(OUT_DIR, "test_" + f_name + ".py"), "w") as f: f.write(get_test_class(instr_name, cases)) def main(): random.seed(0) # init random seed - instrument.load_benchmark(save_instrumented=False) # instrument all files in benchmark + + parser = argparse.ArgumentParser(prog='genetic.py', + description='Runs genetic algorithm for test case generation. Works on benchmark ' + 'files situated in the \'benchmark\' directory.') + parser.add_argument('file', type=str, help="File to test", + nargs="*") + + instrument.load_benchmark(save_instrumented=False, files=parser.parse_args().file) init_deap() for instr_f in tqdm.tqdm(sorted(instrument.functions.keys()), desc="Generating tests"): diff --git a/instrument.py b/instrument.py index 3dd4193..d62fad5 100644 --- a/instrument.py +++ b/instrument.py @@ -1,28 +1,18 @@ -from typing import Optional +import ast import os.path +import sys +from typing import Optional +import astunparse import tqdm from frozendict import frozendict +from operators import evaluate_condition -import ast -import astunparse -import sys -import random -from operators import compute_distances - -# hyperparameters ROOT_DIR: str = os.path.dirname(__file__) IN_DIR: str = os.path.join(ROOT_DIR, 'benchmark') OUT_DIR: str = os.path.join(ROOT_DIR, 'instrumented') SUFFIX: str = "_instrumented" -distances_true: dict[int, int] = {} -distances_false: dict[int, int] = {} - -# Archive of solutions -archive_true_branches: dict[int, any] = {} -archive_false_branches: dict[int, any] = {} - class BranchTransformer(ast.NodeTransformer): branches_range: dict[str, tuple[int, int]] @@ -78,7 +68,7 @@ class BranchTransformer(ast.NodeTransformer): return self.generic_visit(ast_node) self.branch_num += 1 - return ast.Call(func=ast.Name("evaluate_condition", ast.Load()), + return ast.Call(func=ast.Name(evaluate_condition.__name__, ast.Load()), args=[ast.Num(self.branch_num), ast.Str(ast_node.ops[0].__class__.__name__), ast_node.left, @@ -88,41 +78,6 @@ class BranchTransformer(ast.NodeTransformer): kwargs=None) -def update_maps(condition_num, d_true, d_false): - global distances_true, distances_false - - if condition_num in distances_true.keys(): - distances_true[condition_num] = min(distances_true[condition_num], d_true) - else: - distances_true[condition_num] = d_true - - if condition_num in distances_false.keys(): - distances_false[condition_num] = min(distances_false[condition_num], d_false) - else: - distances_false[condition_num] = d_false - - -def evaluate_condition(num, op, lhs, rhs): # type: ignore - if op == "In": - if isinstance(lhs, str): - lhs = ord(lhs) - - minimum = sys.maxsize - for elem in rhs.keys(): - distance = abs(lhs - ord(elem)) - if distance < minimum: - minimum = distance - - distance_true, distance_false = minimum, 1 if minimum == 0 else 0 - else: - distance_true, distance_false = compute_distances(op, lhs, rhs) - - update_maps(num, distance_true, distance_false) - - # distance == 0 equivalent to actual test by construction - return distance_true == 0 - - ArgType = str Arg = tuple[str, ArgType] Params = frozendict[str, any] @@ -203,9 +158,14 @@ def find_py_files(search_dir: str): yield os.path.join(cwd, file) -def load_benchmark(save_instrumented=True): +def load_benchmark(save_instrumented=True, files: list[str] = ()): + to_load = set([os.path.splitext(os.path.basename(file))[0] + ".py" for file in files]) + do_all = len(to_load) == 0 + for file in tqdm.tqdm(find_py_files(IN_DIR), desc="Instrumenting"): - instrument(file, os.path.join(OUT_DIR, os.path.basename(file)), save_instrumented=save_instrumented) + filename = os.path.basename(file) + if do_all or filename in to_load: + instrument(file, os.path.join(OUT_DIR, filename), save_instrumented=save_instrumented) def call_statement(f_name: str, f_args: Params) -> str: diff --git a/operators.py b/operators.py index fce8d76..f2af5fd 100644 --- a/operators.py +++ b/operators.py @@ -1,9 +1,13 @@ +import sys from dataclasses import dataclass -from typing import TypeVar, Callable from typing import Generic +from typing import TypeVar, Callable from nltk import edit_distance +distances_true: dict[int, int] = {} +distances_false: dict[int, int] = {} + T = TypeVar('T') U = TypeVar('U') @@ -100,7 +104,7 @@ def str_check(a: any, b: any) -> bool: return type(a) == str and type(b) == str -def compute_distances(name: str, lhs: any, rhs: any) -> tuple[int, int]: +def compute_distances(name: str, lhs: any, rhs: any) -> tuple[int, int, bool]: if int_str_check(lhs, rhs): lhs_int = int_str_convert(lhs) rhs_int = int_str_convert(rhs) @@ -109,13 +113,50 @@ def compute_distances(name: str, lhs: any, rhs: any) -> tuple[int, int]: raise ValueError(f"'{name}' is not a valid CmpOp name for 'int_str' operators") op = int_str_by_name[name] - return op.true_dist(lhs_int, rhs_int), op.false_dist(lhs_int, rhs_int) + return op.true_dist(lhs_int, rhs_int), op.false_dist(lhs_int, rhs_int), op.test(lhs_int, rhs_int) if str_check(lhs, rhs): if name not in str_by_name: raise ValueError(f"'{name}' is not a valid CmpOp name for 'str' operators") op = str_by_name[name] - return op.true_dist(lhs, rhs), op.false_dist(lhs, rhs) + return op.true_dist(lhs, rhs), op.false_dist(lhs, rhs), op.test(lhs, rhs) raise ValueError(f"'{lhs}' and '{rhs}' are not suitable for both 'int_str' and 'str' operators") + + +def update_map(the_map: dict[int, int], condition_num: int, distance: int): + if condition_num in the_map.keys(): + the_map[condition_num] = min(the_map[condition_num], distance) + else: + the_map[condition_num] = distance + + +def update_maps(condition_num, d_true, d_false): + global distances_true, distances_false + update_map(distances_true, condition_num, d_true) + update_map(distances_false, condition_num, d_false) + + +def in_op(num, lhs, rhs): + if isinstance(lhs, str): + lhs = ord(lhs) + + minimum = sys.maxsize + for elem in rhs.keys(): + distance = abs(lhs - ord(elem)) + if distance < minimum: + minimum = distance + + distance_true, distance_false = minimum, 1 if minimum == 0 else 0 + update_maps(num, distance_true, distance_false) + return distance_true == 0 # distance == 0 equivalent to actual test by construction + + +def evaluate_condition(num, op, lhs, rhs): + if op == "In": + return in_op(num, lhs, rhs) + + distance_true, distance_false, test = compute_distances(op, lhs, rhs) + update_maps(num, distance_true, distance_false) + return test diff --git a/tests/output.txt b/tests/output.txt index 1004ff2..d2f5e0c 100644 --- a/tests/output.txt +++ b/tests/output.txt @@ -1,6 +1,3 @@ -/usr/local/bin/python3.10 /Volumes/Data/git/kse/project-02-python-test-generator-maggicl/genetic.py -Instrumenting: 10it [00:00, 722.61it/s] -Generating tests: 0%| | 0/12 [00:00