This repository has been archived on 2024-10-22. You can view files and clone it, but cannot push or open issues or pull requests.
kse-02/fuzzer.py

271 lines
9.1 KiB
Python

import argparse
import os
from random import randrange, choice, random, sample, seed
from frozendict import frozendict
from tqdm import tqdm
import instrument
import operators
from archive import Archive
from instrument import (Arg, Params, invoke, call_statement, BranchTransformer,
module_of, load_benchmark, get_benchmark, functions)
from typing import Tuple, Dict, List, Set, Callable
# A pair of bounds; the generators below pass both values to ``randrange``,
# so the first bound is inclusive and the second exclusive.
Range = Tuple[int, int]
# Bounds used by ``random_int`` when drawing integer arguments.
INT_RANGE: Range = (-1000, 1000)
# Bounds used by ``random_str`` for the length of generated strings.
STRING_LEN_RANGE: Range = (0, 10)
# Bounds of the code points ``random_chr`` converts with ``chr``.
STRING_CHAR_RANGE: Range = (32, 127)
# Number of random parameter tuples drawn when ``get_pool`` creates a pool.
POOL_SIZE: int = 1000
# Number of iterations performed by ``fuzzer_generate`` per function.
FUZZER_REPS: int = 1000
# Directory (next to this file) where ``generate_tests`` writes test modules.
OUT_DIR = os.path.join(os.path.dirname(__file__), "fuzzer_tests")
def random_int() -> int:
    """Draw a random integer from ``INT_RANGE``.

    The lower bound is inclusive and the upper bound exclusive, following
    ``random.randrange``.
    """
    lowest, highest = INT_RANGE
    return randrange(lowest, highest)
def random_chr() -> str:
    """Return one random character whose code point lies in ``STRING_CHAR_RANGE``.

    The upper bound of the range is exclusive.
    """
    code_point = randrange(STRING_CHAR_RANGE[0], STRING_CHAR_RANGE[1])
    return chr(code_point)
def random_str() -> str:
    """Build a random string.

    The length is drawn from ``STRING_LEN_RANGE`` (upper bound exclusive) and
    every character comes from ``random_chr``.
    """
    shortest, longest = STRING_LEN_RANGE
    size = randrange(shortest, longest)
    return "".join(random_chr() for _ in range(size))
def max_cases(args: List[Arg]) -> int:
    """Count the distinct inputs the random generators can produce for ``args``.

    The result is the product, over all arguments, of the number of distinct
    values for that argument's type:

    * ``'int'``: every integer in ``INT_RANGE`` (upper bound exclusive).
    * ``'str'``: every string whose length lies in ``STRING_LEN_RANGE`` and
      whose characters lie in ``STRING_CHAR_RANGE`` (upper bounds exclusive).

    :param args: ``(name, type)`` pairs describing the function arguments
    :return: the size of the input space (1 when ``args`` is empty)
    :raises ValueError: if an argument type is neither ``'int'`` nor ``'str'``
    """
    num = 1
    for _, arg_type in args:
        if arg_type == 'int':
            num *= INT_RANGE[1] - INT_RANGE[0]
        elif arg_type == 'str':
            len_from, len_to = STRING_LEN_RANGE
            chr_from, chr_to = STRING_CHAR_RANGE
            alphabet_size = chr_to - chr_from
            # There are alphabet_size ** length strings of a given length
            # (exactly one for length 0), so sum that over every allowed length.
            num *= sum(alphabet_size ** length for length in range(len_from, len_to))
        else:
            raise ValueError(f"Arg type '{arg_type}' not supported")
    return num
def random_arg(arg_type: str) -> any:
    """Generate a random value for an argument of the given type.

    :param arg_type: ``'int'`` or ``'str'``
    :return: a random integer or a random string, matching ``arg_type``
    :raises ValueError: if ``arg_type`` is not one of the supported types
    """
    if arg_type == 'int':
        return random_int()
    if arg_type == 'str':
        return random_str()
    raise ValueError(f"Arg type '{arg_type}' not supported")
