#!/usr/bin/env python3
# coding: utf-8

import os
import json
import pandas as pd
import findspark
findspark.init()  # add the local Spark installation to sys.path before importing pyspark
import pyspark
import pyspark.sql
import sys
import gzip
from pyspark import AccumulatorParam
from pyspark.sql.functions import lit
from pyspark.sql import Window
from pyspark.sql.types import *
from decimal import *

if len(sys.argv) != 5:
    print(sys.argv[0] + " {cluster} {tmpdir} {maxram} {basedir}")
    sys.exit(1)

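# Example invocation (all argument values below are illustrative placeholders):
#
#   python3 this_script.py a /tmp/spark-local 128g /data/google-trace
#
# {cluster} names the trace cluster, {tmpdir} is Spark's scratch directory,
# {maxram} the driver memory limit, and {basedir} the directory holding the
# per-cluster *_instance_events*.json.gz files.
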
cluster = sys.argv[1]

spark = pyspark.sql.SparkSession.builder \
    .appName("task_slowdown") \
    .config("spark.driver.maxResultSize", "128g") \
    .config("spark.local.dir", sys.argv[2]) \
    .config("spark.driver.memory", sys.argv[3]) \
    .getOrCreate()
sc = spark.sparkContext

# READING INSTANCE EVENTS DATA
dfepath = sys.argv[4] + "/" + cluster + "/" + cluster + "_instance_events*.json.gz"
#dfepath = sys.argv[4] + "/" + cluster + "/" + cluster + "_test.json"
df = spark.read.json(dfepath)

#df = df.limit(10000)

def tabid(x):
    # Pack (collection_id, instance_index) into a single Decimal task id:
    # collection_id + instance_index / 2**64.
    return Decimal(x.collection_id) + Decimal(x.instance_index) / Decimal(2**64)


def increment_reserv_bucket(bucket, value):
    # Histogram helper: slot 0 counts negative requests, slots 1-40 bin the
    # [0, 1) range in steps of 1/40, and values >= 1 also land in slot 40.
    if value < 0:
        idx = 0
    else:
        idx = 40 if value >= 1 else (int(value * 40) + 1)
    bucket[idx] += 1

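# Bucketing example for increment_reserv_bucket (values are illustrative):
# a request of 0.5 increments slot int(0.5 * 40) + 1 = 21, a request of 1.2
# increments slot 40, and a negative request increments slot 0.
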
def for_each_joined(var):
    task_id = var[0]
    ts = var[1]

    term = -1
    # Sort this task's events by time, placing events with no timestamp first.
    ts = sorted(ts, key=lambda x: -1 if x["time"] is None else x["time"])

    cpu_request = [0] * 42
    ram_request = [0] * 42

    # Drop the leading events that have no timestamp; slot 41 records how many
    # were dropped.
    cut = False
    for i in range(0, len(ts)):
        if ts[i]["time"] is not None:
            cut = True
            ts = ts[i:len(ts)]
            cpu_request[41] = i
            ram_request[41] = i
            break

    if not cut:
        raise Exception('all times are none')

    for i, t in enumerate(ts):
        increment_reserv_bucket(cpu_request, t["rcpu"])
        increment_reserv_bucket(ram_request, t["rram"])

        # Event types 4-8 are treated here as termination events; the last one
        # seen (in time order) determines the task's termination type.
        if t["type"] >= 4 and t["type"] <= 8:
            term = t["type"]

    # Attribute this task's request histograms to its termination type
    # (-1 means no termination event was observed).
    res = {-1: None, 4: None, 5: None, 6: None, 7: None, 8: None}
    res[term] = {'rcpu': cpu_request, 'rram': ram_request}

    return res

def fold_resobjs(ro1, ro2):
    # Merge two per-termination-type results, summing the bucket counts
    # element-wise wherever both sides have data for the same key.
    if ro1 is None:
        return ro2
    elif ro2 is None:
        return ro1
    else:
        for k in ro1.keys():
            if ro1[k] is None:
                ro1[k] = ro2[k]
            elif ro2[k] is not None:
                for kk in ro1[k].keys():
                    if ro1[k][kk] is None:
                        ro1[k][kk] = ro2[k][kk]
                    elif ro2[k][kk] is not None:
                        ro1[k][kk] = [sum(x) for x in zip(ro1[k][kk], ro2[k][kk])]
        return ro1

# Keep only complete records, bucket each task's CPU/RAM requests, and merge
# the per-task histograms into a single dict keyed by termination type.
result = df.rdd \
    .filter(lambda x: x.time is not None and x.type is not None and x.machine_id is not None and
            x.instance_index is not None and x.collection_id is not None and x.resource_request is not None and
            x.resource_request.cpus is not None and x.resource_request.memory is not None) \
    .map(lambda x: {"id": tabid(x), "time": int(x.time), "type": int(x.type),
                    "rcpu": Decimal(x.resource_request.cpus), "rram": Decimal(x.resource_request.memory),
                    "mid": x.machine_id}) \
    .groupBy(lambda x: x["id"]) \
    .map(for_each_joined) \
    .fold(None, fold_resobjs)

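# The folded `result` is a dict keyed by termination type (-1 plus event types
# 4-8); each non-None entry has the shape (field names below are illustrative):
#
#   {'rcpu': [n_negative, n_bin1, ..., n_bin40, n_dropped],
#    'rram': [n_negative, n_bin1, ..., n_bin40, n_dropped]}
#
# i.e. 42 counters per resource: slot 0 for negative requests, slots 1-40 for
# the [0, 1) histogram, and slot 41 for events dropped due to missing timestamps.
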
d = os.path.dirname(os.path.realpath(__file__))

# Write the aggregated histograms next to this script; note that json.dump
# turns the integer termination-type keys into strings.
with open(d + "/" + cluster + "_figure8ab.json", "w") as f:
    json.dump(result, f)

# vim: set ts=4 sw=4 et tw=120: