#!/usr/bin/env python3 # coding: utf-8 # # Temporal impact: machine time waste # This analysis is meant to analyse the time spend by instance events doing submission, queueing, and execution. This # preliminary script sums the total time spent by instance executions to transition from each event type to another. # Additionaly, time sums are partitioned by the last termination state of the instance they belong (i.e. the last # 4<=x<=8 event type for that instance). # Please note that events with either missing time, type, instance_index or collection_id are ignored. Total number of # instance events in the trace and filtered number of events are saved in the output. # ## Data representation # Total and filtered totals mentioned before are under "total" and "filtered" attributes in the root of the generated # JSON object. The "data" atrribute is a list of pairs of final instance termination states and the corresponding list # of time totals per each transition. Each transition total is represented in the form of "x-y" where x is the last # event type prior to the transition and "y" is the new event detected. Times are calculated by summing all event times # "y" subtracting the nearest event of type "x" for each instance. If an event "x" is repeated multiple times # immediately after an event of the same type, only the first event in chronological order is considered. If however # after multiple repetitions of the event "x" the trace for that instance terminates, an "x-x" time sum is registered by # computing the difference between the last and the first event of "x" type. Times are represented in microseconds. import json import pandas from IPython import display import findspark findspark.init() import pyspark import pyspark.sql import sys from pyspark.sql.functions import lit from pyspark.sql.types import ByteType if len(sys.argv) != 2 or len(sys.argv[1]) != 1: print("usage: " + sys.argv[0] + " {cluster}", file=sys.stderr) sys.exit(1) cluster=sys.argv[1] spark = pyspark.sql.SparkSession.builder \ .appName("machine_time_waste") \ .config("spark.local.dir", "/run/tmpfiles.d/spark") \ .config("spark.driver.memory", "124g") \ .getOrCreate() sc = spark.sparkContext #df = spark.read.json("/home/claudio/google_2019/instance_events/" + cluster + "/" + cluster + "_test.json") \ df = spark.read.json("/home/claudio/google_2019/instance_events/" + cluster + "/" + cluster + "_instance_events*.json.gz") try: df["collection_type"] = df["collection_type"].cast(ByteType()) except: df = df.withColumn("collection_type", lit(None).cast(ByteType())) df.printSchema() df.show() total = sc.accumulator(0) filtered = sc.accumulator(1) def for_each_task(ts): ts = sorted(ts, key=lambda x: x["time"]) last_term = None prev = None tr = {} for i,t in enumerate(ts): if prev is not None and t["type"] == prev["type"]: # remove useless transitions if i == len(ts) - 1: # if last tr[str(prev["type"]) + "-" + str(t["type"])] = t["time"] - prev["time"] # keep "loops" if last else: continue if t["type"] >= 4 and t["type"] <= 8: last_term = t["type"] if prev is not None: tr[str(prev["type"]) + "-" + str(t["type"])] = t["time"] - prev["time"] prev = t return {"last_term": last_term, 'tr': tr} def sum_values(ds): dsum = {} for dt in ds: d = dt["tr"] for key in d: if key not in dsum: dsum[key] = d[key] else: dsum[key] += d[key] return dsum def count_total(x): total.add(1) return x def cleanup(x): filtered.add(1) return { "time": int(x.time), "type": 0 if x.type is None else int(x.type), "id": x.collection_id + "-" + x.instance_index, } r2 = df.rdd \ .filter(lambda x: x.collection_type is None or x.collection_type == 0) \ .map(count_total) \ .filter(lambda x: x.type is not None and x.time is not None and x.instance_index is not None and x.collection_id is not None) \ .map(cleanup) \ .groupBy(lambda x: x["id"]) \ .mapValues(for_each_task) \ .map(lambda x: x[1]) \ .groupBy(lambda x: x["last_term"]) \ .mapValues(sum_values) \ .collect() with open(cluster + "_state_changes_jobs.json", "w") as out: json.dump({"filtered": filtered.value, "total": total.value, "data": r2}, out) # vim: set ts=2 sw=2 et tw=120: