#!/usr/bin/env python3
# coding: utf-8

# # Temporal impact: machine time waste

# Inefficient version of the machine_time_waste.py script in the same directory.
# Please ignore; kept in case it is needed again.

import pandas
from IPython import display
import findspark
findspark.init()
import pyspark
import pyspark.sql
import sys

from pyspark.sql.functions import col, lag, when, concat_ws, last, first
from pyspark.sql import Window
from pyspark.sql.types import LongType

cluster = "b"

spark = pyspark.sql.SparkSession.builder \
    .appName("machine_time_waste") \
    .config("spark.local.dir", "/run/tmpfiles.d/spark") \
    .config("spark.driver.memory", "124g") \
    .getOrCreate()

# Read the raw instance event table for the chosen cluster.
df = spark.read.json("/home/claudio/google_2019/instance_events/" + cluster +
                     "/" + cluster + "_instance_events*.json.gz")

df.printSchema()
df.show()

# Event types in the Google 2019 trace: 0=SUBMIT, 1=QUEUE, 2=ENABLE,
# 3=SCHEDULE, 4=EVICT, 5=FAIL, 6=FINISH, 7=KILL, 8=LOST,
# 9=UPDATE_PENDING, 10=UPDATE_RUNNING.
#
# Normalize the events: cast time and type to long, treat a null type as
# 0 (SUBMIT), build a per-instance id from collection_id and instance_index,
# and drop records with missing fields.
# .filter(df.collection_type == 0) \
df2 = df \
    .withColumn("time", col("time").cast(LongType())) \
    .withColumn("type", col("type").cast(LongType())) \
    .withColumn("type", when(col("type").isNull(), 0).otherwise(col("type"))) \
    .withColumn("id", concat_ws("-", "collection_id", "instance_index")) \
    .where(col("time").isNotNull()) \
    .where(col("type").isNotNull()) \
    .where((col("instance_index").isNotNull()) & (col("collection_id").isNotNull())) \
    .select("time", "type", "id")

df2.show()
print("Total: " + str(df.count()))
print("Filtered: " + str(df2.count()))

# my_window = Window.partitionBy("machine_id", "id").orderBy(df2.time.asc())

# Forward-looking window over each instance's event stream, ordered by time.
w2 = Window.partitionBy("id").orderBy(df2.time.asc()) \
    .rowsBetween(Window.currentRow, Window.unboundedFollowing)

# For each event, find the next "interesting" event of the same instance:
# after a SCHEDULE (3), the next termination (4-8); after a SUBMIT/QUEUE
# (0-1), the next SCHEDULE; after a termination (4-8), the next
# SUBMIT/QUEUE. The t*_time/t*_type helper columns carry the event's
# time/type only on rows of the matching kind, so first(..., ignorenulls=True)
# over w2 picks up the next such event.
#     .withColumn("prev_time", lag(df2.time).over(my_window)) \
#     .withColumn("prev_type", lag(df2.type).over(my_window)) \
df3 = df2 \
    .withColumn("t3_time", when((df2.type != 3), None).otherwise(df2.time)) \
    .withColumn("t45678_time", when((df2.type < 4) | (df2.type > 8), None).otherwise(df2.time)) \
    .withColumn("t45678_type", when((df2.type < 4) | (df2.type > 8), None).otherwise(df2.type)) \
    .withColumn("t01_time", when((df2.type != 0) & (df2.type != 1), None).otherwise(df2.time)) \
    .withColumn("t01_type", when((df2.type != 0) & (df2.type != 1), None).otherwise(df2.type)) \
    .withColumn("next_time", when(df2.type == 3, first(col("t45678_time"), True).over(w2))
        .when((df2.type == 0) | (df2.type == 1), first(col("t3_time"), True).over(w2))
        .when((df2.type >= 4) & (df2.type <= 8), first(col("t01_time"), True).over(w2))
        .otherwise(None)) \
    .withColumn("next_type", when(df2.type == 3, first(col("t45678_type"), True).over(w2))
        .when((df2.type == 0) | (df2.type == 1), 3)
        .when((df2.type >= 4) & (df2.type <= 8), first(col("t01_type"), True).over(w2))
        .otherwise(None)) \
    .withColumn("last_term_type", last(col("t45678_type"), True).over(w2)) \
    .withColumn("time_delta", col("next_time") - col("time")) \
    .select("id", "time", "type", "last_term_type", "time_delta", "t01_time",
            "t01_type", "t3_time", "t45678_time", "t45678_type", "next_time", "next_type")

# Total time spent in each (type -> next_type) transition, broken down by the
# instance's final termination type.
df4 = df3.where(df3.next_type.isNotNull()) \
    .groupby("type", "next_type", "last_term_type").sum("time_delta")

# df3.orderBy(df3.machine_id, df3.time).show(n=100)
# df3.printSchema()

df4.show(n=1000000)

df4.write.csv("/home/claudio/google_2019/thesis_queries/machine_time_waste/" +
              cluster + "_state_change.csv")

# vim: set ts=2 sw=2 et tw=120: