#!/usr/bin/env python3
# coding: utf-8

import os
import json
import sys
import gzip
from decimal import *

import pandas as pd
import findspark
findspark.init()
import pyspark
import pyspark.sql
from pyspark import AccumulatorParam
from pyspark.sql.functions import lit
from pyspark.sql import Window
from pyspark.sql.types import *

if len(sys.argv) != 5:
    print(sys.argv[0] + " {cluster} {tmpdir} {maxram} {joindir}")
    sys.exit()

cluster = sys.argv[1]
joindir = sys.argv[4]

spark = pyspark.sql.SparkSession.builder \
    .appName("task_slowdown") \
    .config("spark.driver.maxResultSize", sys.argv[3]) \
    .config("spark.local.dir", sys.argv[2]) \
    .config("spark.driver.memory", sys.argv[3]) \
    .getOrCreate()
sc = spark.sparkContext

df = spark.read.parquet(joindir + "/figure-8-join-" + cluster + ".parquet")

# Read machine events, filter and sort them, and ship them to the executors
# as a broadcast variable.
print("Starting to read machine events...")
dfm = pd.read_csv("~/google_2019/machine_events/" + cluster + "_machine_events.csv", converters={
    'time': lambda x: -1 if x == '' else int(x),
    'machine_id': lambda x: str(x),
    'capacity.cpus': lambda x: -1 if x == '' else Decimal(x),
    'capacity.memory': lambda x: -1 if x == '' else Decimal(x)})

print("Dropping remove events...")
dfm = dfm[(dfm.type != 2) & (dfm.time != -1) & (dfm["capacity.cpus"] != -1) & (dfm["capacity.memory"] != -1)]

print("Dropping missing data events...")
dfm = dfm[dfm.missing_data_reason.isnull()]

print("Projecting on useful columns...")
dfm = dfm[["time", "machine_id", "capacity.cpus", "capacity.memory"]]

print("Sorting by machine and time...")
dfm = dfm.sort_values(by=["machine_id", "time"])

print("Converting to broadcast variable...")
dfm = sc.broadcast([tuple(r) for r in dfm.to_numpy()])

print("Done with machine events.")


def get_machine_time_resources(machine_id, time):
    # Binary search over the broadcast machine-event tuples
    # (time, machine_id, cpus, memory), sorted by (machine_id, time).
    def aux(i, j):
        if i == j:
            return dfm.value[i] if dfm.value[i][1] == machine_id else None
        elif i + 1 == j:
            if dfm.value[i][1] == machine_id:
                return dfm.value[i]
            elif dfm.value[j][1] == machine_id:
                return dfm.value[j]
            else:
                return None

        mid = (i + j) // 2

        if dfm.value[mid][1] > machine_id:
            return aux(i, mid - 1)
        elif dfm.value[mid][1] < machine_id:
            return aux(mid + 1, j)
        elif dfm.value[mid][0] > time:
            return aux(i, mid)
        elif dfm.value[mid][0] < time:
            return aux(mid, j)
        else:
            return dfm.value[mid]

    return aux(0, len(dfm.value) - 1)


def increment_reserv_bucket(bucket, value):
    # Bucket 0 collects negative/missing values, buckets 1-40 cover [0, 1) in
    # steps of 0.025, and values >= 1 also fall into bucket 40.
    if value < 0:
        idx = 0
    else:
        idx = 40 if value >= 1 else (int(value * 40) + 1)
    bucket[idx] += 1


def for_each_joined(x):
    task_id = x[0]
    ts = x[1]

    term = -1
    ts = sorted(ts, key=lambda y: y.time)
    cpu_util = [0] * 41
    ram_util = [0] * 41
    cpu_request = [0] * 41
    ram_request = [0] * 41

    for t in ts:
        # Assumes the joined rows carry the machine id in a "mid" field.
        machine_log = get_machine_time_resources(t.mid, t.time)
        if machine_log is not None:
            util_cpu = t.acpu / machine_log[2]
            util_ram = t.aram / machine_log[3]
        else:
            util_cpu = -1
            util_ram = -1
        # 8a-b: requested resources
        increment_reserv_bucket(cpu_request, t.rcpu)
        increment_reserv_bucket(ram_request, t.rram)
        # 8e-f: utilisation relative to machine capacity
        # (falls into bucket 0 when the machine is unknown)
        increment_reserv_bucket(cpu_util, util_cpu)
        increment_reserv_bucket(ram_util, util_ram)

        if t.type >= 4 and t.type <= 8:
            term = t.type

    res = {-1: None, 4: None, 5: None, 6: None, 7: None, 8: None}
    res[term] = {'rcpu': cpu_request, 'rram': ram_request, 'ucpu': cpu_util, 'uram': ram_util}

    return res


def fold_resobjs(ro1, ro2):
    if ro1 is None:
        return ro2
    elif ro2 is None:
        return ro1
    else:
        for k in ro1.keys():
            # Each termination-state entry may still be None on either side.
            if ro1[k] is None:
                ro1[k] = ro2[k]
            elif ro2[k] is None:
                continue
            else:
                for kk in ro1[k].keys():
                    if ro1[k][kk] is None:
                        ro1[k][kk] = ro2[k][kk]
                    elif ro2[k][kk] is None:
                        continue
                    else:
                        ro1[k][kk] = [sum(x) for x in zip(ro1[k][kk], ro2[k][kk])]
        return ro1


result = df.rdd \
    .groupBy(lambda x: x.id) \
    .map(for_each_joined) \
    .fold(None, fold_resobjs)

d = os.path.dirname(os.path.realpath(__file__))

with open(d + "/" + cluster + "_figure8abcd.json", "w") as f:
    json.dump(result, f)

# vim: set ts=4 sw=4 et tw=120:
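# Example invocation (hypothetical paths and sizes; the figure-8 join parquet for
# the chosen cluster must already exist under the {joindir} argument):
#   spark-submit this_script.py a /tmp/spark-scratch 16g /path/to/joins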