#!/usr/bin/env python3
# coding: utf-8
"""Histogram the CPU and RAM requests of task events, split by final task state.

Usage: <script> {cluster} {tmpdir} {maxram} {basedir}

Reads ``{basedir}/{cluster}/{cluster}_instance_events*.json.gz`` with Spark,
groups the events by task, buckets every event's requested CPU and RAM into a
fixed-size histogram, sums the histograms per final event type and writes the
result to ``{cluster}_figure8ab.json`` next to this script.
"""

import os
import json
import pandas as pd
import findspark
findspark.init()  # must run before the pyspark imports below
import pyspark
import pyspark.sql
import sys
import gzip
from pyspark import AccumulatorParam
from pyspark.sql.functions import lit
from pyspark.sql import Window
from pyspark.sql.types import *
from decimal import *
from decimal import Decimal, localcontext

# Histogram layout (N_BUCKETS + 2 slots):
#   slot 0              -> negative requests
#   slots 1..N_BUCKETS  -> requests in [0, 1) split into equal-width buckets;
#                          requests >= 1 are clamped into slot N_BUCKETS
#   slot DROPPED_SLOT   -> number of events discarded because their time is None
N_BUCKETS = 40
HISTOGRAM_LEN = N_BUCKETS + 2
DROPPED_SLOT = N_BUCKETS + 1

# Significant digits used when building a task id.  The default Decimal context
# keeps only 28 digits, which is not enough to hold both a large collection id
# and an `instance_index / 2**64` fraction; 100 digits leaves ample headroom
# (the 2**64 divisor suggests 64-bit ids -- TODO confirm against the schema).
TASK_ID_PRECISION = 100

# Event types treated as the final state of a task (inclusive range).
FIRST_TERM_TYPE = 4
LAST_TERM_TYPE = 8

if len(sys.argv) != 5:
    print(sys.argv[0] + " {cluster} {tmpdir} {maxram} {basedir}")
    # A usage error must not be reported as success to the calling shell.
    sys.exit(1)

cluster = sys.argv[1]

spark = pyspark.sql.SparkSession.builder \
    .appName("task_slowdown") \
    .config("spark.driver.maxResultSize", "128g") \
    .config("spark.local.dir", sys.argv[2]) \
    .config("spark.driver.memory", sys.argv[3]) \
    .getOrCreate()
sc = spark.sparkContext

# READING INSTANCE EVENTS DATA
dfepath = sys.argv[4] + "/" + cluster + "/" + cluster + "_instance_events*.json.gz"
#dfepath = sys.argv[4] + "/" + cluster + "/" + cluster + "_test.json"
df = spark.read.json(dfepath)
#df = df.limit(10000)


def tabid(x):
    """Return a Decimal key identifying the task that event row `x` belongs to.

    The key is ``collection_id + instance_index / 2**64``: the collection id
    forms the integer part and the instance index the fractional part.

    The sum is evaluated under a local high-precision context.  Under the
    default 28-digit context the fractional part is rounded away for large
    collection ids, which made different instances of the same collection
    share one key and be grouped together as a single task.
    """
    with localcontext() as ctx:
        ctx.prec = TASK_ID_PRECISION
        return Decimal(x.collection_id) + Decimal(x.instance_index) / Decimal(2**64)


def increment_reserv_bucket(bucket, value):
    """Increment in place the slot of histogram `bucket` that `value` falls in.

    Negative values go to slot 0, values in [0, 1) to slots 1..N_BUCKETS and
    values >= 1 are clamped into slot N_BUCKETS.
    """
    if value < 0:
        idx = 0
    elif value >= 1:
        idx = N_BUCKETS
    else:
        idx = int(value * N_BUCKETS) + 1
    bucket[idx] += 1


def for_each_joined(var):
    """Build the request histograms of one task.

    `var` is a ``(task id, iterable of event dicts)`` pair as produced by
    ``groupBy``; every event dict carries "time", "type", "rcpu" and "rram".

    Returns a dict keyed by -1 and by each type in
    FIRST_TERM_TYPE..LAST_TERM_TYPE.  Only the entry for the task's final
    state holds ``{'rcpu': histogram, 'rram': histogram}``; every other entry
    is None.  The final state is the type of the last event (in time order)
    whose type lies in that range, or -1 when the task has no such event.

    Raises:
        Exception: if every event of the task has a None time.
    """
    # Events without a time sort first so they can be dropped as a prefix.
    events = sorted(var[1], key=lambda e: -1 if e["time"] is None else e["time"])

    dropped = next((i for i, e in enumerate(events) if e["time"] is not None), None)
    if dropped is None:
        raise Exception('all times are none')
    events = events[dropped:]

    cpu_request = [0] * HISTOGRAM_LEN
    ram_request = [0] * HISTOGRAM_LEN
    cpu_request[DROPPED_SLOT] = dropped
    ram_request[DROPPED_SLOT] = dropped

    term = -1
    for event in events:
        increment_reserv_bucket(cpu_request, event["rcpu"])
        increment_reserv_bucket(ram_request, event["rram"])
        if FIRST_TERM_TYPE <= event["type"] <= LAST_TERM_TYPE:
            term = event["type"]

    res = {-1: None, 4: None, 5: None, 6: None, 7: None, 8: None}
    res[term] = {'rcpu': cpu_request, 'rram': ram_request}
    return res


def fold_resobjs(ro1, ro2):
    """Merge two per-state result objects by summing their histograms.

    Either argument may be None (the fold's zero value), in which case the
    other one is returned unchanged.  Otherwise `ro1` is updated in place and
    returned: entries missing from `ro1` are taken from `ro2`, and histograms
    present in both are added element-wise.
    """
    if ro1 is None:
        return ro2
    if ro2 is None:
        return ro1

    for state in ro1:
        if ro1[state] is None:
            ro1[state] = ro2[state]
        elif ro2[state] is not None:
            mine, theirs = ro1[state], ro2[state]
            for resource in mine:
                if mine[resource] is None:
                    mine[resource] = theirs[resource]
                elif theirs[resource] is not None:
                    mine[resource] = [a + b for a, b in zip(mine[resource], theirs[resource])]
    return ro1


def _has_required_fields(x):
    """Return True if row `x` has every field the analysis reads."""
    return (x.time is not None and x.type is not None
            and x.machine_id is not None and x.instance_index is not None
            and x.collection_id is not None and x.resource_request is not None
            and x.resource_request.cpus is not None
            and x.resource_request.memory is not None)


def _to_event(x):
    """Convert row `x` into the plain event dict consumed by for_each_joined."""
    return {"id": tabid(x), "time": int(x.time), "type": int(x.type),
            "rcpu": Decimal(x.resource_request.cpus),
            "rram": Decimal(x.resource_request.memory),
            "mid": x.machine_id}


result = df.rdd \
    .filter(_has_required_fields) \
    .map(_to_event) \
    .groupBy(lambda x: x["id"]) \
    .map(for_each_joined) \
    .fold(None, fold_resobjs)

# The output file is written next to this script, not in the working directory.
d = os.path.dirname(os.path.realpath(__file__))

with open(d + "/" + cluster + "_figure8ab.json", "w") as f:
    json.dump(result, f)

# vim: set ts=4 sw=4 et tw=120: