Tidied machine_time_waste analysis

This commit is contained in:
Claudio Maggioni 2021-02-22 13:12:38 +00:00
parent 3613193997
commit 7cfebd2409
4 changed files with 153 additions and 134 deletions

View File

@ -3,6 +3,24 @@
# # Temporal impact: machine time waste
# This analysis is meant to analyse the time spend by instance events doing submission, queueing, and execution. This
# preliminary script sums the total time spent by instance executions to transition from each event type to another.
# Additionaly, time sums are partitioned by the last termination state of the instance they belong (i.e. the last
# 4<=x<=8 event type for that instance).
# Please note that events with either missing time, type, instance_index or collection_id are ignored. Total number of
# instance events in the trace and filtered number of events are saved in the output.
# ## Data representation
# Total and filtered totals mentioned before are under "total" and "filtered" attributes in the root of the generated
# JSON object. The "data" atrribute is a list of pairs of final instance termination states and the corresponding list
# of time totals per each transition. Each transition total is represented in the form of "x-y" where x is the last
# event type prior to the transition and "y" is the new event detected. Times are calculated by summing all event times
# "y" subtracting the nearest event of type "x" for each instance. If an event "x" is repeated multiple times
# immediately after an event of the same type, only the first event in chronological order is considered. If however
# after multiple repetitions of the event "x" the trace for that instance terminates, an "x-x" time sum is registered by
# computing the difference between the last and the first event of "x" type. Times are represented in microseconds.
import json
import pandas
from IPython import display
import findspark
@ -15,15 +33,20 @@ from pyspark.sql.functions import col, lag, when, concat_ws, last, first
from pyspark.sql import Window
from pyspark.sql.types import LongType
cluster="b"
if len(sys.argv) != 2 or len(sys.argv[1]) != 1:
print("usage: " + sys.argv[0] + " {cluster}", file=sys.stderr)
sys.exit(1)
cluster=sys.argv[1]
spark = pyspark.sql.SparkSession.builder \
.appName("machine_time_waste") \
.config("spark.local.dir", "/run/tmpfiles.d/spark") \
.config("spark.local.dir", "/tmp/ramdisk/spark") \
.config("spark.driver.memory", "124g") \
.getOrCreate()
df = spark.read.json("/home/claudio/google_2019/instance_events/" + cluster + "/" + cluster + "_instance_events*.json.gz")
# df = spark.read.json("/home/claudio/google_2019/instance_events/" + cluster + "/" + cluster + "_test.json")
df.printSchema()
@ -41,39 +64,57 @@ df2 = df \
.select("time", "type", "id")
df2.show()
print("Total: " + str(df.count()))
print("Filtered: " + str(df2.count()))
# my_window = Window.partitionBy("machine_id", "id").orderBy(df2.time.asc())
total = df.count()
filtered = df2.count()
w2 = Window.partitionBy("id").orderBy(df2.time.asc()).rowsBetween(Window.currentRow, Window.unboundedFollowing)
print("Total: " + str(total))
print("Filtered: " + str(filtered))
r = df2.rdd
def for_each_task(ts):
ts = sorted(ts, key=lambda x: x.time)
last_term = None
prev = None
tr = {}
for i,t in enumerate(ts):
if prev is not None and t.type == prev.type: # remove useless transitions
if (i == len(ts)-1): # if last
tr[str(prev.type) + "-" + str(t.type)] = t.time - prev.time # keep "loops" if last
else:
continue
if t.type >= 4 and t.type <= 8:
last_term = t.type
if prev is not None:
tr[str(prev.type) + "-" + str(t.type)] = t.time - prev.time
prev = t
return {"last_term": last_term, 'tr': tr}
def sum_values(ds):
dsum = {}
for dt in ds:
d = dt["tr"]
for key in d:
if key not in dsum:
dsum[key] = d[key]
else:
dsum[key] += d[key]
return dsum
r2 = r \
.groupBy(lambda x: x.id) \
.mapValues(for_each_task) \
.map(lambda x: x[1]) \
.groupBy(lambda x: x["last_term"]) \
.mapValues(sum_values) \
.collect()
with open(cluster + "_state_changes.json", "w") as out:
json.dump({"filtered": filtered, "total": total, "data": r2}, out)
# .withColumn("prev_time", lag(df2.time).over(my_window)) \
# .withColumn("prev_type", lag(df2.type).over(my_window)) \
df3 = df2 \
.withColumn("t3_time", when((df2.type != 3), None).otherwise(df2.time)) \
.withColumn("t45678_time", when((df2.type < 4) | (df2.type > 8), None).otherwise(df2.time)) \
.withColumn("t45678_type", when((df2.type < 4) | (df2.type > 8), None).otherwise(df2.type)) \
.withColumn("t01_time", when((df2.type != 0) & (df2.type != 1), None).otherwise(df2.time)) \
.withColumn("t01_type", when((df2.type != 0) & (df2.type != 1), None).otherwise(df2.type)) \
.withColumn("next_time", when(df2.type == 3, first(col("t45678_time"), True).over(w2)) \
.when((df2.type == 0) | (df2.type == 1), first(col("t3_time"), True).over(w2)) \
.when((df2.type >= 4) | (df2.type <= 8), first(col("t01_time"), True).over(w2)) \
.otherwise(None)) \
.withColumn("next_type", when(df2.type == 3, first(col("t45678_type"), True).over(w2)) \
.when((df2.type == 0) | (df2.type == 1), 3) \
.when((df2.type >= 4) | (df2.type <= 8), first(col("t01_type"), True).over(w2)) \
.otherwise(None)) \
.withColumn("last_term_type", last(col("t45678_type"), True).over(w2)) \
.withColumn("time_delta", col("next_time") - col("time")) \
.select("id", "time", "type", "last_term_type", "time_delta", "t01_time", \
"t01_type", "t3_time", "t45678_time", "t45678_type", "next_time", "next_type")
df4 = df3.where(df3.next_type.isNotNull()).groupby("type", "next_type", "last_term_type").sum("time_delta")
# df3.orderBy(df3.machine_id, df3.time).show(n=100)
# df3.printSchema()
df4.show(n=1000000)
df4.write.csv("/home/claudio/google_2019/thesis_queries/machine_time_waste/" + cluster + "_state_change.csv")
# vim: set ts=2 sw=2 et tw=120:

View File

@ -0,0 +1,81 @@
#!/usr/bin/env python3
# coding: utf-8
# # Temporal impact: machine time waste
# Inefficient version of the machine_time_waste.py script in the same dir. Please ignore, kept for future need
import pandas
from IPython import display
import findspark
findspark.init()
import pyspark
import pyspark.sql
import sys
from pyspark.sql.functions import col, lag, when, concat_ws, last, first
from pyspark.sql import Window
from pyspark.sql.types import LongType
cluster="b"
spark = pyspark.sql.SparkSession.builder \
.appName("machine_time_waste") \
.config("spark.local.dir", "/run/tmpfiles.d/spark") \
.config("spark.driver.memory", "124g") \
.getOrCreate()
df = spark.read.json("/home/claudio/google_2019/instance_events/" + cluster + "/" + cluster + "_instance_events*.json.gz")
df.printSchema()
df.show()
# .filter(df.collection_type == 0) \
df2 = df \
.withColumn("time", col("time").cast(LongType())) \
.withColumn("type", col("type").cast(LongType())) \
.withColumn("type", when(col("type").isNull(), 0).otherwise(col("type"))) \
.withColumn("id", concat_ws("-", "collection_id", "instance_index")) \
.where(col("time").isNotNull()) \
.where(col("type").isNotNull()) \
.where((col("instance_index").isNotNull()) & (col("collection_id").isNotNull())) \
.select("time", "type", "id")
df2.show()
print("Total: " + str(df.count()))
print("Filtered: " + str(df2.count()))
# my_window = Window.partitionBy("machine_id", "id").orderBy(df2.time.asc())
w2 = Window.partitionBy("id").orderBy(df2.time.asc()).rowsBetween(Window.currentRow, Window.unboundedFollowing)
# .withColumn("prev_time", lag(df2.time).over(my_window)) \
# .withColumn("prev_type", lag(df2.type).over(my_window)) \
df3 = df2 \
.withColumn("t3_time", when((df2.type != 3), None).otherwise(df2.time)) \
.withColumn("t45678_time", when((df2.type < 4) | (df2.type > 8), None).otherwise(df2.time)) \
.withColumn("t45678_type", when((df2.type < 4) | (df2.type > 8), None).otherwise(df2.type)) \
.withColumn("t01_time", when((df2.type != 0) & (df2.type != 1), None).otherwise(df2.time)) \
.withColumn("t01_type", when((df2.type != 0) & (df2.type != 1), None).otherwise(df2.type)) \
.withColumn("next_time", when(df2.type == 3, first(col("t45678_time"), True).over(w2)) \
.when((df2.type == 0) | (df2.type == 1), first(col("t3_time"), True).over(w2)) \
.when((df2.type >= 4) | (df2.type <= 8), first(col("t01_time"), True).over(w2)) \
.otherwise(None)) \
.withColumn("next_type", when(df2.type == 3, first(col("t45678_type"), True).over(w2)) \
.when((df2.type == 0) | (df2.type == 1), 3) \
.when((df2.type >= 4) | (df2.type <= 8), first(col("t01_type"), True).over(w2)) \
.otherwise(None)) \
.withColumn("last_term_type", last(col("t45678_type"), True).over(w2)) \
.withColumn("time_delta", col("next_time") - col("time")) \
.select("id", "time", "type", "last_term_type", "time_delta", "t01_time", \
"t01_type", "t3_time", "t45678_time", "t45678_type", "next_time", "next_type")
df4 = df3.where(df3.next_type.isNotNull()).groupby("type", "next_type", "last_term_type").sum("time_delta")
# df3.orderBy(df3.machine_id, df3.time).show(n=100)
# df3.printSchema()
df4.show(n=1000000)
df4.write.csv("/home/claudio/google_2019/thesis_queries/machine_time_waste/" + cluster + "_state_change.csv")
# vim: set ts=2 sw=2 et tw=120:

View File

@ -1,103 +0,0 @@
#!/usr/bin/env python3
# coding: utf-8
# # Temporal impact: machine time waste
import json
import pandas
from IPython import display
import findspark
findspark.init()
import pyspark
import pyspark.sql
import sys
from pyspark.sql.functions import col, lag, when, concat_ws, last, first
from pyspark.sql import Window
from pyspark.sql.types import LongType
if len(sys.argv) != 2 or len(sys.argv[1]) != 1:
print("usage: " + sys.argv[0] + " {cluster}", file=sys.stderr)
sys.exit(1)
cluster=sys.argv[1]
spark = pyspark.sql.SparkSession.builder \
.appName("machine_time_waste") \
.config("spark.local.dir", "/tmp/ramdisk/spark") \
.config("spark.driver.memory", "124g") \
.getOrCreate()
df = spark.read.json("/home/claudio/google_2019/instance_events/" + cluster + "/" + cluster + "_instance_events*.json.gz")
# df = spark.read.json("/home/claudio/google_2019/instance_events/" + cluster + "/" + cluster + "_test.json")
df.printSchema()
df.show()
# .filter(df.collection_type == 0) \
df2 = df \
.withColumn("time", col("time").cast(LongType())) \
.withColumn("type", col("type").cast(LongType())) \
.withColumn("type", when(col("type").isNull(), 0).otherwise(col("type"))) \
.withColumn("id", concat_ws("-", "collection_id", "instance_index")) \
.where(col("time").isNotNull()) \
.where(col("type").isNotNull()) \
.where((col("instance_index").isNotNull()) & (col("collection_id").isNotNull())) \
.select("time", "type", "id")
df2.show()
total = df.count()
filtered = df2.count()
print("Total: " + str(total))
print("Filtered: " + str(filtered))
r = df2.rdd
def for_each_task(ts):
ts = sorted(ts, key=lambda x: x.time)
last_term = None
prev = None
tr = {}
for i,t in enumerate(ts):
if prev is not None and t.type == prev.type: # remove useless transitions
if (i == len(ts)-1): # if last
tr[str(prev.type) + "-" + str(t.type)] = t.time - prev.time # keep "loops" if last
else:
continue
if t.type >= 4 and t.type <= 8:
last_term = t.type
if prev is not None:
tr[str(prev.type) + "-" + str(t.type)] = t.time - prev.time
prev = t
return {"last_term": last_term, 'tr': tr}
def sum_values(ds):
dsum = {}
for dt in ds:
d = dt["tr"]
for key in d:
if key not in dsum:
dsum[key] = d[key]
else:
dsum[key] += d[key]
return dsum
r2 = r \
.groupBy(lambda x: x.id) \
.mapValues(for_each_task) \
.map(lambda x: x[1]) \
.groupBy(lambda x: x["last_term"]) \
.mapValues(sum_values) \
.collect()
with open(cluster + "_state_changes.json", "w") as out:
json.dump({"filtered": filtered, "total": total, "data": r2}, out)
# .withColumn("prev_time", lag(df2.time).over(my_window)) \
# .withColumn("prev_type", lag(df2.type).over(my_window)) \
# vim: set ts=2 sw=2 et tw=120: