Instrumentor works

This commit is contained in:
Claudio Maggioni 2023-11-13 16:33:20 +01:00
parent fb409cb714
commit af6d21dbb1
13 changed files with 530 additions and 27 deletions

View file

@ -0,0 +1,12 @@
def anagram_check(s1: str, s2: str) -> bool:
if (evaluate_condition(1, 'Eq', len(s1), 1) and evaluate_condition(2, 'Eq', len(s2), 1)):
return (s1 == s2)
if evaluate_condition(3, 'NotEq', len(s1), len(s2)):
return False
if evaluate_condition(4, 'Eq', ''.join(sorted(s1)), ''.join(sorted(s2))):
return True
else:
return False

View file

@ -0,0 +1,22 @@
def encrypt(strng: str, key: int) -> str:
assert (0 < key <= 94)
encrypted = ''
for x in strng:
indx = ((ord(x) + key) % 256)
if evaluate_condition(1, 'Gt', indx, 126):
indx = (indx - 95)
encrypted = (encrypted + chr(indx))
return encrypted
def decrypt(strng: str, key: int) -> str:
assert (0 < key <= 94)
decrypted = ''
for x in strng:
indx = ((ord(x) - key) % 256)
if evaluate_condition(2, 'Lt', indx, 32):
indx = (indx + 95)
decrypted = (decrypted + chr(indx))
return decrypted

View file

@ -0,0 +1,19 @@
def check_armstrong(n: int) -> bool:
assert (n >= 0)
if (evaluate_condition(1, 'Eq', n, 0) or evaluate_condition(2, 'Eq', n, 1)):
return True
if evaluate_condition(3, 'LtE', n, 150):
return False
t = n
sum = 0
while evaluate_condition(4, 'NotEq', t, 0):
r = (t % 10)
sum = (sum + ((r * r) * r))
t = (t // 10)
if evaluate_condition(5, 'Eq', sum, n):
return True
else:
return False

View file

@ -0,0 +1,21 @@
'\nThe function takes two integers as input and return the number of common divisors of\nthat pair\n'
def cd_count(a: int, b: int) -> int:
if (evaluate_condition(1, 'Eq', a, 0) or evaluate_condition(2, 'Eq', b, 0)):
return 2
a = (((- 1) * a) if evaluate_condition(3, 'Lt', a, 0) else a)
b = (((- 1) * b) if evaluate_condition(4, 'Lt', b, 0) else b)
result = 0
while evaluate_condition(5, 'NotEq', a, 0):
c = a
a = (b % a)
b = c
for i in range(1, int(((b ** 0.5) + 1))):
if evaluate_condition(6, 'Eq', (b % i), 0):
if evaluate_condition(7, 'Eq', int((b / i)), i):
result = (result + 1)
else:
result = (result + 2)
return result

View file

@ -0,0 +1,18 @@
def exponentiation(baseNumber: int, power: int) -> float:
assert (not ((baseNumber == 0) or (power <= 0)))
answer = None
if evaluate_condition(1, 'Gt', power, 1):
halfAnswer = exponentiation_instrumented(baseNumber, (power // 2))
answer = (halfAnswer * halfAnswer)
if evaluate_condition(2, 'Eq', (power % 2), 1):
answer *= baseNumber
elif evaluate_condition(3, 'Eq', power, 1):
answer = baseNumber
elif evaluate_condition(4, 'Eq', power, 0):
answer = 1
else:
answer = (1 / exponentiation_instrumented(baseNumber, abs(power)))
return answer

16
instrumented/gcd.py Normal file
View file

@ -0,0 +1,16 @@
def gcd(a: int, b: int) -> int:
assert ((a > 0) and (b > 0))
if (evaluate_condition(1, 'Eq', a, 1) or evaluate_condition(2, 'Eq', b, 1)):
return 1
if evaluate_condition(3, 'Eq', a, b):
return a
if evaluate_condition(4, 'Gt', b, a):
(a, b) = (b, a)
while evaluate_condition(5, 'NotEq', b, 0):
temp = b
b = (a % b)
a = temp
return a

View file

@ -0,0 +1,17 @@
def longest_sorted_substr(s: str) -> str:
count = 0
max_count = 0
end_position = 0
for char in range((len(s) - 1)):
if evaluate_condition(1, 'LtE', s[char], s[(char + 1)]):
count += 1
if evaluate_condition(2, 'Gt', count, max_count):
max_count = count
end_position = (char + 1)
else:
count = 0
start_position = (end_position - max_count)
return s[start_position:(end_position + 1)]

View file

@ -0,0 +1,33 @@
def rabin_karp_search(pat: str, txt: str) -> list:
assert (len(pat) <= len(txt))
d = 2560
q = 101
M = len(pat)
N = len(txt)
i = 0
j = 0
p = 0
t = 0
h = 1
for i in range((M - 1)):
h = ((h * d) % q)
for i in range(M):
p = (((d * p) + ord(pat[i])) % q)
t = (((d * t) + ord(txt[i])) % q)
found_at_index = []
for i in range(((N - M) + 1)):
if evaluate_condition(1, 'Eq', p, t):
for j in range(M):
if evaluate_condition(2, 'NotEq', txt[(i + j)], pat[j]):
break
j += 1
if evaluate_condition(3, 'Eq', j, M):
found_at_index.append(i)
if evaluate_condition(4, 'Lt', i, (N - M)):
t = (((d * (t - (ord(txt[i]) * h))) + ord(txt[(i + M)])) % q)
if evaluate_condition(5, 'Lt', t, 0):
t = (t + q)
return found_at_index

View file

@ -0,0 +1,67 @@
def railencrypt(st: str, k: int) -> str:
assert (k > 1)
c = 0
x = 0
m = [([0] * len(st)) for i in range(k)]
for r in range(len(st)):
m[c][r] = ord(st[r])
if evaluate_condition(1, 'Eq', x, 0):
if evaluate_condition(2, 'Eq', c, (k - 1)):
x = 1
c -= 1
else:
c += 1
elif evaluate_condition(3, 'Eq', c, 0):
x = 0
c += 1
else:
c -= 1
result = []
for i in range(k):
for j in range(len(st)):
if evaluate_condition(4, 'NotEq', m[i][j], 0):
result.append(chr(m[i][j]))
return ''.join(result)
def raildecrypt(st: str, k: int) -> str:
assert (k > 1)
(c, x) = (0, 0)
m = [([0] * len(st)) for i in range(k)]
for r in range(len(st)):
m[c][r] = 1
if evaluate_condition(5, 'Eq', x, 0):
if evaluate_condition(6, 'Eq', c, (k - 1)):
x = 1
c -= 1
else:
c += 1
elif evaluate_condition(7, 'Eq', c, 0):
x = 0
c += 1
else:
c -= 1
result = []
(c, x) = (0, 0)
for i in range(k):
for j in range(len(st)):
if evaluate_condition(8, 'Eq', m[i][j], 1):
m[i][j] = ord(st[x])
x += 1
for r in range(len(st)):
if evaluate_condition(9, 'NotEq', m[c][r], 0):
result.append(chr(m[c][r]))
if evaluate_condition(10, 'Eq', x, 0):
if evaluate_condition(11, 'Eq', c, (k - 1)):
x = 1
c -= 1
else:
c += 1
elif evaluate_condition(12, 'Eq', c, 0):
x = 0
c += 1
else:
c -= 1
return ''.join(result)

View file

@ -0,0 +1,34 @@
def zeller(d: int, m: int, y: int) -> str:
assert (abs(d) >= 1)
assert (abs(m) >= 1)
assert ((0 <= abs(y) <= 99) or (1000 <= abs(y) <= 3000))
d = abs(d)
m = abs(m)
y = abs(y)
if evaluate_condition(1, 'Gt', d, 31):
d = ((d % 31) + 1)
if evaluate_condition(2, 'Gt', m, 12):
m = ((m % 12) + 1)
if (evaluate_condition(3, 'Lt', y, 100) and evaluate_condition(4, 'Lt', y, 23)):
y = (2000 + y)
if (evaluate_condition(5, 'Lt', y, 100) and evaluate_condition(6, 'GtE', y, 23)):
y = (1900 + y)
days = {'0': 'Sunday', '1': 'Monday', '2': 'Tuesday', '3': 'Wednesday', '4': 'Thursday', '5': 'Friday', '6': 'Saturday'}
if evaluate_condition(7, 'LtE', m, 2):
y = (y - 1)
m = (m + 12)
c = int(str(y)[:2])
k = int(str(y)[2:])
t = int(((2.6 * m) - 5.39))
u = int((c / 4))
v = int((k / 4))
x = (d + k)
z = (((t + u) + v) + x)
w = (z - (2 * c))
f = round((w % 7))
for i in days:
if evaluate_condition(8, 'Eq', f, int(i)):
return days[i]

View file

@ -1,6 +1,5 @@
from collections import defaultdict
from dataclasses import dataclass
from typing import TypeVar, Callable, Optional
from typing import TypeVar, Callable
from typing import Generic
from nltk import edit_distance
@ -26,24 +25,25 @@ class CmpOp(Generic[T]):
self.false_dist = false_dist
@dataclass
class InstrState:
min_true_dist: Optional[int]
min_false_dist: Optional[int]
# @dataclass
# class InstrState:
# min_true_dist: Optional[int]
# min_false_dist: Optional[int]
#
# def __init__(self):
# self.min_true_dist = None
# self.min_false_dist = None
#
# def update(self, op: CmpOp[U], lhs: U, rhs: U):
# true_dist = op.true_dist(lhs, rhs)
# self.min_true_dist = true_dist if self.min_true_dist is None else min(true_dist, self.min_true_dist)
#
# false_dist = op.false_dist(lhs, rhs)
# self.min_false_dist = false_dist if self.min_false_dist is None else min(false_dist, self.min_false_dist)
#
#
# instrumentation_states: defaultdict[int, InstrState] = defaultdict(InstrState)
def __init__(self):
self.min_true_dist = None
self.min_false_dist = None
def update(self, op: CmpOp[U], lhs: U, rhs: U):
true_dist = op.true_dist(lhs, rhs)
self.min_true_dist = true_dist if self.min_true_dist is None else min(true_dist, self.min_true_dist)
false_dist = op.false_dist(lhs, rhs)
self.min_false_dist = false_dist if self.min_false_dist is None else min(false_dist, self.min_false_dist)
instrumentation_states: defaultdict[int, InstrState] = defaultdict(InstrState)
# Operands for these must both be integers or strings of length 1
int_str_ops: list[CmpOp[int | str]] = [
@ -120,7 +120,7 @@ def str_check(a: any, b: any) -> bool:
return type(a) == str and type(b) == str
def evaluate_condition(cmp_id: int, name: str, lhs: any, rhs: any) -> bool:
def compute_distances(name: str, lhs: any, rhs: any) -> tuple[int, int]:
if int_str_check(lhs, rhs):
lhs_int = int_str_convert(lhs)
rhs_int = int_str_convert(rhs)
@ -129,17 +129,13 @@ def evaluate_condition(cmp_id: int, name: str, lhs: any, rhs: any) -> bool:
raise ValueError(f"'{name}' is not a valid CmpOp name for 'int_str' operators")
op = int_str_by_name[name]
instrumentation_states[cmp_id].update(op, lhs_int, rhs_int)
return op.test(lhs_int, rhs_int)
return op.true_dist(lhs_int, rhs_int), op.false_dist(lhs_int, rhs_int)
if str_check(lhs, rhs):
if name not in str_by_name:
raise ValueError(f"'{name}' is not a valid CmpOp name for 'str' operators")
op = int_str_by_name[name]
instrumentation_states[cmp_id].update(op, lhs, rhs)
return op.test(lhs, rhs)
return op.true_dist(lhs, rhs), op.false_dist(lhs, rhs)
raise ValueError(f"'{lhs}' and '{rhs}' are not suitable for both 'int_str' and 'str' operators")

View file

@ -1 +1,3 @@
nltk==3.8.1
deap==1.4.1
astunparse==1.6.3

246
sb_cgi_decode.py Normal file
View file

@ -0,0 +1,246 @@
from typing import Optional
import os.path
import ast
import astunparse
import sys
import random
from deap import creator, base, tools, algorithms
from instrumentor import compute_distances
# hyperparameters
NPOP = 300
NGEN = 200
INDMUPROB = 0.05
MUPROB = 0.1
CXPROB = 0.5
TOURNSIZE = 3
LOW = -1000
UP = 1000
REPS = 10
MAX_STRING_LENGTH = 10
IN_DIR: str = os.path.join(os.path.dirname(__file__), 'benchmark')
OUT_DIR: str = os.path.join(os.path.dirname(__file__), 'instrumented')
distances_true: dict[int, int] = {}
distances_false: dict[int, int] = {}
branches: list[int] = [1, 2, 3, 4, 5]
archive_true_branches: dict[int, str] = {}
archive_false_branches: dict[int, str] = {}
def cgi_decode_instrumented(s: str) -> str:
return "" # make mypy happy
class BranchTransformer(ast.NodeTransformer):
branch_num: int
instrumented_name: Optional[str]
def __init__(self):
self.branch_num = 0
self.instrumented_name = None
@staticmethod
def to_instrumented_name(name: str):
return name + "_instrumented"
def visit_Assert(self, ast_node):
# Disable recursion in asserts, i.e. do not instrument assert conditions
return ast_node
def visit_Return(self, ast_node):
# Same thing for return statements
return ast_node
def visit_FunctionDef(self, ast_node):
self.instrumented_name = ast_node.name
inner_node = self.generic_visit(ast_node)
self.instrumented_name = None
return inner_node
def visit_Call(self, ast_node):
if isinstance(ast_node.func, ast.Name) and ast_node.func.id == self.instrumented_name:
ast_node.func.id = BranchTransformer.to_instrumented_name(ast_node.func.id)
return ast_node
def visit_Compare(self, ast_node):
if ast_node.ops[0] in [ast.Is, ast.IsNot, ast.In, ast.NotIn]:
return ast_node
self.branch_num += 1
return ast.Call(func=ast.Name("evaluate_condition", ast.Load()),
args=[ast.Num(self.branch_num),
ast.Str(ast_node.ops[0].__class__.__name__),
ast_node.left,
ast_node.comparators[0]],
keywords=[],
starargs=None,
kwargs=None)
def update_maps(condition_num, d_true, d_false):
global distances_true, distances_false
if condition_num in distances_true.keys():
distances_true[condition_num] = min(distances_true[condition_num], d_true)
else:
distances_true[condition_num] = d_true
if condition_num in distances_false.keys():
distances_false[condition_num] = min(distances_false[condition_num], d_false)
else:
distances_false[condition_num] = d_false
def evaluate_condition(num, op, lhs, rhs): # type: ignore
if op == "In":
if isinstance(lhs, str):
lhs = ord(lhs)
minimum = sys.maxsize
for elem in rhs.keys():
distance = abs(lhs - ord(elem))
if distance < minimum:
minimum = distance
distance_true, distance_false = minimum, 1 if minimum == 0 else 0
else:
distance_true, distance_false = compute_distances(op, lhs, rhs)
update_maps(num, distance_true, distance_false)
# distance == 0 equivalent to actual test by construction
return distance_true == 0
def normalize(x):
return x / (1.0 + x)
def get_fitness_cgi(individual):
x = individual[0]
# Reset any distance values from previous executions
global distances_true, distances_false
global branches, archive_true_branches, archive_false_branches
distances_true = {}
distances_false = {}
# Run the function under test
try:
cgi_decode_instrumented(x)
except BaseException:
pass
# Sum up branch distances
fitness = 0.0
for branch in branches:
if branch in distances_true:
if distances_true[branch] == 0 and branch not in archive_true_branches:
archive_true_branches[branch] = x
if branch not in archive_true_branches:
fitness += normalize(distances_true[branch])
for branch in branches:
if branch in distances_false:
if distances_false[branch] == 0 and branch not in archive_false_branches:
archive_false_branches[branch] = x
if branch not in archive_false_branches:
fitness += normalize(distances_false[branch])
return fitness,
def random_string():
l = random.randint(0, MAX_STRING_LENGTH)
s = ""
for i in range(l):
random_character = chr(random.randrange(32, 127))
s = s + random_character
return s
def crossover(individual1, individual2):
parent1 = individual1[0]
parent2 = individual2[0]
if len(parent1) > 1 and len(parent2) > 1:
pos = random.randint(1, len(parent1))
offspring1 = parent1[:pos] + parent2[pos:]
offspring2 = parent2[:pos] + parent1[pos:]
individual1[0] = offspring1
individual2[0] = offspring2
return individual1, individual2
def mutate(individual):
chromosome = individual[0]
mutated = chromosome[:]
if len(mutated) > 0:
prob = 1.0 / len(mutated)
for pos in range(len(mutated)):
if random.random() < prob:
new_c = chr(random.randrange(32, 127))
mutated = mutated[:pos] + new_c + mutated[pos + 1:]
individual[0] = mutated
return individual,
def generate():
global archive_true_branches, archive_false_branches
creator.create("Fitness", base.Fitness, weights=(-1.0,))
creator.create("Individual", list, fitness=creator.Fitness)
toolbox = base.Toolbox()
toolbox.register("attr_str", random_string)
toolbox.register("individual", tools.initRepeat, creator.Individual, toolbox.attr_str, n=1)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)
toolbox.register("evaluate", get_fitness_cgi)
toolbox.register("mate", crossover)
toolbox.register("mutate", mutate)
toolbox.register("select", tools.selTournament, tournsize=TOURNSIZE)
coverage = []
for i in range(REPS):
archive_true_branches = {}
archive_false_branches = {}
population = toolbox.population(n=NPOP)
algorithms.eaSimple(population, toolbox, CXPROB, MUPROB, NGEN, verbose=False)
cov = len(archive_true_branches) + len(archive_false_branches)
print(cov, archive_true_branches, archive_false_branches)
coverage.append(cov)
def instrument(source_path: str, target_path: str):
with open(source_path, "r") as f:
source = f.read()
node = ast.parse(source)
print(ast.dump(node, indent=2))
BranchTransformer().visit(node)
node = ast.fix_missing_locations(node) # Make sure the line numbers are ok before printing
with open(target_path, "w") as f:
print(astunparse.unparse(node), file=f)
current_module = sys.modules[__name__]
code = compile(node, filename="<ast>", mode="exec")
exec(code, current_module.__dict__) # try: cgi_decode_instrumented("a%20%32"), print distances_true
def find_py_files(search_dir: str):
for (cwd, dirs, files) in os.walk(search_dir):
for file in files:
if file.endswith(".py"):
yield os.path.join(cwd, file)
def main():
for file in find_py_files(IN_DIR):
instrument(file, os.path.join(OUT_DIR, os.path.basename(file)))
# generate()
if __name__ == '__main__':
main()