:Added task slowdown results

This commit is contained in:
Claudio Maggioni 2021-02-27 12:07:15 +00:00
parent 5b3e7ae3eb
commit c34009963d
14 changed files with 460 additions and 139 deletions

View file

@ -0,0 +1,125 @@
#!/usr/bin/env python3
# coding: utf-8
# # Temporal impact: machine time waste
# This analysis is meant to analyse the time spend by instance events doing submission, queueing, and execution. This
# preliminary script sums the total time spent by instance executions to transition from each event type to another.
# Additionaly, time sums are partitioned by the last termination state of the instance they belong (i.e. the last
# 4<=x<=8 event type for that instance).
# Please note that events with either missing time, type, instance_index or collection_id are ignored. Total number of
# instance events in the trace and filtered number of events are saved in the output.
# ## Data representation
# Total and filtered totals mentioned before are under "total" and "filtered" attributes in the root of the generated
# JSON object. The "data" atrribute is a list of pairs of final instance termination states and the corresponding list
# of time totals per each transition. Each transition total is represented in the form of "x-y" where x is the last
# event type prior to the transition and "y" is the new event detected. Times are calculated by summing all event times
# "y" subtracting the nearest event of type "x" for each instance. If an event "x" is repeated multiple times
# immediately after an event of the same type, only the first event in chronological order is considered. If however
# after multiple repetitions of the event "x" the trace for that instance terminates, an "x-x" time sum is registered by
# computing the difference between the last and the first event of "x" type. Times are represented in microseconds.
import json
import pandas
from IPython import display
import findspark
findspark.init()
import pyspark
import pyspark.sql
import sys
from pyspark.sql.functions import lit
from pyspark.sql.types import ByteType
if len(sys.argv) != 2 or len(sys.argv[1]) != 1:
print("usage: " + sys.argv[0] + " {cluster}", file=sys.stderr)
sys.exit(1)
cluster=sys.argv[1]
spark = pyspark.sql.SparkSession.builder \
.appName("machine_time_waste") \
.config("spark.local.dir", "/run/tmpfiles.d/spark") \
.config("spark.driver.memory", "124g") \
.getOrCreate()
sc = spark.sparkContext
#df = spark.read.json("/home/claudio/google_2019/instance_events/" + cluster + "/" + cluster + "_test.json") \
df = spark.read.json("/home/claudio/google_2019/instance_events/" + cluster + "/" + cluster + "_instance_events*.json.gz")
try:
df["collection_type"] = df["collection_type"].cast(ByteType())
except:
df = df.withColumn("collection_type", lit(None).cast(ByteType()))
df.printSchema()
df.show()
total = sc.accumulator(0)
filtered = sc.accumulator(1)
def for_each_task(ts):
ts = sorted(ts, key=lambda x: x["time"])
last_term = None
prev = None
tr = {}
for i,t in enumerate(ts):
if prev is not None and t["type"] == prev["type"]: # remove useless transitions
if i == len(ts) - 1: # if last
tr[str(prev["type"]) + "-" + str(t["type"])] = t["time"] - prev["time"] # keep "loops" if last
else:
continue
if t["type"] >= 4 and t["type"] <= 8:
last_term = t["type"]
if prev is not None:
tr[str(prev["type"]) + "-" + str(t["type"])] = t["time"] - prev["time"]
prev = t
return {"last_term": last_term, 'tr': tr}
def sum_values(ds):
dsum = {}
for dt in ds:
d = dt["tr"]
for key in d:
if key not in dsum:
dsum[key] = d[key]
else:
dsum[key] += d[key]
return dsum
def count_total(x):
total.add(1)
return x
def cleanup(x):
filtered.add(1)
return {
"time": int(x.time),
"type": 0 if x.type is None else int(x.type),
"id": x.collection_id + "-" + x.instance_index,
}
r2 = df.rdd \
.filter(lambda x: x.collection_type is None or x.collection_type == 0) \
.map(count_total) \
.filter(lambda x: x.type is not None and x.time is not None
and x.instance_index is not None and x.collection_id is not None) \
.map(cleanup) \
.groupBy(lambda x: x["id"]) \
.mapValues(for_each_task) \
.map(lambda x: x[1]) \
.groupBy(lambda x: x["last_term"]) \
.mapValues(sum_values) \
.collect()
with open(cluster + "_state_changes_jobs.json", "w") as out:
json.dump({"filtered": filtered.value, "total": total.value, "data": r2}, out)
# vim: set ts=2 sw=2 et tw=120:

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

File diff suppressed because one or more lines are too long

99
task_slowdown/task_slowdown.py Executable file
View file

@ -0,0 +1,99 @@
#!/usr/bin/env python3
# coding: utf-8
import json
import pandas
from IPython import display
import findspark
findspark.init()
import pyspark
import pyspark.sql
import sys
from pyspark.sql.functions import col, lag, when, concat_ws, last, first
from pyspark.sql import Window
from pyspark.sql.types import LongType
cluster=sys.argv[1]
spark = pyspark.sql.SparkSession.builder \
.appName("task_slowdown") \
.config("spark.local.dir", "/run/tmpfiles.d/spark") \
.config("spark.driver.memory", "124g") \
.getOrCreate()
sc = spark.sparkContext
df = spark.read.json("/home/claudio/google_2019/instance_events/" + cluster + "/" + cluster + "_instance_events*.json.gz")
# df = spark.read.json("/home/claudio/google_2019/instance_events/" + cluster + "/" + cluster + "_test.json")
df.printSchema()
df.show()
non = sc.accumulator(0)
tot = sc.accumulator(0)
def for_each_task(ts):
global none
global tot
ts = sorted(ts, key=lambda x: x["time"])
last_term = None
priority = None
responding = False
resp_burst_start = None
resp_burst_type = None
resp_time = []
resp_time_last = 0
for i,t in enumerate(ts):
if t["priority"] is not None and priority is None:
priority = t["priority"]
if responding:
resp_burst_type.append(t["type"])
if t["type"] >= 4 and t["type"] <= 8:
last_term = t["type"]
if responding:
# This response time interval has ended, so record the time delta and term
resp_time.append((t["time"] - resp_burst_start, resp_burst_type))
responding = False
if (not responding) and (t["type"] < 4 or t["type"] > 8):
resp_burst_start = t["time"]
resp_burst_type = [t["type"]]
responding = True
tot.add(1)
if last_term != 6:
non.add(1)
return (priority, resp_time) if last_term == 5 else None
def cleanup(x):
return {
"time": int(x.time),
"type": 0 if x.type is None else int(x.type),
"id": x.collection_id + "-" + x.instance_index,
"priority": 0 if x.priority is None else int(x.priority)
}
df2 = df.rdd \
.filter(lambda x: x.collection_type is None or x.collection_type == 0) \
.filter(lambda x: x.type is not None and x.time is not None
and x.instance_index is not None and x.collection_id is not None) \
.map(cleanup) \
.groupBy(lambda x: x["id"]) \
.mapValues(for_each_task) \
.filter(lambda x: x[1] is not None) \
.map(lambda x: x[1]) \
.groupBy(lambda x: x[0]) \
.mapValues(lambda x: [e[1] for e in x])
a = {"val": df2.collect(), "tot": tot.value, "non": non.value}
with open(cluster + "_state_changes.json", "w") as out:
json.dump(a, out)

View file

@ -27,7 +27,7 @@ Google drive.
## Analysis from Rosa/Chen Paper
- [&#x2705; **machine_configs**] Table of distinct CPU/Memory configurations of machines and their distrib. (%)
(Table I)
- *III-A: Temporal impact: machine time waste*:
- [&#x2705; **machine_time_waste**] *III-A: Temporal impact: machine time waste*:
Stacked histogram
- Y-axis: normalized (%) aggregated machine time
- X-axis: event type