def random_mutate(arg_type: str, arg_value: any) -> any:
    """Return a randomly perturbed copy of ``arg_value``.

    * ``'int'``: adds an offset drawn with ``randrange(-10, 10)``.
    * ``'str'``: each position is independently replaced by a fresh random
      character with probability ``1 / len(arg_value)``; the empty string is
      returned unchanged.

    :param arg_type: ``'int'`` or ``'str'``
    :param arg_value: the current value of the argument
    :return: the mutated value
    :raises ValueError: if ``arg_type`` is not one of the supported types
    """
    if arg_type == 'int':
        return arg_value + randrange(-10, 10)

    if arg_type != 'str':
        raise ValueError(f"Arg type '{arg_type}' not supported")

    if len(arg_value) == 0:
        return arg_value

    # On average one character is replaced per call.
    replace_chance = 1.0 / len(arg_value)
    chars = list(arg_value)
    for idx in range(len(chars)):
        if random() < replace_chance:
            chars[idx] = random_chr()
    return "".join(chars)
def random_params(arguments: List[Arg]) -> Params:
    """Create a random test input for a function signature.

    :param arguments: ``(name, type)`` pairs describing the function arguments
    :return: an immutable mapping from each argument name to a random value
        of its type
    """
    return frozendict({name: random_arg(kind) for name, kind in arguments})
# Cache of test-input pools, keyed by the tuple of argument types of a
# signature. Each pool is a set of value tuples in argument order; it is
# created lazily by ``get_pool`` and extended by ``add_to_pool``.
pools: Dict[tuple, Set[tuple]] = {}
def add_to_pool(arguments: List[Arg], params: Params):
    """Record ``params`` in the pool that belongs to this signature.

    The pool is looked up by the tuple of argument types and stores only the
    values, in argument order.

    :param arguments: ``(name, type)`` pairs describing the function arguments
    :param params: mapping from argument name to value
    :raises ValueError: if no pool exists yet for these argument types
    """
    signature = tuple(kind for _, kind in arguments)
    if signature not in pools:
        raise ValueError(f"{arguments} has no matching pool in pools")

    values = tuple(params[name] for name, _ in arguments)
    pools[signature].add(values)
def get_pool(arguments: List[Arg]) -> List[Params]:
    """Return the pool of candidate inputs for this signature.

    On first use for a given tuple of argument types, ``POOL_SIZE`` random
    inputs are drawn and stored in ``pools`` (duplicates collapse, since the
    pool is a set).

    :param arguments: ``(name, type)`` pairs describing the function arguments
    :return: every pooled input as a mapping keyed by the names in
        ``arguments``
    """
    names = [name for name, _ in arguments]
    signature = tuple(kind for _, kind in arguments)

    # The pool stores bare value tuples: it remembers the order of the
    # parameters but not their names, and is built only once per signature.
    if signature not in pools:
        fresh = set()
        for _ in range(POOL_SIZE):
            drawn = random_params(arguments)
            fresh.add(tuple(drawn[name] for name in names))
        pools[signature] = fresh

    # Re-attach the caller's argument names to each stored value tuple.
    return [frozendict(dict(zip(names, row))) for row in pools[signature]]
def mutate(test_case: Params, arguments: List[Arg]) -> Params:
    """Mutate one randomly chosen argument of a test case.

    :param test_case: mapping from argument name to value
    :param arguments: ``(name, type)`` pairs describing the function arguments
    :return: the result of ``test_case.set`` with the chosen argument
        replaced by its ``random_mutate`` result
    """
    target = choice(list(test_case.keys()))  # the argument that gets mutated
    type_by_name: Dict[str, str] = {name: kind for name, kind in arguments}
    mutated_value = random_mutate(type_by_name[target], test_case[target])
    return test_case.set(target, mutated_value)
def crossover(chosen_test: Params, other_chosen_test: Params, arguments: List[Arg]) -> Tuple[Params, Params]:
    """Recombine two test cases on one randomly chosen argument.

    A single argument name is picked at random from ``chosen_test``; all
    other arguments are carried over unchanged.

    * ``'str'`` arguments are recombined with ``str_crossover``.
    * Any other type (in practice ``'int'``) has its value exchanged between
      the two test cases.

    :param chosen_test: first parent, mapping argument name to value
    :param other_chosen_test: second parent, with the same argument names
    :param arguments: ``(name, type)`` pairs describing the function arguments
    :return: the two offspring, derived from ``chosen_test`` and
        ``other_chosen_test`` respectively via ``.set``
    """
    # Select a property at random; only that property is recombined
    arg_name = choice(list(chosen_test.keys()))
    types: Dict[str, str] = {name: arg_type for name, arg_type in arguments}

    if types[arg_name] == 'str':
        # Crossover for strings intermingles the strings of the two chosen tests
        s1, s2 = str_crossover(chosen_test[arg_name], other_chosen_test[arg_name])
        t1 = chosen_test.set(arg_name, s1)
        t2 = other_chosen_test.set(arg_name, s2)
    else:  # types[arg_name] == 'int'
        # Crossover for integers swaps the values from the two tests: each
        # offspring receives the *other* parent's value. (Assigning each
        # parent its own value back would make this branch a no-op.)
        i1, i2 = chosen_test[arg_name], other_chosen_test[arg_name]
        t1 = chosen_test.set(arg_name, i2)
        t2 = other_chosen_test.set(arg_name, i1)
    return t1, t2
def generate_test_case(f_name: str, arguments: List[Arg], archive: Archive, bias_unseen=True) -> Params:
    """Pick a pooled input on which the function runs without assertion errors.

    Candidates are sampled from the pool until ``invoke`` completes without
    raising ``AssertionError``. With ``bias_unseen`` enabled, the first
    attempts additionally skip candidates for which
    ``archive.satisfies_unseen_branches`` returns an empty result; once the
    attempt budget is used up, any candidate is accepted.

    :param f_name: name of the function passed to ``invoke``
    :param arguments: ``(name, type)`` pairs describing the function arguments
    :param archive: archive queried for not-yet-covered branches
    :param bias_unseen: whether to prefer candidates reaching unseen branches
    :return: the first accepted candidate
    """
    pool: List[Params] = get_pool(arguments)
    remaining = 20  # budget of tries to find an input that reaches a new branch

    while True:
        candidate = sample(pool, 1)[0]
        unseen = archive.satisfies_unseen_branches(candidate) if bias_unseen else []
        remaining -= 1

        if bias_unseen and len(unseen) == 0 and remaining > 0:
            continue  # nothing new would be covered: draw another candidate

        try:
            invoke(f_name, candidate)
        except AssertionError:
            continue  # the candidate violates an assertion: draw again
        return candidate  # only inputs that satisfy the assertions are returned
def str_crossover(parent1: str, parent2: str):
    """Single-point crossover of two strings.

    When either string has fewer than two characters, both are returned
    unchanged. Otherwise a cut position is drawn inside ``parent1`` and the
    tails after that position are exchanged.

    :param parent1: first string; its length bounds the cut position
    :param parent2: second string
    :return: the pair of offspring strings
    """
    if len(parent1) <= 1 or len(parent2) <= 1:
        return parent1, parent2

    cut = randrange(1, len(parent1))
    head1, tail1 = parent1[:cut], parent1[cut:]
    head2, tail2 = parent2[:cut], parent2[cut:]
    return head1 + tail2, head2 + tail1
def get_test_case_source(f_name: str, test_case: Params, i: int, indent: int):
    """Render one test method as source text.

    ``operators.distances_true_all`` and ``operators.distances_false_all``
    are reset to empty dicts, the function is invoked on ``test_case``, and
    the contents of the two dicts afterwards are emitted as comments above
    the method. The method asserts that calling the original (uninstrumented)
    function yields the value observed here.

    :param f_name: name of the function passed to ``invoke``
    :param test_case: mapping from argument name to value
    :param i: number appended to the test method name
    :param indent: indentation level of the ``def`` line (4 spaces per level)
    :return: the source text, without a trailing newline
    """
    original_name = BranchTransformer.to_original_name(f_name)
    unit = " " * 4
    pad = unit * indent

    # Start from empty distance records so the comments describe this run only.
    operators.distances_true_all = {}
    operators.distances_false_all = {}
    result = invoke(f_name, test_case)

    rendered = [
        f"{pad}# distances_true = {repr(operators.distances_true_all)}",
        f"{pad}# distances_false = {repr(operators.distances_false_all)}",
        f"{pad}def test_{original_name}_{i}(self):",
        f"{pad}{unit}assert {call_statement(original_name, test_case)} == {repr(result)}",
    ]
    return "\n".join(rendered)
def get_test_import_stmt(names: List[str]):
    """Build the import header of a generated test module.

    :param names: original (uninstrumented) names of the functions under test
    :return: newline-terminated import lines: ``TestCase`` first, then one
        ``from <module> import <name>`` line per function, where the module
        path is looked up in ``module_of`` via the instrumented name
    """
    lines = ["from unittest import TestCase"]
    for original in names:
        instrumented = BranchTransformer.to_instrumented_name(original)
        module_path = ".".join(module_of[instrumented])
        lines.append(f"from {module_path} import {original}")
    return "".join(line + "\n" for line in lines)
def get_test_class(orig_f_name: str, cases: Set[Params]) -> str:
    """Render a ``TestCase`` subclass containing one method per test case.

    :param orig_f_name: original (uninstrumented) name of the function
    :param cases: the test inputs to turn into test methods
    :return: source text of the class; methods are numbered from 1 and
        separated by a blank line
    """
    f_name = BranchTransformer.to_instrumented_name(orig_f_name)
    header = f"class Test_{orig_f_name}(TestCase):\n"
    methods = [
        get_test_case_source(f_name, case, number, 1)
        for number, case in enumerate(cases, start=1)
    ]
    return header + "\n\n".join(methods) + "\n"
def generate_tests(files: List[str], seed_num: int, generation_fn: Callable[[str], Set[Params]]):
    """Generate a test module for every benchmark file.

    The benchmark is loaded, the random generator is seeded, and for each
    benchmark file ``generation_fn`` is called once per function. The
    resulting suites are written to ``OUT_DIR/test_<file_name>.py``.

    :param files: benchmark files forwarded to ``load_benchmark``
    :param seed_num: seed for the ``random`` module
    :param generation_fn: maps a function name to its set of test inputs
    """
    load_benchmark(save_instrumented=False, files=files)
    seed(seed_num)  # init random seed

    # Make sure the output directory exists; otherwise ``open`` below fails
    # with FileNotFoundError on a fresh checkout.
    os.makedirs(OUT_DIR, exist_ok=True)

    for file_name, f_names in tqdm(get_benchmark().items(), desc="Generating tests"):
        # Generate every suite first so a failure does not leave a partial file.
        suite = [(name, generation_fn(name)) for name in f_names]
        with open(os.path.join(OUT_DIR, f"test_{file_name}.py"), "w") as f:
            f.write(get_test_import_stmt(f_names))
            f.write("\n\n")
            f.write("\n\n".join([get_test_class(name, cases) for name, cases in suite]))
def fuzzer_generate(f_name: str) -> Set[Params]:
    """Run the fuzzer on one function and return the resulting test suite.

    For ``FUZZER_REPS`` iterations a pooled input is drawn and then, chosen
    uniformly at random, kept as is, mutated, or crossed over with a second
    pooled input. Every resulting input is offered to the archive and added
    to the pool.

    :param f_name: original (uninstrumented) name of the function
    :return: the suite produced by ``archive.build_suite()``
    """
    instrumented = BranchTransformer.to_instrumented_name(f_name)
    args = functions[instrumented]
    archive = Archive(instrumented)
    mutate_action, crossover_action = 1, 2  # any other draw keeps the input as is

    for _ in tqdm(range(FUZZER_REPS), desc=f"fuzzer [{f_name}]"):
        candidate = generate_test_case(instrumented, args, archive, bias_unseen=False)
        action = randrange(3)

        if action == crossover_action:
            partner = generate_test_case(instrumented, args, archive, bias_unseen=False)
            candidate, partner = crossover(candidate, partner, args)
            archive.consider_test(partner)
            add_to_pool(args, partner)
        elif action == mutate_action:
            candidate = mutate(candidate, args)

        archive.consider_test(candidate)
        add_to_pool(args, candidate)

    return archive.build_suite()
def main():
    """Parse the command line and run fuzzer-based test generation.

    Accepts any number of benchmark files as positional arguments and an
    optional ``-s``/``--seed`` integer (default 0).
    """
    description = (
        "Runs fuzzer for test case generation. Works on benchmark "
        "files situated in the 'benchmark' directory."
    )
    parser = argparse.ArgumentParser(prog='fuzzer.py', description=description)
    parser.add_argument('file', type=str, nargs="*", help="File to test")
    parser.add_argument('-s', '--seed', type=int, nargs="?", default=0,
                        help="Random generator seed")

    options = parser.parse_args()
    generate_tests(options.file, options.seed, fuzzer_generate)


# Run the command-line interface only when executed as a script.
if __name__ == "__main__":
    main()