#!/usr/bin/env python3
# coding: utf-8

# For each machine in the Google 2019 trace, replay its instance events in time
# order and count how many tasks are in execution concurrently when each event
# occurs, tallied by the task's final termination type. The result is written
# as a CSV for figure 7c.

import os
import json
import pandas as pd
import findspark
findspark.init()
import pyspark
import pyspark.sql
import sys
import gzip
from pyspark import AccumulatorParam
from pyspark.sql.functions import lit
from pyspark.sql import Window
from pyspark.sql.types import *
from decimal import Decimal
import random

CHECKDIR = "/home/claudio/google_2019/thesis_queries/figure_7/"

if len(sys.argv) != 4:
    print(sys.argv[0] + " {cluster} {tmpdir} {maxram}")
    sys.exit()

cluster = sys.argv[1]

# Skip the computation if the result already exists or another run is in progress.
if os.path.exists(CHECKDIR + cluster + "_figure7c.csv"):
    print("already computed")
    sys.exit()

if os.path.exists(CHECKDIR + cluster + "_figure7c_working"):
    print("already in execution")
    sys.exit()

os.system("touch " + CHECKDIR + cluster + "_figure7c_working")

spark = pyspark.sql.SparkSession.builder \
    .appName("task_slowdown") \
    .config("spark.driver.maxResultSize", "128g") \
    .config("spark.local.dir", sys.argv[2]) \
    .config("spark.driver.memory", sys.argv[3]) \
    .getOrCreate()
sc = spark.sparkContext

# READING INSTANCE EVENTS DATA
dfepath = "/home/claudio/google_2019/instance_events/" + cluster + "/" + cluster + "_instance_events*.json.gz"
#dfepath = "/home/claudio/google_2019/instance_events/" + cluster + "/" + cluster + "_instance_events00000000000?.json.gz"
df = spark.read.json(dfepath)


def tabid(x):
    """Unique task id: collection_id plus instance_index folded in as a fraction."""
    return Decimal(x.collection_id) + Decimal(x.instance_index) / Decimal(2**64)


def bucket_sum_per_termination(bucket, last_term_by_id):
    """Sum the per-task value lists in `bucket`, grouped by each task's last termination type."""
    result = {-1: None, 4: None, 5: None, 6: None, 7: None, 8: None}
    for tid, vs in bucket.items():
        term = last_term_by_id[tid]
        if result[term] is None:
            result[term] = vs
        else:
            result[term] = [sum(x) for x in zip(result[term], vs)]
    return result


def tally_event(bucket, term, nexec):
    """Increment the counter for (termination type, number of concurrent executions)."""
    if term not in bucket:
        bucket[term] = {}
    if nexec not in bucket[term]:
        bucket[term][nexec] = 0
    bucket[term][nexec] += 1


def for_each_joined(x):
    """For one machine, replay its events in time order and tally how many tasks
    are in execution when each event occurs."""
    if x[0] is None:
        return {}
    ts = sorted(x[1], key=lambda e: e["time"] or -1)
    in_execution = set()
    chum = {}
    for t in ts:
        in_execution.add(t["id"])
        tally_event(chum, t["term"], len(in_execution))
        if t["end"]:
            in_execution.remove(t["id"])
    return chum


def fold_resobjs(ro1, ro2):
    """Merge two nested {term: {n_exec: count}} dictionaries by summing counts."""
    if not ro1.keys():
        return ro2
    elif ro2.keys():
        for k in set(ro1.keys()).union(set(ro2.keys())):
            if k not in ro1:
                ro1[k] = ro2[k]
            elif k in ro2:
                for kk in set(ro1[k].keys()).union(set(ro2[k].keys())):
                    if kk not in ro1[k]:
                        ro1[k][kk] = ro2[k][kk]
                    elif kk in ro2[k]:
                        ro1[k][kk] += ro2[k][kk]
    return ro1


def mark_next(data):
    """For one task, sort its events by time, mark the events after which the task
    leaves its machine, and attach the task's last termination type (4-8)."""
    ts = sorted(data[1], key=lambda z: z[1] or -1)
    last_term = -1
    for i in range(0, len(ts)):
        t = ts[i]
        ts[i] = {"id": t[0], "time": t[1], "type": t[2], "mid": t[3],
                 "end": (i == len(ts) - 1 or t[3] != ts[i+1][3])}
        if ts[i]["type"] >= 4 and ts[i]["type"] <= 8:
            last_term = ts[i]["type"]
    for t in ts:
        t["term"] = last_term
    return ts


def to_csv(result):
    """Flatten the nested {term: {n_exec: count}} result into CSV text."""
    out = "term,n_exec,count\n"
    for key in result.keys():
        for key2 in result[key].keys():
            out += str(key) + "," + str(key2) + "," + str(result[key][key2]) + "\n"
    return out


# Pipeline: group events by task to mark end-of-machine events and termination
# types, then regroup by machine and tally concurrency, folding all partial
# results into a single nested dictionary.
result = df.rdd \
    .filter(lambda x: x.time is not None and x.type is not None and
            x.instance_index is not None and x.collection_id is not None) \
    .map(lambda x: [tabid(x), int(x.time), int(x.type), x.machine_id]) \
    .groupBy(lambda x: x[0]) \
    .flatMap(mark_next) \
    .filter(lambda x: x["mid"] is not None) \
    .groupBy(lambda x: x["mid"]) \
    .partitionBy(1000, lambda x: random.randint(0, 1000 - 1)) \
    .map(for_each_joined) \
    .fold({}, fold_resobjs)

d = os.path.dirname(os.path.realpath(__file__))

with open(d + "/" + cluster + "_figure7c.csv", "w") as f:
    f.write(to_csv(result))

os.system("rm " + CHECKDIR + cluster + "_figure7c_working")

# vim: set ts=4 sw=4 et tw=120: