#!/usr/bin/env python3
# coding: utf-8
#
# For every task in a Google 2019 cluster trace, collect the "response time"
# intervals (time between a non-termination event and the following
# termination event), grouped by task priority. Tasks whose last termination
# event is not FINISH are only counted in the `non` accumulator.
#
# Usage: task_slowdown.py <cluster>

import json
import sys
import gzip

import findspark
findspark.init()

import pyspark
import pyspark.sql
from pyspark import AccumulatorParam
from pyspark.sql.functions import lit
from pyspark.sql.types import ByteType

cluster = sys.argv[1]

spark = pyspark.sql.SparkSession.builder \
    .appName("task_slowdown") \
    .config("spark.driver.maxResultSize", "32g") \
    .config("spark.local.dir", "/run/tmpfiles.d/spark") \
    .config("spark.driver.memory", "75g") \
    .getOrCreate()
sc = spark.sparkContext

df = spark.read.json("/home/claudio/google_2019/instance_events/" + cluster +
                     "/" + cluster + "_instance_events*.json.gz")
#df = spark.read.json("/home/claudio/google_2019/instance_events/" + cluster +
#                     "/" + cluster + "_test.json")

# Normalize collection_type to ByteType; if the column is missing from this
# cluster's dump, add it as all-null so the filter below still works.
try:
    df = df.withColumn("collection_type", df["collection_type"].cast(ByteType()))
except Exception:
    df = df.withColumn("collection_type", lit(None).cast(ByteType()))


class NonPriorityAcc(AccumulatorParam):
    """Accumulator that merges {priority: count} dictionaries."""

    def zero(self, value):
        return {}

    def addInPlace(self, v1, v2):
        for key in v2:
            if key in v1:
                v1[key] += v2[key]
            else:
                v1[key] = v2[key]
        return v1


# Per-priority count of tasks whose last termination event is not FINISH.
non = sc.accumulator({}, NonPriorityAcc())


def for_each_task(ts):
    """Process the event list of one task instance.

    Event types 4..8 are termination events in the 2019 trace (EVICT, FAIL,
    FINISH, KILL, LOST); type 6 is FINISH. Returns (priority, resp_time) if
    the task finished successfully, None otherwise.
    """
    global non

    ts = sorted(ts, key=lambda x: x["time"])

    last_term = None
    priority = -1
    responding = False

    resp_burst_start = None
    resp_burst_type = None

    resp_time = []

    for t in ts:
        # Remember the first known priority of the task.
        if t["priority"] != -1 and priority == -1:
            priority = t["priority"]

        #if responding:
        #    resp_burst_type.append(t["type"])

        if t["type"] >= 4 and t["type"] <= 8:
            last_term = t["type"]
            if responding:
                resp_burst_type.append(t["type"])
                # This response time interval has ended, so record the time
                # delta and the event types seen during the burst.
                resp_time.append((t["time"] - resp_burst_start, resp_burst_type))
                responding = False

        # A non-termination event starts a new response time interval.
        if (not responding) and (t["type"] < 4 or t["type"] > 8):
            resp_burst_start = t["time"]
            resp_burst_type = [t["type"]]
            responding = True

    if last_term != 6:
        non.add({priority: 1})

    return (priority, resp_time) if last_term == 6 else None


def cleanup(x):
    """Keep only the fields we need, with defaults for missing values."""
    return {
        "time": int(x.time),
        "type": 0 if x.type is None else int(x.type),
        # ids may be parsed as numbers, so build the key as a string
        "id": str(x.collection_id) + "-" + str(x.instance_index),
        "priority": -1 if x.priority is None else int(x.priority)
    }


# Keep only task (collection_type 0 or null) events with complete keys, group
# the events per task instance, and collect the per-priority response times.
df2 = df.rdd \
    .filter(lambda x: x.collection_type is None or x.collection_type == 0) \
    .filter(lambda x: x.time is not None and x.instance_index is not None
            and x.collection_id is not None) \
    .map(cleanup) \
    .groupBy(lambda x: x["id"]) \
    .mapValues(for_each_task) \
    .filter(lambda x: x[1] is not None) \
    .map(lambda x: x[1]) \
    .groupBy(lambda x: x[0]) \
    .mapValues(lambda x: [e[1] for e in x])

val = df2.collect()

# Re-shape the collected (priority, [resp_time, ...]) pairs into a dict.
val2 = {}
for e in val:
    val2[e[0]] = e[1]

a = {"val": val2, "non": non.value}

with gzip.open(cluster + "_state_changes.json.gz", "wt") as out:
    json.dump(a, out, separators=(',', ':'))
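
# A minimal sketch of reading the result back for later analysis (illustrative
# only; the variable names below are assumptions, not part of this pipeline):
#
#   with gzip.open(cluster + "_state_changes.json.gz", "rt") as f:
#       state = json.load(f)
#   # state["val"] maps each priority to the list of per-task response time
#   # interval lists; state["non"] counts non-FINISH tasks per priority.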