bachelorThesis/spatial_resource_waste/spatial_resource_waste.py

91 lines
2.8 KiB
Python

#!/usr/bin/env python3
# coding: utf-8
import json
import pandas
from IPython import display
import findspark
findspark.init()
import pyspark
import pyspark.sql
import sys
import gzip
from pyspark import AccumulatorParam
from pyspark.sql.functions import lit
from pyspark.sql import Window
from pyspark.sql.types import ByteType
# Name of the cluster to process, read from the first command-line argument.
cluster = sys.argv[1]

# Build (or reuse) the Spark session for this job.
# NOTE(review): the application name says "task_slowdown" although this file
# is the spatial resource waste script; it looks copied over. Confirm before
# renaming, since it is what shows up in the Spark UI.
spark = (
    pyspark.sql.SparkSession.builder
    .appName("task_slowdown")
    .config("spark.driver.maxResultSize", "32g")
    .config("spark.local.dir", "/run/tmpfiles.d/spark")
    .config("spark.driver.memory", "75g")
    .getOrCreate()
)
sc = spark.sparkContext

# Load every gzipped instance-events JSON file of the selected cluster.
df = spark.read.json(
    "/home/claudio/google_2019/instance_events/"
    + cluster + "/" + cluster + "_instance_events*.json.gz"
)
# Alternative single-file input, kept from the original for test runs:
#df = spark.read.json("/home/claudio/google_2019/instance_events/" + cluster + "/" + cluster + "_test.json")
# Normalise "collection_type" to a nullable byte column so that the row
# filter applied to df.rdd later in this file can compare it with small
# integers.
#
# Spark DataFrames do not support item assignment (df[...] = ...), so the
# cast has to go through withColumn. The previous try/except form always
# raised on the assignment and silently replaced the column with nulls,
# which made the collection_type filter accept every row.
if "collection_type" in df.columns:
    df = df.withColumn("collection_type", df["collection_type"].cast(ByteType()))
else:
    # The input carries no such field at all: add it as an all-null column so
    # later attribute access on the rows still works.
    df = df.withColumn("collection_type", lit(None).cast(ByteType()))
# Pairs (type of previous event, type of current event) for which
# for_each_task counts the time between the two events.
# NOTE(review): presumably 3 and 10 are the SCHEDULE and UPDATE_RUNNING event
# types of the Google 2019 trace, i.e. the task was running in between;
# confirm against the trace documentation.
RUN = {
    (before, after)
    for before in (3, 10)
    for after in (1, 4, 5, 6, 7, 8, 10)
}
def is_res_none(tres):
    """Tell whether a resource request cannot be used for accounting.

    tres: None, or a mapping-like object indexable by "cpus" and "memory".

    Returns True when tres itself is None or when either its "cpus" or its
    "memory" entry is None, and False otherwise.
    """
    if tres is None:
        return True
    if tres["cpus"] is None:
        return True
    return tres["memory"] is None
def for_each_task(ts):
    """Accumulate requested CPU and memory over the counted intervals of a task.

    ts: iterable of event dicts as built by cleanup(), each with the keys
        "time", "type" and "res" ("res" may be None or hold None fields).

    The events are processed in ascending "time" order. For every pair of
    consecutive events whose (previous type, current type) is listed in RUN,
    the time difference is multiplied by the most recent usable resource
    request and added to the totals; each product is rounded on its own.

    Returns a two-element list [("cpu-<T>", cpu), ("ram-<T>", ram)], where <T>
    is the type of the last event whose type lies in 4..8, or "None" when the
    task has no such event. An empty input yields zero totals.
    """
    events = sorted(ts, key=lambda event: event["time"])

    last_term = None       # type of the latest event with a type in 4..8
    last_resources = None  # newest usable resource request seen so far
    prev = None            # event processed in the previous iteration
    cpu = 0
    ram = 0

    for t in events:
        if 4 <= t["type"] <= 8:
            last_term = t["type"]

        if prev is not None and (prev["type"], t["type"]) in RUN:
            if is_res_none(last_resources):
                # Nothing usable was seen before this interval: fall back to
                # the request carried by the event that closes it.
                last_resources = t["res"]
            if not is_res_none(last_resources):
                delta = t["time"] - prev["time"]
                cpu += round(delta * last_resources["cpus"])
                ram += round(delta * last_resources["memory"])

        prev = t

        # Remember the request of the current event only when it is usable.
        # The guard used to test last_resources instead, which never stored a
        # valid request seen before the first counted interval and let an
        # empty request overwrite a known one.
        if not is_res_none(t["res"]):
            last_resources = t["res"]

    return [("cpu-" + str(last_term), cpu), ("ram-" + str(last_term), ram)]
def cleanup(x):
    """Reduce one raw event row to the plain dict used by the rest of the job.

    x: row exposing the attributes time, type, collection_id,
       instance_index and resource_request.

    Returns a dict with the keys:
      "time": x.time converted with int()
      "type": x.type converted with int(), or 0 when x.type is None
      "id":   x.collection_id and x.instance_index joined by "-"
      "res":  x.resource_request, passed through unchanged
    """
    timestamp = int(x.time)
    if x.type is None:
        event_type = 0
    else:
        event_type = int(x.type)
    # Key that identifies one task: collection id plus instance index.
    task_id = x.collection_id + "-" + x.instance_index
    return {
        "time": timestamp,
        "type": event_type,
        "id": task_id,
        "res": x.resource_request,
    }
# Aggregation pipeline:
#   1. keep rows whose collection_type is None or 0;
#   2. drop rows lacking time, instance_index or collection_id;
#   3. reduce each row to a plain dict and group the events by task id;
#   4. compute the per-task ("cpu-<T>", n) / ("ram-<T>", n) pairs;
#   5. sum the amounts per key over all tasks and bring them to the driver.
df2 = (
    df.rdd
    .filter(lambda row: row.collection_type is None or row.collection_type == 0)
    .filter(lambda row: not (row.time is None
                             or row.instance_index is None
                             or row.collection_id is None))
    .map(cleanup)
    .groupBy(lambda event: event["id"])
    .mapValues(for_each_task)
    .flatMap(lambda keyed: keyed[1])
    .groupBy(lambda pair: pair[0])
    .mapValues(lambda pairs: sum(amount for _, amount in pairs))
    .collect()
)

# The collected list holds (key, total) tuples; turn it into a mapping.
result = dict(df2)

# Write the totals as compact JSON next to the working directory, one file
# per cluster.
with open(cluster + "_res_micros_requested.json", "w") as out:
    json.dump(result, out, separators=(',', ':'))