#!/usr/bin/env python3
# coding: utf-8
# # Temporal impact: machine time waste
# Inefficient version of the machine_time_waste.py script in the same directory. Superseded by that script; kept only for future reference.
import findspark
findspark.init()  # must run before importing pyspark so the local Spark install is found
import pyspark
import pyspark.sql
from pyspark.sql.functions import col, lag, when, concat_ws, last, first
from pyspark.sql import Window
from pyspark.sql.types import LongType
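# Google 2019 trace cell to analyze; the input and output paths below are built from this letter.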
cluster="b"
spark = pyspark.sql.SparkSession.builder \
    .appName("machine_time_waste") \
    .config("spark.local.dir", "/run/tmpfiles.d/spark") \
    .config("spark.driver.memory", "124g") \
    .getOrCreate()
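# Load all gzipped JSON shards of instance events for the chosen cluster.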
df = spark.read.json(f"/home/claudio/google_2019/instance_events/{cluster}/{cluster}_instance_events*.json.gz")
df.printSchema()
df.show()
# An optional .filter(df.collection_type == 0) step was left disabled here.
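# Clean the raw events: cast time/type to long, default a null type to 0,
# derive a per-task id from collection_id and instance_index, and drop rows
# missing any key field.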
df2 = df \
    .withColumn("time", col("time").cast(LongType())) \
    .withColumn("type", col("type").cast(LongType())) \
    .withColumn("type", when(col("type").isNull(), 0).otherwise(col("type"))) \
    .withColumn("id", concat_ws("-", "collection_id", "instance_index")) \
    .where(col("time").isNotNull()) \
    .where(col("type").isNotNull()) \
    .where((col("instance_index").isNotNull()) & (col("collection_id").isNotNull())) \
    .select("time", "type", "id")
df2.show()
print("Total: " + str(df.count()))
print("Filtered: " + str(df2.count()))
# my_window = Window.partitionBy("machine_id", "id").orderBy(df2.time.asc())
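# Forward-looking window per task id, ordered by time, spanning the current
# row to the end of the partition: first(..., ignorenulls=True) over it yields
# the next non-null value, last(...) the final one.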
w2 = Window.partitionBy("id").orderBy(df2.time.asc()).rowsBetween(Window.currentRow, Window.unboundedFollowing)
# .withColumn("prev_time", lag(df2.time).over(my_window)) \
# .withColumn("prev_type", lag(df2.type).over(my_window)) \
df3 = df2 \
    .withColumn("t3_time", when((df2.type != 3), None).otherwise(df2.time)) \
    .withColumn("t45678_time", when((df2.type < 4) | (df2.type > 8), None).otherwise(df2.time)) \
    .withColumn("t45678_type", when((df2.type < 4) | (df2.type > 8), None).otherwise(df2.type)) \
    .withColumn("t01_time", when((df2.type != 0) & (df2.type != 1), None).otherwise(df2.time)) \
    .withColumn("t01_type", when((df2.type != 0) & (df2.type != 1), None).otherwise(df2.type)) \
    .withColumn("next_time", when(df2.type == 3, first(col("t45678_time"), True).over(w2))
        .when((df2.type == 0) | (df2.type == 1), first(col("t3_time"), True).over(w2))
        .when((df2.type >= 4) & (df2.type <= 8), first(col("t01_time"), True).over(w2))
        .otherwise(None)) \
    .withColumn("next_type", when(df2.type == 3, first(col("t45678_type"), True).over(w2))
        .when((df2.type == 0) | (df2.type == 1), 3)  # next event after submit/queue is assumed to be a schedule (type 3)
        .when((df2.type >= 4) & (df2.type <= 8), first(col("t01_type"), True).over(w2))
        .otherwise(None)) \
    .withColumn("last_term_type", last(col("t45678_type"), True).over(w2)) \
    .withColumn("time_delta", col("next_time") - col("time")) \
    .select("id", "time", "type", "last_term_type", "time_delta", "t01_time",
            "t01_type", "t3_time", "t45678_time", "t45678_type", "next_time", "next_type")
df4 = df3.where(df3.next_type.isNotNull()).groupby("type", "next_type", "last_term_type").sum("time_delta")
# df3.orderBy(df3.machine_id, df3.time).show(n=100)
# df3.printSchema()
df4.show(n=1000000)
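# Persist the aggregated transitions; Spark writes a directory of CSV part files.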
df4.write.csv(f"/home/claudio/google_2019/thesis_queries/machine_time_waste/{cluster}_state_change.csv")
# vim: set ts=2 sw=2 et tw=120: