diff --git a/figure_8/figure8-abcd-only.py b/figure_8/figure8-abcd-only.py index bc9e3cb8..a636c907 100755 --- a/figure_8/figure8-abcd-only.py +++ b/figure_8/figure8-abcd-only.py @@ -16,10 +16,12 @@ from pyspark.sql import Window from pyspark.sql.types import * from decimal import * -if len(sys.argv) is not 4: - print(sys.argv[0] + " {cluster} {tmpdir} {maxram}") +if len(sys.argv) != 5: + print(sys.argv[0] + " {cluster} {tmpdir} {maxram} {joindir}") sys.exit() +joindir=sys.argv[4] + cluster=sys.argv[1] spark = pyspark.sql.SparkSession.builder \ @@ -30,25 +32,7 @@ spark = pyspark.sql.SparkSession.builder \ .getOrCreate() sc = spark.sparkContext -# READING INSTANCE EVENTS DATA -dfpath = "/home/claudio/google_2019/instance_events/" + cluster + "/" + cluster + "_instance_events*.json.gz" -df = spark.read.json(dfpath) - -### TESTING ONLY! -df = df.limit(50000) - -try: - df["collection_type"] = df["collection_type"].cast(ByteType()) -except: - df = df.withColumn("collection_type", lit(None).cast(ByteType())) - -df = df.rdd \ - .filter(lambda x: x.time is not None and x.type is not None and x.machine_id is not None and - x.instance_index is not None and x.collection_id is not None and x.resource_request is not None and - x.resource_request.cpus is not None and x.resource_request.memory is not None) \ - .map(lambda x: [tabid(x), int(x.time), int(x.type), - Decimal(x.resource_request.cpus), Decimal(x.resource_request.memory), x.machine_id]) \ - .toDF(["id", "time", "type", "rcpu", "rram", "mid"]) +df = spark.read.parquet(joindir + "/figure-8-join-" + cluster + ".parquet") # READING MACHINE EVENTS DATA, sort them and save them as broadcast variable print("Starting to read machine events...") @@ -96,11 +80,11 @@ def get_machine_time_resources(machine_id, time): return aux(0, len(dfm.value)-1) -def increment_reserv_bucket(bucket, ceils, reserv, last_term_by_id): - idx = 0 - while idx < len(ceils) and ceils[idx] < reserv: - idx += 1 - +def increment_reserv_bucket(bucket, 
value): + if value < 0: + idx = 0 + else: + idx = 40 if value >= 1 else (int(value * 40) + 1) bucket[idx] += 1 def for_each_joined(x): @@ -110,34 +94,31 @@ def for_each_joined(x): term = -1 ts = sorted(ts, key=lambda x: x.time) - request_ceils = [0.025, 0.05, 0.075] - cpu_request = [0] * 4 # [a, b, c, d] where <0.025, [0.025, 0.05), [0.05,0.075), >=0.075 - ram_request = [0] * 4 # [a, b, c, d] where <0.025, [0.025, 0.05), [0.05,0.075), >=0.075 - - reserv_ceils = [0, 0.2, 0.4, 0.6, 0.8, 1] - cpu_reservs = [0] * 7 # [n, a, b, c, d, e, f] where: - ram_reservs = [0] * 7 + cpu_util = [0] * 41 + ram_util = [0] * 41 + cpu_request = [0] * 41 + ram_request = [0] * 41 for i, t in enumerate(ts): machine_log = get_machine_time_resources(mid, t.time) if machine_log is not None: - reserv_cpu = tot_req[0] / machine_logs[0][2] - reserv_ram = tot_req[1] / machine_logs[0][3] + util_cpu = t.acpu / machine_log[2] + util_ram = t.aram / machine_log[3] else: - reserv_cpu = -1 - reserv_ram = -1 + util_cpu = -1 + util_ram = -1 # 8a-b - increment_reserv_bucket(cpu_request, request_ceils, t.id, t.rcpu) - increment_reserv_bucket(ram_request, request_ceils, t.id, t.rram) - # 8c-d - increment_reserv_bucket(cpu_reservs, reserv_ceils, t.id, reserv_cpu) - increment_reserv_bucket(ram_reservs, reserv_ceils, t.id, reserv_ram) + increment_reserv_bucket(cpu_request, t.rcpu) + increment_reserv_bucket(ram_request, t.rram) + # 8e-f + increment_reserv_bucket(cpu_util, t.acpu) + increment_reserv_bucket(ram_util, t.aram) if t.type >= 4 and t.type <= 8: term = t.type res = {-1: None, 4: None, 5: None, 6: None, 7: None, 8: None} - res[term] = {'rcpu': cpu_request, 'rram': ram_request, 'rscpu': cpu_reservs, 'rsram': ram_reservs} + res[term] = {'rcpu': cpu_request, 'rram': ram_request, 'ucpu': cpu_util, 'uram': ram_util} return res