From af6d21dbb1d692766c129db21e11446ce9634851 Mon Sep 17 00:00:00 2001 From: Claudio Maggioni Date: Mon, 13 Nov 2023 16:33:20 +0100 Subject: [PATCH] Instrumentor works --- instrumented/anagram_check.py | 12 ++ instrumented/caesar_cipher.py | 22 +++ instrumented/check_armstrong.py | 19 +++ instrumented/common_divisor_count.py | 21 +++ instrumented/exponentiation.py | 18 ++ instrumented/gcd.py | 16 ++ instrumented/longest_substring.py | 17 ++ instrumented/rabin_karp.py | 33 ++++ instrumented/railfence_cipher.py | 67 ++++++++ instrumented/zellers_birthday.py | 34 ++++ instrumentor.py | 48 +++--- requirements.txt | 4 +- sb_cgi_decode.py | 246 +++++++++++++++++++++++++++ 13 files changed, 530 insertions(+), 27 deletions(-) create mode 100644 instrumented/anagram_check.py create mode 100644 instrumented/caesar_cipher.py create mode 100644 instrumented/check_armstrong.py create mode 100644 instrumented/common_divisor_count.py create mode 100644 instrumented/exponentiation.py create mode 100644 instrumented/gcd.py create mode 100644 instrumented/longest_substring.py create mode 100644 instrumented/rabin_karp.py create mode 100644 instrumented/railfence_cipher.py create mode 100644 instrumented/zellers_birthday.py create mode 100644 sb_cgi_decode.py diff --git a/instrumented/anagram_check.py b/instrumented/anagram_check.py new file mode 100644 index 0000000..ed5fd3f --- /dev/null +++ b/instrumented/anagram_check.py @@ -0,0 +1,12 @@ + + +def anagram_check(s1: str, s2: str) -> bool: + if (evaluate_condition(1, 'Eq', len(s1), 1) and evaluate_condition(2, 'Eq', len(s2), 1)): + return (s1 == s2) + if evaluate_condition(3, 'NotEq', len(s1), len(s2)): + return False + if evaluate_condition(4, 'Eq', ''.join(sorted(s1)), ''.join(sorted(s2))): + return True + else: + return False + diff --git a/instrumented/caesar_cipher.py b/instrumented/caesar_cipher.py new file mode 100644 index 0000000..e29eb11 --- /dev/null +++ b/instrumented/caesar_cipher.py @@ -0,0 +1,22 @@ + + +def encrypt(strng: str, key: int) -> str: + assert (0 < key <= 94) + encrypted = '' + for x in strng: + indx = ((ord(x) + key) % 256) + if evaluate_condition(1, 'Gt', indx, 126): + indx = (indx - 95) + encrypted = (encrypted + chr(indx)) + return encrypted + +def decrypt(strng: str, key: int) -> str: + assert (0 < key <= 94) + decrypted = '' + for x in strng: + indx = ((ord(x) - key) % 256) + if evaluate_condition(2, 'Lt', indx, 32): + indx = (indx + 95) + decrypted = (decrypted + chr(indx)) + return decrypted + diff --git a/instrumented/check_armstrong.py b/instrumented/check_armstrong.py new file mode 100644 index 0000000..2dabeba --- /dev/null +++ b/instrumented/check_armstrong.py @@ -0,0 +1,19 @@ + + +def check_armstrong(n: int) -> bool: + assert (n >= 0) + if (evaluate_condition(1, 'Eq', n, 0) or evaluate_condition(2, 'Eq', n, 1)): + return True + if evaluate_condition(3, 'LtE', n, 150): + return False + t = n + sum = 0 + while evaluate_condition(4, 'NotEq', t, 0): + r = (t % 10) + sum = (sum + ((r * r) * r)) + t = (t // 10) + if evaluate_condition(5, 'Eq', sum, n): + return True + else: + return False + diff --git a/instrumented/common_divisor_count.py b/instrumented/common_divisor_count.py new file mode 100644 index 0000000..43213d6 --- /dev/null +++ b/instrumented/common_divisor_count.py @@ -0,0 +1,21 @@ + +'\nThe function takes two integers as input and return the number of common divisors of\nthat pair\n' + +def cd_count(a: int, b: int) -> int: + if (evaluate_condition(1, 'Eq', a, 0) or evaluate_condition(2, 'Eq', b, 0)): + return 2 + a = (((- 1) * a) if evaluate_condition(3, 'Lt', a, 0) else a) + b = (((- 1) * b) if evaluate_condition(4, 'Lt', b, 0) else b) + result = 0 + while evaluate_condition(5, 'NotEq', a, 0): + c = a + a = (b % a) + b = c + for i in range(1, int(((b ** 0.5) + 1))): + if evaluate_condition(6, 'Eq', (b % i), 0): + if evaluate_condition(7, 'Eq', int((b / i)), i): + result = (result + 1) + else: + result = (result + 2) + return result + diff --git a/instrumented/exponentiation.py b/instrumented/exponentiation.py new file mode 100644 index 0000000..72d310a --- /dev/null +++ b/instrumented/exponentiation.py @@ -0,0 +1,18 @@ + + +def exponentiation(baseNumber: int, power: int) -> float: + assert (not ((baseNumber == 0) or (power <= 0))) + answer = None + if evaluate_condition(1, 'Gt', power, 1): + halfAnswer = exponentiation_instrumented(baseNumber, (power // 2)) + answer = (halfAnswer * halfAnswer) + if evaluate_condition(2, 'Eq', (power % 2), 1): + answer *= baseNumber + elif evaluate_condition(3, 'Eq', power, 1): + answer = baseNumber + elif evaluate_condition(4, 'Eq', power, 0): + answer = 1 + else: + answer = (1 / exponentiation_instrumented(baseNumber, abs(power))) + return answer + diff --git a/instrumented/gcd.py b/instrumented/gcd.py new file mode 100644 index 0000000..955435d --- /dev/null +++ b/instrumented/gcd.py @@ -0,0 +1,16 @@ + + +def gcd(a: int, b: int) -> int: + assert ((a > 0) and (b > 0)) + if (evaluate_condition(1, 'Eq', a, 1) or evaluate_condition(2, 'Eq', b, 1)): + return 1 + if evaluate_condition(3, 'Eq', a, b): + return a + if evaluate_condition(4, 'Gt', b, a): + (a, b) = (b, a) + while evaluate_condition(5, 'NotEq', b, 0): + temp = b + b = (a % b) + a = temp + return a + diff --git a/instrumented/longest_substring.py b/instrumented/longest_substring.py new file mode 100644 index 0000000..66587b9 --- /dev/null +++ b/instrumented/longest_substring.py @@ -0,0 +1,17 @@ + + +def longest_sorted_substr(s: str) -> str: + count = 0 + max_count = 0 + end_position = 0 + for char in range((len(s) - 1)): + if evaluate_condition(1, 'LtE', s[char], s[(char + 1)]): + count += 1 + if evaluate_condition(2, 'Gt', count, max_count): + max_count = count + end_position = (char + 1) + else: + count = 0 + start_position = (end_position - max_count) + return s[start_position:(end_position + 1)] + diff --git a/instrumented/rabin_karp.py b/instrumented/rabin_karp.py new file mode 100644 index 0000000..0903173 --- /dev/null +++ b/instrumented/rabin_karp.py @@ -0,0 +1,33 @@ + + +def rabin_karp_search(pat: str, txt: str) -> list: + assert (len(pat) <= len(txt)) + d = 2560 + q = 101 + M = len(pat) + N = len(txt) + i = 0 + j = 0 + p = 0 + t = 0 + h = 1 + for i in range((M - 1)): + h = ((h * d) % q) + for i in range(M): + p = (((d * p) + ord(pat[i])) % q) + t = (((d * t) + ord(txt[i])) % q) + found_at_index = [] + for i in range(((N - M) + 1)): + if evaluate_condition(1, 'Eq', p, t): + for j in range(M): + if evaluate_condition(2, 'NotEq', txt[(i + j)], pat[j]): + break + j += 1 + if evaluate_condition(3, 'Eq', j, M): + found_at_index.append(i) + if evaluate_condition(4, 'Lt', i, (N - M)): + t = (((d * (t - (ord(txt[i]) * h))) + ord(txt[(i + M)])) % q) + if evaluate_condition(5, 'Lt', t, 0): + t = (t + q) + return found_at_index + diff --git a/instrumented/railfence_cipher.py b/instrumented/railfence_cipher.py new file mode 100644 index 0000000..ceee782 --- /dev/null +++ b/instrumented/railfence_cipher.py @@ -0,0 +1,67 @@ + + +def railencrypt(st: str, k: int) -> str: + assert (k > 1) + c = 0 + x = 0 + m = [([0] * len(st)) for i in range(k)] + for r in range(len(st)): + m[c][r] = ord(st[r]) + if evaluate_condition(1, 'Eq', x, 0): + if evaluate_condition(2, 'Eq', c, (k - 1)): + x = 1 + c -= 1 + else: + c += 1 + elif evaluate_condition(3, 'Eq', c, 0): + x = 0 + c += 1 + else: + c -= 1 + result = [] + for i in range(k): + for j in range(len(st)): + if evaluate_condition(4, 'NotEq', m[i][j], 0): + result.append(chr(m[i][j])) + return ''.join(result) + +def raildecrypt(st: str, k: int) -> str: + assert (k > 1) + (c, x) = (0, 0) + m = [([0] * len(st)) for i in range(k)] + for r in range(len(st)): + m[c][r] = 1 + if evaluate_condition(5, 'Eq', x, 0): + if evaluate_condition(6, 'Eq', c, (k - 1)): + x = 1 + c -= 1 + else: + c += 1 + elif evaluate_condition(7, 'Eq', c, 0): + x = 0 + c += 1 + else: + c -= 1 + result = [] + (c, x) = (0, 0) + for i in range(k): + for j in range(len(st)): + if evaluate_condition(8, 'Eq', m[i][j], 1): + m[i][j] = ord(st[x]) + x += 1 + for r in range(len(st)): + if evaluate_condition(9, 'NotEq', m[c][r], 0): + result.append(chr(m[c][r])) + if evaluate_condition(10, 'Eq', x, 0): + if evaluate_condition(11, 'Eq', c, (k - 1)): + x = 1 + c -= 1 + else: + c += 1 + elif evaluate_condition(12, 'Eq', c, 0): + x = 0 + c += 1 + else: + c -= 1 + return ''.join(result) + diff --git a/instrumented/zellers_birthday.py b/instrumented/zellers_birthday.py new file mode 100644 index 0000000..5b71b41 --- /dev/null +++ b/instrumented/zellers_birthday.py @@ -0,0 +1,34 @@ + + +def zeller(d: int, m: int, y: int) -> str: + assert (abs(d) >= 1) + assert (abs(m) >= 1) + assert ((0 <= abs(y) <= 99) or (1000 <= abs(y) <= 3000)) + d = abs(d) + m = abs(m) + y = abs(y) + if evaluate_condition(1, 'Gt', d, 31): + d = ((d % 31) + 1) + if evaluate_condition(2, 'Gt', m, 12): + m = ((m % 12) + 1) + if (evaluate_condition(3, 'Lt', y, 100) and evaluate_condition(4, 'Lt', y, 23)): + y = (2000 + y) + if (evaluate_condition(5, 'Lt', y, 100) and evaluate_condition(6, 'GtE', y, 23)): + y = (1900 + y) + days = {'0': 'Sunday', '1': 'Monday', '2': 'Tuesday', '3': 'Wednesday', '4': 'Thursday', '5': 'Friday', '6': 'Saturday'} + if evaluate_condition(7, 'LtE', m, 2): + y = (y - 1) + m = (m + 12) + c = int(str(y)[:2]) + k = int(str(y)[2:]) + t = int(((2.6 * m) - 5.39)) + u = int((c / 4)) + v = int((k / 4)) + x = (d + k) + z = (((t + u) + v) + x) + w = (z - (2 * c)) + f = round((w % 7)) + for i in days: + if evaluate_condition(8, 'Eq', f, int(i)): + return days[i] + diff --git a/instrumentor.py b/instrumentor.py index 7756334..d8f2e60 100644 --- a/instrumentor.py +++ b/instrumentor.py @@ -1,6 +1,5 @@ -from collections import defaultdict from dataclasses import dataclass -from typing import TypeVar, Callable, Optional +from typing import TypeVar, Callable from typing import Generic from nltk import edit_distance @@ -26,24 +25,25 @@ class CmpOp(Generic[T]): self.false_dist = false_dist -@dataclass -class InstrState: - min_true_dist: Optional[int] - min_false_dist: Optional[int] +# @dataclass +# class InstrState: +# min_true_dist: Optional[int] +# min_false_dist: Optional[int] +# +# def __init__(self): +# self.min_true_dist = None +# self.min_false_dist = None +# +# def update(self, op: CmpOp[U], lhs: U, rhs: U): +# true_dist = op.true_dist(lhs, rhs) +# self.min_true_dist = true_dist if self.min_true_dist is None else min(true_dist, self.min_true_dist) +# +# false_dist = op.false_dist(lhs, rhs) +# self.min_false_dist = false_dist if self.min_false_dist is None else min(false_dist, self.min_false_dist) +# +# +# instrumentation_states: defaultdict[int, InstrState] = defaultdict(InstrState) - def __init__(self): - self.min_true_dist = None - self.min_false_dist = None - - def update(self, op: CmpOp[U], lhs: U, rhs: U): - true_dist = op.true_dist(lhs, rhs) - self.min_true_dist = true_dist if self.min_true_dist is None else min(true_dist, self.min_true_dist) - - false_dist = op.false_dist(lhs, rhs) - self.min_false_dist = false_dist if self.min_false_dist is None else min(false_dist, self.min_false_dist) - - -instrumentation_states: defaultdict[int, InstrState] = defaultdict(InstrState) # Operands for these must both be integers or strings of length 1 int_str_ops: list[CmpOp[int | str]] = [ @@ -120,7 +120,7 @@ def str_check(a: any, b: any) -> bool: return type(a) == str and type(b) == str -def evaluate_condition(cmp_id: int, name: str, lhs: any, rhs: any) -> bool: +def compute_distances(name: str, lhs: any, rhs: any) -> tuple[int, int]: if int_str_check(lhs, rhs): lhs_int = int_str_convert(lhs) rhs_int = int_str_convert(rhs) @@ -129,17 +129,13 @@ def evaluate_condition(cmp_id: int, name: str, lhs: any, rhs: any) -> bool: raise ValueError(f"'{name}' is not a valid CmpOp name for 'int_str' operators") op = int_str_by_name[name] - - instrumentation_states[cmp_id].update(op, lhs_int, rhs_int) - return op.test(lhs_int, rhs_int) + return op.true_dist(lhs_int, rhs_int), op.false_dist(lhs_int, rhs_int) if str_check(lhs, rhs): if name not in str_by_name: raise ValueError(f"'{name}' is not a valid CmpOp name for 'str' operators") op = int_str_by_name[name] - - instrumentation_states[cmp_id].update(op, lhs, rhs) - return op.test(lhs, rhs) + return op.true_dist(lhs, rhs), op.false_dist(lhs, rhs) raise ValueError(f"'{lhs}' and '{rhs}' are not suitable for both 'int_str' and 'str' operators") diff --git a/requirements.txt b/requirements.txt index 1a170a4..51a6bf1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,3 @@ -nltk==3.8.1 \ No newline at end of file +nltk==3.8.1 +deap==1.4.1 +astunparse==1.6.3 \ No newline at end of file diff --git a/sb_cgi_decode.py b/sb_cgi_decode.py new file mode 100644 index 0000000..1111944 --- /dev/null +++ b/sb_cgi_decode.py @@ -0,0 +1,246 @@ +from typing import Optional +import os.path + +import ast +import astunparse +import sys +import random +from deap import creator, base, tools, algorithms +from instrumentor import compute_distances + +# hyperparameters +NPOP = 300 +NGEN = 200 +INDMUPROB = 0.05 +MUPROB = 0.1 +CXPROB = 0.5 +TOURNSIZE = 3 +LOW = -1000 +UP = 1000 +REPS = 10 +MAX_STRING_LENGTH = 10 + +IN_DIR: str = os.path.join(os.path.dirname(__file__), 'benchmark') +OUT_DIR: str = os.path.join(os.path.dirname(__file__), 'instrumented') + +distances_true: dict[int, int] = {} +distances_false: dict[int, int] = {} +branches: list[int] = [1, 2, 3, 4, 5] +archive_true_branches: dict[int, str] = {} +archive_false_branches: dict[int, str] = {} + + +def cgi_decode_instrumented(s: str) -> str: + return "" # make mypy happy + + +class BranchTransformer(ast.NodeTransformer): + branch_num: int + instrumented_name: Optional[str] + + def __init__(self): + self.branch_num = 0 + self.instrumented_name = None + + @staticmethod + def to_instrumented_name(name: str): + return name + "_instrumented" + + def visit_Assert(self, ast_node): + # Disable recursion in asserts, i.e. do not instrument assert conditions + return ast_node + + def visit_Return(self, ast_node): + # Same thing for return statements + return ast_node + + def visit_FunctionDef(self, ast_node): + self.instrumented_name = ast_node.name + inner_node = self.generic_visit(ast_node) + self.instrumented_name = None + return inner_node + + def visit_Call(self, ast_node): + if isinstance(ast_node.func, ast.Name) and ast_node.func.id == self.instrumented_name: + ast_node.func.id = BranchTransformer.to_instrumented_name(ast_node.func.id) + return ast_node + + def visit_Compare(self, ast_node): + if ast_node.ops[0] in [ast.Is, ast.IsNot, ast.In, ast.NotIn]: + return ast_node + + self.branch_num += 1 + return ast.Call(func=ast.Name("evaluate_condition", ast.Load()), + args=[ast.Num(self.branch_num), + ast.Str(ast_node.ops[0].__class__.__name__), + ast_node.left, + ast_node.comparators[0]], + keywords=[], + starargs=None, + kwargs=None) + + +def update_maps(condition_num, d_true, d_false): + global distances_true, distances_false + + if condition_num in distances_true.keys(): + distances_true[condition_num] = min(distances_true[condition_num], d_true) + else: + distances_true[condition_num] = d_true + + if condition_num in distances_false.keys(): + distances_false[condition_num] = min(distances_false[condition_num], d_false) + else: + distances_false[condition_num] = d_false + + +def evaluate_condition(num, op, lhs, rhs): # type: ignore + if op == "In": + if isinstance(lhs, str): + lhs = ord(lhs) + + minimum = sys.maxsize + for elem in rhs.keys(): + distance = abs(lhs - ord(elem)) + if distance < minimum: + minimum = distance + + distance_true, distance_false = minimum, 1 if minimum == 0 else 0 + else: + distance_true, distance_false = compute_distances(op, lhs, rhs) + + update_maps(num, distance_true, distance_false) + + # distance == 0 equivalent to actual test by construction + return distance_true == 0 + + +def normalize(x): + return x / (1.0 + x) + + +def get_fitness_cgi(individual): + x = individual[0] + # Reset any distance values from previous executions + global distances_true, distances_false + global branches, archive_true_branches, archive_false_branches + distances_true = {} + distances_false = {} + + # Run the function under test + try: + cgi_decode_instrumented(x) + except BaseException: + pass + + # Sum up branch distances + fitness = 0.0 + for branch in branches: + if branch in distances_true: + if distances_true[branch] == 0 and branch not in archive_true_branches: + archive_true_branches[branch] = x + if branch not in archive_true_branches: + fitness += normalize(distances_true[branch]) + for branch in branches: + if branch in distances_false: + if distances_false[branch] == 0 and branch not in archive_false_branches: + archive_false_branches[branch] = x + if branch not in archive_false_branches: + fitness += normalize(distances_false[branch]) + + return fitness, + + +def random_string(): + l = random.randint(0, MAX_STRING_LENGTH) + s = "" + for i in range(l): + random_character = chr(random.randrange(32, 127)) + s = s + random_character + return s + + +def crossover(individual1, individual2): + parent1 = individual1[0] + parent2 = individual2[0] + if len(parent1) > 1 and len(parent2) > 1: + pos = random.randint(1, len(parent1)) + offspring1 = parent1[:pos] + parent2[pos:] + offspring2 = parent2[:pos] + parent1[pos:] + individual1[0] = offspring1 + individual2[0] = offspring2 + + return individual1, individual2 + + +def mutate(individual): + chromosome = individual[0] + mutated = chromosome[:] + if len(mutated) > 0: + prob = 1.0 / len(mutated) + for pos in range(len(mutated)): + if random.random() < prob: + new_c = chr(random.randrange(32, 127)) + mutated = mutated[:pos] + new_c + mutated[pos + 1:] + individual[0] = mutated + return individual, + + +def generate(): + global archive_true_branches, archive_false_branches + + creator.create("Fitness", base.Fitness, weights=(-1.0,)) + creator.create("Individual", list, fitness=creator.Fitness) + + toolbox = base.Toolbox() + toolbox.register("attr_str", random_string) + toolbox.register("individual", tools.initRepeat, creator.Individual, toolbox.attr_str, n=1) + toolbox.register("population", tools.initRepeat, list, toolbox.individual) + toolbox.register("evaluate", get_fitness_cgi) + toolbox.register("mate", crossover) + toolbox.register("mutate", mutate) + toolbox.register("select", tools.selTournament, tournsize=TOURNSIZE) + + coverage = [] + for i in range(REPS): + archive_true_branches = {} + archive_false_branches = {} + population = toolbox.population(n=NPOP) + algorithms.eaSimple(population, toolbox, CXPROB, MUPROB, NGEN, verbose=False) + cov = len(archive_true_branches) + len(archive_false_branches) + print(cov, archive_true_branches, archive_false_branches) + coverage.append(cov) + + +def instrument(source_path: str, target_path: str): + with open(source_path, "r") as f: + source = f.read() + + node = ast.parse(source) + print(ast.dump(node, indent=2)) + BranchTransformer().visit(node) + node = ast.fix_missing_locations(node) # Make sure the line numbers are ok before printing + + with open(target_path, "w") as f: + print(astunparse.unparse(node), file=f) + + current_module = sys.modules[__name__] + code = compile(node, filename="", mode="exec") + exec(code, current_module.__dict__) # try: cgi_decode_instrumented("a%20%32"), print distances_true + + +def find_py_files(search_dir: str): + for (cwd, dirs, files) in os.walk(search_dir): + for file in files: + if file.endswith(".py"): + yield os.path.join(cwd, file) + + +def main(): + for file in find_py_files(IN_DIR): + instrument(file, os.path.join(OUT_DIR, os.path.basename(file))) + # generate() + + +if __name__ == '__main__': + main()