bachelorThesis/task_slowdown/task_slowdown.py

#!/usr/bin/env python3
# coding: utf-8
import json
import pandas
from IPython import display
import findspark
findspark.init()
import pyspark
import pyspark.sql
import sys
import gzip
from pyspark import AccumulatorParam
from pyspark.sql.functions import lit
from pyspark.sql import Window
from pyspark.sql.types import ByteType
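# The trace cell to analyse is selected by the first command-line argument and
# determines which instance_events files are read below.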
cluster = sys.argv[1]
spark = pyspark.sql.SparkSession.builder \
    .appName("task_slowdown") \
    .config("spark.driver.maxResultSize", "32g") \
    .config("spark.local.dir", "/run/tmpfiles.d/spark") \
    .config("spark.driver.memory", "75g") \
    .getOrCreate()
sc = spark.sparkContext
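# Read every instance_events shard of the chosen cluster; each JSON line is one
# state-change event of a task instance (time, type, priority, collection_id,
# instance_index, collection_type). The commented-out line reads a small test
# file instead.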
df = spark.read.json("/home/claudio/google_2019/instance_events/" + cluster + "/" + cluster + "_instance_events*.json.gz")
#df = spark.read.json("/home/claudio/google_2019/instance_events/" + cluster + "/" + cluster + "_test.json")
# Cast collection_type down to a byte if the column exists in this cluster's
# schema; otherwise add it as an all-null column so the filters below still work.
if "collection_type" in df.columns:
    df = df.withColumn("collection_type", df["collection_type"].cast(ByteType()))
else:
    df = df.withColumn("collection_type", lit(None).cast(ByteType()))
class NonPriorityAcc(pyspark.AccumulatorParam):
    """Accumulator that merges {priority: count} dicts by summing counts per key."""

    def zero(self, value):
        return {}

    def addInPlace(self, v1, v2):
        for key in v2:
            if key in v1:
                v1[key] += v2[key]
            else:
                v1[key] = v2[key]
        return v1
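# Shared accumulator: counts, per priority, the tasks whose last terminal event
# is not FINISH (i.e. tasks that were evicted, failed, got killed or were lost).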
non = sc.accumulator({}, NonPriorityAcc())
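# In the Google 2019 trace format, instance event types 4-8 are terminal
# (EVICT, FAIL, FINISH, KILL, LOST) and type 6 is FINISH; any other type is
# treated below as the start of a response-time interval that ends at the next
# terminal event.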
def for_each_task(ts):
    """Given all events of one task, return (priority, response_times) if the task
    eventually FINISHed (last terminal event of type 6); otherwise count the task
    in the `non` accumulator and return None."""
    global non
    ts = sorted(ts, key=lambda x: x["time"])
    last_term = None
    priority = -1
    responding = False
    resp_burst_start = None
    resp_burst_type = None
    resp_time = []
    resp_time_last = 0
    for i, t in enumerate(ts):
        # Remember the first real priority seen for this task
        if t["priority"] != -1 and priority == -1:
            priority = t["priority"]
        #if responding:
        #    resp_burst_type.append(t["type"])
        if t["type"] >= 4 and t["type"] <= 8:
            # Terminal event
            last_term = t["type"]
            if responding:
                resp_burst_type.append(t["type"])
                # This response time interval has ended, so record the time delta and term
                resp_time.append((t["time"] - resp_burst_start, resp_burst_type))
                responding = False
        if (not responding) and (t["type"] < 4 or t["type"] > 8):
            # A non-terminal event starts (or restarts) a response-time interval
            resp_burst_start = t["time"]
            resp_burst_type = [t["type"]]
            responding = True
    if last_term != 6:
        non.add({priority: 1})
    return (priority, resp_time) if last_term == 6 else None
def cleanup(x):
    # Normalise a raw event row into a plain dict with non-null fields
    return {
        "time": int(x.time),
        "type": 0 if x.type is None else int(x.type),
        "id": x.collection_id + "-" + x.instance_index,
        "priority": -1 if x.priority is None else int(x.priority)
    }
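# Pipeline: keep only events that belong to jobs (collection_type 0 or missing)
# and have a complete key, normalise them, group them per task
# (collection_id-instance_index), compute each task's response times, then
# regroup the per-task results by priority.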
df2 = df.rdd \
    .filter(lambda x: x.collection_type is None or x.collection_type == 0) \
    .filter(lambda x: x.time is not None and x.instance_index is not None and x.collection_id is not None) \
    .map(cleanup) \
    .groupBy(lambda x: x["id"]) \
    .mapValues(for_each_task) \
    .filter(lambda x: x[1] is not None) \
    .map(lambda x: x[1]) \
    .groupBy(lambda x: x[0]) \
    .mapValues(lambda x: [e[1] for e in x])
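# Collect the per-priority results on the driver and dump them as gzipped JSON:
# "val" maps priority -> list of per-task response times (each a list of
# [delta, burst event types] pairs), "non" maps priority -> count of tasks that
# never FINISHed.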
val = df2.collect()
val2 = {}
for e in val:
    val2[e[0]] = e[1]
a = {"val": val2, "non": non.value}
with gzip.open(cluster + "_state_changes.json.gz", "wt") as out:
    json.dump(a, out, separators=(',', ':'))