diff --git a/figure_8/figure8-abcd-only.py b/figure_8/figure8-abcd-only.py new file mode 100755 index 00000000..bc9e3cb8 --- /dev/null +++ b/figure_8/figure8-abcd-only.py @@ -0,0 +1,170 @@ +#!/usr/bin/env python3 +# coding: utf-8 + +import os +import json +import pandas as pd +import findspark +findspark.init() +import pyspark +import pyspark.sql +import sys +import gzip +from pyspark import AccumulatorParam +from pyspark.sql.functions import lit +from pyspark.sql import Window +from pyspark.sql.types import * +from decimal import * + +if len(sys.argv) is not 4: + print(sys.argv[0] + " {cluster} {tmpdir} {maxram}") + sys.exit() + +cluster=sys.argv[1] + +spark = pyspark.sql.SparkSession.builder \ + .appName("task_slowdown") \ + .config("spark.driver.maxResultSize", sys.argv[3]) \ + .config("spark.local.dir", sys.argv[2]) \ + .config("spark.driver.memory", sys.argv[3]) \ + .getOrCreate() +sc = spark.sparkContext + +# READING INSTANCE EVENTS DATA +dfpath = "/home/claudio/google_2019/instance_events/" + cluster + "/" + cluster + "_instance_events*.json.gz" +df = spark.read.json(dfpath) + +### TESTING ONLY! +df = df.limit(50000) + +try: + df["collection_type"] = df["collection_type"].cast(ByteType()) +except: + df = df.withColumn("collection_type", lit(None).cast(ByteType())) + +df = df.rdd \ + .filter(lambda x: x.time is not None and x.type is not None and x.machine_id is not None and + x.instance_index is not None and x.collection_id is not None and x.resource_request is not None and + x.resource_request.cpus is not None and x.resource_request.memory is not None) \ + .map(lambda x: [tabid(x), int(x.time), int(x.type), + Decimal(x.resource_request.cpus), Decimal(x.resource_request.memory), x.machine_id]) \ + .toDF(["id", "time", "type", "rcpu", "rram", "mid"]) + +# READING MACHINE EVENTS DATA, sort them and save them as broadcast variable +print("Starting to read machine events...") +dfm = pd.read_csv("~/google_2019/machine_events/" + cluster + "_machine_events.csv", converters={ + 'time': lambda x: -1 if x == '' else int(x), + 'machine_id': lambda x: str(x), + 'capacity.cpus': lambda x: -1 if x == '' else Decimal(x), + 'capacity.memory': lambda x: -1 if x == '' else Decimal(x)}) +print("Dropping remove events...") +dfm = dfm[(dfm.type!=2)&(dfm.time!=-1)&(dfm["capacity.cpus"]!=-1)&(dfm["capacity.memory"]!=-1)] +print("Dropping missing data events...") +dfm = dfm[dfm.missing_data_reason.isnull()] +print("Projecting on useful columns...") +dfm = dfm[["time", "machine_id", "capacity.cpus", "capacity.memory"]] +print("Sorting by time...") +dfm = dfm.sort_values(by=["machine_id", "time"]) +print("Converting to broadcast variable...") +dfm = sc.broadcast([tuple(r) for r in dfm.to_numpy()]) +print("Done with machine events.") + +def get_machine_time_resources(machine_id, time): + def aux(i, j): + if i == j: + return dfm.value[i] if dfm.value[i][1] == machine_id else None + elif i + 1 == j: + if dfm.value[i][1] == machine_id: + return dfm.value[i] + elif dfm.value[j][1] == machine_id: + return dfm.value[j] + else: + return None + + mid = (i + j) // 2 + + if dfm.value[mid][1] > machine_id: + return aux(i, mid - 1) + elif dfm.value[mid][1] < machine_id: + return aux(mid + 1, j) + elif dfm.value[mid][0] > time: + return aux(i, mid) + elif dfv.value[mid][0] < time: + return aux(mid, j) + else: + return dfm.value[mid] + + return aux(0, len(dfm.value)-1) + +def increment_reserv_bucket(bucket, ceils, reserv, last_term_by_id): + idx = 0 + while idx < len(ceils) and ceils[idx] < reserv: + idx += 1 + + bucket[idx] += 1 + +def for_each_joined(x): + task_id = x[0] + ts = x[1] + + term = -1 + ts = sorted(ts, key=lambda x: x.time) + + request_ceils = [0.025, 0.05, 0.075] + cpu_request = [0] * 4 # [a, b, c, d] where <0.025, [0.025, 0.05), [0.05,0.075), >=0.075 + ram_request = [0] * 4 # [a, b, c, d] where <0.025, [0.025, 0.05), [0.05,0.075), >=0.075 + + reserv_ceils = [0, 0.2, 0.4, 0.6, 0.8, 1] + cpu_reservs = [0] * 7 # [n, a, b, c, d, e, f] where: + ram_reservs = [0] * 7 + + for i, t in enumerate(ts): + machine_log = get_machine_time_resources(mid, t.time) + if machine_log is not None: + reserv_cpu = tot_req[0] / machine_logs[0][2] + reserv_ram = tot_req[1] / machine_logs[0][3] + else: + reserv_cpu = -1 + reserv_ram = -1 + # 8a-b + increment_reserv_bucket(cpu_request, request_ceils, t.id, t.rcpu) + increment_reserv_bucket(ram_request, request_ceils, t.id, t.rram) + # 8c-d + increment_reserv_bucket(cpu_reservs, reserv_ceils, t.id, reserv_cpu) + increment_reserv_bucket(ram_reservs, reserv_ceils, t.id, reserv_ram) + + if t.type >= 4 and t.type <= 8: + term = t.type + + res = {-1: None, 4: None, 5: None, 6: None, 7: None, 8: None} + res[term] = {'rcpu': cpu_request, 'rram': ram_request, 'rscpu': cpu_reservs, 'rsram': ram_reservs} + + return res + +def fold_resobjs(ro1, ro2): + if ro1 is None: + return ro2 + elif ro2 is None: + return ro1 + else: + for k in ro1.keys(): + for kk in ro1[k].keys(): + if ro1[k][kk] is None: + ro1[k][kk] = ro2[k][kk] + elif ro2[k][kk] is None: + continue + else: + ro1[k][kk] = [sum(x) for x in zip(ro1[k][kk], ro2[k][kk])] + return ro1 + +result = df.rdd \ + .groupBy(lambda x: x.id) \ + .map(for_each_joined) \ + .fold(None, fold_resobjs) + +d = os.path.dirname(os.path.realpath(__file__)) + +with open(d + "/" + cluster + "_figure8abcd.json", "w") as f: + json.dump(result, f) + +# vim: set ts=4 sw=4 et tw=120: