#!/usr/bin/env python3
# coding: utf-8

import json
import sys

import findspark
findspark.init()

import pyspark
import pyspark.sql
from pyspark.sql.functions import lit
from pyspark.sql.types import ByteType

cluster = sys.argv[1]

spark = pyspark.sql.SparkSession.builder \
    .appName("task_slowdown") \
    .config("spark.driver.maxResultSize", "32g") \
    .config("spark.local.dir", "/run/tmpfiles.d/spark") \
    .config("spark.driver.memory", "75g") \
    .getOrCreate()
sc = spark.sparkContext

df = spark.read.json("/home/claudio/google_2019/instance_events/" + cluster +
                     "/" + cluster + "_instance_events*.json.gz")
#df = spark.read.json("/home/claudio/google_2019/instance_events/" + cluster +
#                     "/" + cluster + "_test.json")

# Cast collection_type to a single byte; if the column is missing from this
# cluster's trace, add it as an all-null column instead.
try:
    df = df.withColumn("collection_type", df["collection_type"].cast(ByteType()))
except Exception:
    df = df.withColumn("collection_type", lit(None).cast(ByteType()))

# (previous event type, event type) transitions during which the task is
# considered running (3 = SCHEDULE, 10 = UPDATE_RUNNING in the trace encoding).
RUN = {(3, 1), (3, 4), (3, 5), (3, 6), (3, 7), (3, 8), (3, 10),
       (10, 1), (10, 4), (10, 5), (10, 6), (10, 7), (10, 8), (10, 10)}


def is_res_none(tres):
    return tres is None or tres["cpus"] is None or tres["memory"] is None


def for_each_task(ts):
    """Integrate the requested CPU and memory of one task over its running
    time, keyed by the last termination event type (4..8) seen for the task."""
    ts = sorted(ts, key=lambda x: x["time"])
    last_term = None        # last termination event type seen (4..8)
    last_resources = None   # most recent non-null resource request
    prev = None
    cpu = 0
    ram = 0
    for t in ts:
        if 4 <= t["type"] <= 8:
            last_term = t["type"]
        if prev is not None and (prev["type"], t["type"]) in RUN:
            if is_res_none(last_resources):
                last_resources = t["res"]
            if not is_res_none(last_resources):
                delta = t["time"] - prev["time"]
                cpu += round(delta * last_resources["cpus"])
                ram += round(delta * last_resources["memory"])
        prev = t
        # Carry forward the latest non-null resource request for later intervals.
        if not is_res_none(t["res"]):
            last_resources = t["res"]
    return [("cpu-" + str(last_term), cpu), ("ram-" + str(last_term), ram)]


def cleanup(x):
    """Keep only the fields needed downstream, with safe defaults."""
    return {
        "time": int(x.time),
        "type": 0 if x.type is None else int(x.type),
        "id": str(x.collection_id) + "-" + str(x.instance_index),
        "res": x.resource_request
    }


df2 = df.rdd \
    .filter(lambda x: x.collection_type is None or x.collection_type == 0) \
    .filter(lambda x: x.time is not None and x.instance_index is not None and x.collection_id is not None) \
    .map(cleanup) \
    .groupBy(lambda x: x["id"]) \
    .mapValues(for_each_task) \
    .flatMap(lambda x: x[1]) \
    .groupBy(lambda x: x[0]) \
    .mapValues(lambda xs: sum(n for _, n in xs)) \
    .collect()

result = {}
for key, value in df2:
    result[key] = value

with open(cluster + "_res_micros_requested.json", "w") as out:
    json.dump(result, out, separators=(',', ':'))