#!/usr/bin/env python3
# coding: utf-8

# # Temporal impact: machine time waste

import json
import pandas
from IPython import display
import findspark
findspark.init()
import pyspark
import pyspark.sql
import sys
from pyspark.sql.functions import col, lag, when, concat_ws, last, first
from pyspark.sql import Window
from pyspark.sql.types import LongType

# Expect a single one-character cluster identifier as the only argument.
if len(sys.argv) != 2 or len(sys.argv[1]) != 1:
  print("usage: " + sys.argv[0] + " {cluster}", file=sys.stderr)
  sys.exit(1)

cluster = sys.argv[1]

spark = pyspark.sql.SparkSession.builder \
  .appName("machine_time_waste") \
  .config("spark.local.dir", "/tmp/ramdisk/spark") \
  .config("spark.driver.memory", "124g") \
  .getOrCreate()

# Read all instance event records for the chosen cluster.
df = spark.read.json("/home/claudio/google_2019/instance_events/" + cluster +
                     "/" + cluster + "_instance_events*.json.gz")
# df = spark.read.json("/home/claudio/google_2019/instance_events/" + cluster + "/" + cluster + "_test.json")

df.printSchema()

df.show()

# Keep only events with a usable timestamp and task identity; missing event
# types are replaced with 0. Each task is identified by
# "collection_id-instance_index".
# .filter(df.collection_type == 0) \
df2 = df \
  .withColumn("time", col("time").cast(LongType())) \
  .withColumn("type", col("type").cast(LongType())) \
  .withColumn("type", when(col("type").isNull(), 0).otherwise(col("type"))) \
  .withColumn("id", concat_ws("-", "collection_id", "instance_index")) \
  .where(col("time").isNotNull()) \
  .where(col("type").isNotNull()) \
  .where((col("instance_index").isNotNull()) & (col("collection_id").isNotNull())) \
  .select("time", "type", "id")

df2.show()

total = df.count()
filtered = df2.count()

print("Total: " + str(total))
print("Filtered: " + str(filtered))

r = df2.rdd

def for_each_task(ts):
  """Walk one task's events in time order and return the time spent in each
  "from-to" state transition, plus the type of its last termination event."""
  ts = sorted(ts, key=lambda x: x.time)
  last_term = None
  prev = None
  tr = {}
  for i, t in enumerate(ts):
    if prev is not None and t.type == prev.type:  # skip repeated events of the same type
      if i == len(ts) - 1:  # ...but keep the "loop" if it is the last event
        tr[str(prev.type) + "-" + str(t.type)] = t.time - prev.time
      else:
        continue
    if t.type >= 4 and t.type <= 8:  # types 4..8 are termination events
      last_term = t.type
    if prev is not None:
      tr[str(prev.type) + "-" + str(t.type)] = t.time - prev.time
    prev = t
  return {"last_term": last_term, "tr": tr}

def sum_values(ds):
  """Merge per-task transition-duration dictionaries by summing matching keys."""
  dsum = {}
  for dt in ds:
    d = dt["tr"]
    for key in d:
      if key not in dsum:
        dsum[key] = d[key]
      else:
        dsum[key] += d[key]
  return dsum

# Group events by task, compute per-task transition durations, then aggregate
# them by the type of the task's last termination event.
r2 = r \
  .groupBy(lambda x: x.id) \
  .mapValues(for_each_task) \
  .map(lambda x: x[1]) \
  .groupBy(lambda x: x["last_term"]) \
  .mapValues(sum_values) \
  .collect()

with open(cluster + "_state_changes.json", "w") as out:
  json.dump({"filtered": filtered, "total": total, "data": r2}, out)

# .withColumn("prev_time", lag(df2.time).over(my_window)) \
# .withColumn("prev_type", lag(df2.type).over(my_window)) \

# vim: set ts=2 sw=2 et tw=120:
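
# Not part of the original pipeline: a minimal sketch of how the emitted JSON
# could be inspected afterwards (assumes the file written above and the json
# import at the top of this script).
# res = json.load(open(cluster + "_state_changes.json"))
# for last_term, transitions in res["data"]:
#   print(last_term, sum(transitions.values()))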