#!/usr/bin/env python3 # coding: utf-8 import os import json import pandas as pd import findspark findspark.init() import pyspark import pyspark.sql import sys import gzip from pyspark import AccumulatorParam from pyspark.sql.functions import lit from pyspark.sql import Window from pyspark.sql.types import * from decimal import * TESTDATA = True if len(sys.argv) is not 4: print(sys.argv[0] + " {cluster} {tmpdir} {maxram}") sys.exit() cluster=sys.argv[1] spark = pyspark.sql.SparkSession.builder \ .appName("task_slowdown") \ .config("spark.driver.maxResultSize", "128g") \ .config("spark.local.dir", sys.argv[2]) \ .config("spark.driver.memory", sys.argv[3]) \ .getOrCreate() sc = spark.sparkContext # # READING INSTANCE EVENTS DATA # dfepath = "/home/claudio/google_2019/instance_events/" + cluster dfepath += "/" + cluster + ("_instance_events00000000000?.json.gz" if TESTDATA else "_instance_events*.json.gz") dfe = spark.read.json(dfepath) try: dfe["collection_type"] = dfe["collection_type"].cast(ByteType()) except: dfe = dfe.withColumn("collection_type", lit(None).cast(ByteType())) # # READING INSTANCE USAGE DATA # dfupath = "/home/claudio/google_2019/instance_usage/" + cluster dfupath += "/" + cluster + ("_instance_usage00000000000?.csv.gz" if TESTDATA else "_instance_usage*.csv.gz") usage_schema = StructType() \ .add("start_time", LongType(), True) \ .add("end_time", LongType(), True) \ .add("collection_id", StringType(), True) \ .add("instance_index", StringType(), True) \ .add("machine_id", StringType(), True) \ .add("alloc_collection_id", LongType(), True) \ .add("alloc_instance_index", StringType(), True) \ .add("collection_type", ByteType(), True) \ .add("average_usage_cpus", DoubleType(), True) \ .add("average_usage_memory", DoubleType(), True) \ .add("maximum_usage_cpus", DoubleType(), True) \ .add("maximum_usage_memory", DoubleType(), True) \ .add("random_sample_usage_cpus", DoubleType(), True) \ .add("random_sample_usage_memory", DoubleType(), True) \ .add("assigned_memory", DoubleType(), True) \ .add("page_cache_memory", DoubleType(), True) \ .add("cycles_per_instruction", DoubleType(), True) \ .add("memory_accLesses_per_instruction", DoubleType(), True) \ .add("sample_rate", DoubleType(), True) \ .add("cpu_usage_dist_00", DoubleType(), True) \ .add("cpu_usage_dist_10", DoubleType(), True) \ .add("cpu_usage_dist_20", DoubleType(), True) \ .add("cpu_usage_dist_30", DoubleType(), True) \ .add("cpu_usage_dist_40", DoubleType(), True) \ .add("cpu_usage_dist_50", DoubleType(), True) \ .add("cpu_usage_dist_60", DoubleType(), True) \ .add("cpu_usage_dist_70", DoubleType(), True) \ .add("cpu_usage_dist_80", DoubleType(), True) \ .add("cpu_usage_dist_90", DoubleType(), True) \ .add("cpu_usage_dist_91", DoubleType(), True) \ .add("cpu_usage_dist_92", DoubleType(), True) \ .add("cpu_usage_dist_93", DoubleType(), True) \ .add("cpu_usage_dist_94", DoubleType(), True) \ .add("cpu_usage_dist_95", DoubleType(), True) \ .add("cpu_usage_dist_96", DoubleType(), True) \ .add("cpu_usage_dist_97", DoubleType(), True) \ .add("cpu_usage_dist_98", DoubleType(), True) \ .add("cpu_usage_dist_99", DoubleType(), True) dfu = spark.read.format("csv") \ .option("header", False) \ .schema(usage_schema) \ .load(dfupath) # READING MACHINE EVENTS DATA, sort them and save them as broadcast variable print("Starting to read machine events...") dfm = pd.read_csv("~/google_2019/machine_events/" + cluster + "_machine_events.csv") print("Dropping remove events...") dfm = dfm[(dfm.type==1)|(dfm.type==3)] print("Dropping missing data events...") dfm = dfm[dfm.missing_data_reason.notnull()] print("Projecting on useful columns...") dfm = dfm[["time", "machine_id", "capacity.cpus", "capacity.memory"]] print("Sorting by time...") dfm = dfm.sort_values(by=["machine_id", "time"]) print("Converting to broadcast variable...") dfm = sc.broadcast([tuple(r) for r in dfm.to_numpy()]) print("Done with machine events.") def tabid(x): return Decimal(x.collection_id) + Decimal(x.instance_index) / Decimal(2**64) # interpolate machine data by extending each last machine report before a gap to cover it def clean_usage(x): return [tabid(x), Decimal(x.average_usage_cpus), Decimal(x.average_usage_memory), int(x.start_time), int(x.end_time), x.machine_id] def interpolate_usage(ts): ts = sorted(ts, key=lambda x: x[3]) l = len(ts) for i in range(1, l-1): if ts[i+1][3] > ts[i][4]: ts[i][4] = ts[i+1][3] return ts dfu = dfu.rdd \ .filter(lambda x: x.start_time is not None and x.end_time is not None and x.instance_index is not None and x.collection_id is not None and x.machine_id is not None) \ .map(clean_usage).groupBy(lambda x: x[0]) \ .flatMap(lambda x: interpolate_usage(x[1])).toDF(["id", "acpu", "aram", "start", "end", "mid"]) dfe = dfe.rdd \ .filter(lambda x: x.time is not None and x.type is not None and x.machine_id is not None and x.instance_index is not None and x.collection_id is not None and x.resource_request is not None and x.resource_request.cpus is not None and x.resource_request.memory is not None) \ .map(lambda x: [tabid(x), int(x.time), int(x.type), Decimal(x.resource_request.cpus), Decimal(x.resource_request.memory), x.machine_id]) \ .toDF(["id", "time", "type", "rcpu", "rram", "mid"]) df = dfe.join(dfu, [dfe.id == dfu.id, dfe.mid == dfu.mid, dfe.time >= dfu.start, dfe.time < dfu.end]) def get_machine_time_resources(machine_id): def aux(i, j): mid = (i + j) // 2 print(i, j, mid) print(dfm.value[mid]) if dfm.value[mid][1] > machine_id: return aux(i, mid - 1) elif dfm.value[mid][1] < machine_id: return aux(mid + 1, j) else: start = mid while dfm.value[start][1] == machine_id and start >= 0: # once found, search for oldest log for machine start -= 1 start += 1 end = mid while dfm.value[end][1] == machine_id and end < len(dfm.value): end += 1 return dfm.value[start:end] return aux(0, len(dfm.value)-1) def increment_reserv_bucket(bucket, ceils, taskid, reserv): idx = 0 while ceils[idx] < reserv and idx < len(ceils): idx += 1 if taskid not in bucket: bucket[taskid] = [0] * (len(ceils) + 1) bucket[taskid][idx] += 1 def bucket_sum_per_termination(bucket, last_term_by_id): result = {-1: None, 4: None, 5: None, 6: None, 7: None, 8: None} for tid, vs in bucket.items(): term = last_term_by_id[tid] if result[term] is None: result[term] = vs else: result[term] = [sum(x) for x in zip(result[term], vs)] return result def for_each_joined(x): machine_id = x[0] ts = x[1] ts = sorted(ts, key=lambda x: x.time) last_req_by_id = {} # map taskid -> last known req [cpu, ram] (data removed when task terminates) reserv_ceils = [0.2, 0.4, 0.6, 0.8, 1] cpu_reservs_by_id = {} # map taskid -> [a, b, c, d, e, f] where: # a: count of event with res. reserv. <0.2 # b: count of event with res. reserv. [0.2, 0.4) # c: count of event with res. reserv. [0.4, 0.6) # d: count of event with res. reserv. [0.6, 0.8) # e: count of event with res. reserv. [0.8, 0.1) # f: count of event with res. reserv. >=1 ram_reservs_by_id = {} request_ceils = [0.025, 0.05, 0.075] cpu_request_by_id = {} # map taskid -> [a, b, c, d] where <0.025, [0.025, 0.05), [0.05,0.075), >=0.075 ram_request_by_id = {} util_ceils = reserv_ceils cpu_util_by_id = {} ram_util_by_id = {} last_term_by_id = {} # map taskid -> last termination machine_logs = get_machine_time_resources(machine_id) for i, t in enumerate(ts): if len(machine_logs) > 1 and machine_logs[1][0] >= t.time: machine_logs.pop(0) if t.id not in last_term_by_id: last_term_by_id[t.id] = -1 if t.type >= 4 and t.type <= 8: last_term_by_id[t.id] = t.type del last_req_by_id[t.id] else: if t.rcpu is not None and t.rram is not None: last_req_by_id[t.id] = (t.rcpu, t.rram) # 8b tot_req = [sum(x) for x in zip(*last_req_by_id.values())] reserv_cpu = tot_req[0] / machine_logs[0][2] reserv_ram = tot_req[1] / machine_logs[0][3] increment_reserv_bucket(cpu_reservs_by_id, reserv_ceils, t.id, reserv_cpu) increment_reserv_bucket(ram_reservs_by_id, reserv_ceils, t.id, reserv_ram) # 8a increment_reserv_bucket(cpu_request_by_id, request_ceils, t.id, t.rcpu) increment_reserv_bucket(ram_request_by_id, request_ceils, t.id, t.rram) # 8c increment_reserv_bucket(cpu_util_by_id, util_ceils, t.id, t.acpu) increment_reserv_bucket(ram_util_by_id, util_ceils, t.id, t.aram) resobj = {'rcpu': cpu_request_by_id, 'rram': ram_request_by_id, 'rscpu': cpu_reservs_by_id, 'rsram': ram_reservs_by_id, 'ucpu': cpu_util_by_id, 'uram': ram_util_by_id} for k, v in resobj.items(): resobj[k] = bucket_sum_per_termination(v, last_term_by_id) return resobj def fold_resobjs(ro1, ro2): if ro1 is None: return ro2 elif ro2 is None: return ro1 else: for k in ro1.keys(): for kk in ro1[k].keys(): if ro1[k][kk] is None: ro1[k][kk] = ro2[k][kk] elif ro2[k][kk] is None: continue else: ro1[k][kk] = [sum(x) for x in zip(ro1[k][kk], ro2[k][kk])] return ro1 # TODO: partition by id and in the for-each-row # function implement lookup to dfm.value to understand its memory capacity result = df.rdd \ .groupBy(lambda x: x.mid) \ .map(for_each_joined) \ .fold(None, fold_resobjs) d = os.path.dirname(os.path.realpath(__file__)) with open(d + "/" + cluster + "_figure8.json", "w") as f: json.dump(result, f) # vim: set ts=4 sw=4 et tw=120: