Added partial spatial_resource_waste results

This commit is contained in:
Claudio Maggioni 2021-03-10 16:03:54 +00:00
parent 5373284e57
commit 344afc5391
11 changed files with 114 additions and 2 deletions

1
.gitignore vendored
View file

@ -1 +1,2 @@
**/.ipynb_checkpoints/
task_slowdown/?_state_changes.json.gz

BIN
spatial_resource_waste/a_res_micros_requested.json (Stored with Git LFS) Normal file

Binary file not shown.

BIN
spatial_resource_waste/b_res_micros_requested.json (Stored with Git LFS) Normal file

Binary file not shown.

BIN
spatial_resource_waste/c_res_micros_requested.json (Stored with Git LFS) Normal file

Binary file not shown.

BIN
spatial_resource_waste/d_res_micros_requested.json (Stored with Git LFS) Normal file

Binary file not shown.

BIN
spatial_resource_waste/e_res_micros_requested.json (Stored with Git LFS) Normal file

Binary file not shown.

BIN
spatial_resource_waste/f_res_micros_requested.json (Stored with Git LFS) Normal file

Binary file not shown.

BIN
spatial_resource_waste/g_res_micros_requested.json (Stored with Git LFS) Normal file

Binary file not shown.

View file

@ -0,0 +1,91 @@
#!/usr/bin/env python3
# coding: utf-8
import json
import pandas
from IPython import display
import findspark
findspark.init()
import pyspark
import pyspark.sql
import sys
import gzip
from pyspark import AccumulatorParam
from pyspark.sql.functions import lit
from pyspark.sql import Window
from pyspark.sql.types import ByteType
cluster=sys.argv[1]
spark = pyspark.sql.SparkSession.builder \
.appName("task_slowdown") \
.config("spark.driver.maxResultSize", "32g") \
.config("spark.local.dir", "/run/tmpfiles.d/spark") \
.config("spark.driver.memory", "75g") \
.getOrCreate()
sc = spark.sparkContext
df = spark.read.json("/home/claudio/google_2019/instance_events/" + cluster + "/" + cluster + "_instance_events*.json.gz")
#df = spark.read.json("/home/claudio/google_2019/instance_events/" + cluster + "/" + cluster + "_test.json")
try:
df["collection_type"] = df["collection_type"].cast(ByteType())
except:
df = df.withColumn("collection_type", lit(None).cast(ByteType()))
RUN = set([(3,1), (3,4), (3,5), (3,6), (3,7), (3,8), (3,10), (10,1), (10,4), (10,5), (10,6), (10,7), (10,8), (10,10)])
def is_res_none(tres):
return tres is None or tres["cpus"] is None or tres["memory"] is None
def for_each_task(ts):
ts = sorted(ts, key=lambda x: x["time"])
last_term = None
last_resources = None
prev = None
cpu = 0
ram = 0
for i,t in enumerate(ts):
if t["type"] >= 4 and t["type"] <= 8:
last_term = t["type"]
if prev is not None:
if (prev["type"], t["type"]) in RUN:
if is_res_none(last_resources):
last_resources = t["res"]
if not is_res_none(last_resources):
delta = t["time"] - prev["time"]
cpu += round(delta * last_resources["cpus"])
ram += round(delta * last_resources["memory"])
prev = t
if not is_res_none(last_resources):
last_resources = t["res"]
return [("cpu-" + str(last_term), cpu), ("ram-" + str(last_term), ram)]
def cleanup(x):
return {
"time": int(x.time),
"type": 0 if x.type is None else int(x.type),
"id": x.collection_id + "-" + x.instance_index,
"res": x.resource_request
}
df2 = df.rdd \
.filter(lambda x: x.collection_type is None or x.collection_type == 0) \
.filter(lambda x: x.time is not None and x.instance_index is not None and x.collection_id is not None) \
.map(cleanup) \
.groupBy(lambda x: x["id"]) \
.mapValues(for_each_task) \
.flatMap(lambda x: x[1]) \
.groupBy(lambda x: x[0]) \
.mapValues(lambda xs: sum(n for _, n in xs)) \
.collect()
result = {}
for pair in df2:
result[pair[0]] = pair[1]
with open(cluster + "_res_micros_requested.json", "w") as out:
json.dump(result, out, separators=(',', ':'))

View file

@ -1 +0,0 @@
task_slowdown/?_state_changes.json.gz

View file

@ -38,7 +38,7 @@ Google drive.
- (%) total wasted time per unsuccessful event type
- (mins.) avg. wasted time per number of events for each event type
- breakdown of wasted time per *submission*, *scheduling*, *queue*
- *III-A-I: Average slowdown per task*: (Table II)
- [&#x2705; **task_slowdown**] *III-A-I: Average slowdown per task*: (Table II)
For FINISH type tasks, compute *slowdown*, i.e. mean (**ask Rosa**) of all
*response time* for each task event over *response time* of last event (which
is by def. FINISH). Response time is defined as *Queue time* + *Exec time*