Merge branch 'master' of tea.maggioni.xyz:maggicl/bachelorThesis

This commit is contained in:
Claudio Maggioni 2021-04-16 10:29:39 +00:00
commit 2456d3afa9
7 changed files with 587 additions and 1 deletions

6
Untitled.ipynb Normal file
View file

@ -0,0 +1,6 @@
{
"cells": [],
"metadata": {},
"nbformat": 4,
"nbformat_minor": 5
}

151
figure_8/figure8-abcd-only.py Executable file
View file

@ -0,0 +1,151 @@
#!/usr/bin/env python3
# coding: utf-8
import os
import json
import pandas as pd
import findspark
findspark.init()
import pyspark
import pyspark.sql
import sys
import gzip
from pyspark import AccumulatorParam
from pyspark.sql.functions import lit
from pyspark.sql import Window
from pyspark.sql.types import *
from decimal import *
if len(sys.argv) is not 5:
print(sys.argv[0] + " {cluster} {tmpdir} {maxram} {joindir}")
sys.exit()
joindir=sys.argv[4]
cluster=sys.argv[1]
spark = pyspark.sql.SparkSession.builder \
.appName("task_slowdown") \
.config("spark.driver.maxResultSize", sys.argv[3]) \
.config("spark.local.dir", sys.argv[2]) \
.config("spark.driver.memory", sys.argv[3]) \
.getOrCreate()
sc = spark.sparkContext
df = spark.read.parquet(joindir + "/figure-8-join-" + cluster + ".parquet")
# READING MACHINE EVENTS DATA, sort them and save them as broadcast variable
print("Starting to read machine events...")
dfm = pd.read_csv("~/google_2019/machine_events/" + cluster + "_machine_events.csv", converters={
'time': lambda x: -1 if x == '' else int(x),
'machine_id': lambda x: str(x),
'capacity.cpus': lambda x: -1 if x == '' else Decimal(x),
'capacity.memory': lambda x: -1 if x == '' else Decimal(x)})
print("Dropping remove events...")
dfm = dfm[(dfm.type!=2)&(dfm.time!=-1)&(dfm["capacity.cpus"]!=-1)&(dfm["capacity.memory"]!=-1)]
print("Dropping missing data events...")
dfm = dfm[dfm.missing_data_reason.isnull()]
print("Projecting on useful columns...")
dfm = dfm[["time", "machine_id", "capacity.cpus", "capacity.memory"]]
print("Sorting by time...")
dfm = dfm.sort_values(by=["machine_id", "time"])
print("Converting to broadcast variable...")
dfm = sc.broadcast([tuple(r) for r in dfm.to_numpy()])
print("Done with machine events.")
def get_machine_time_resources(machine_id, time):
def aux(i, j):
if i == j:
return dfm.value[i] if dfm.value[i][1] == machine_id else None
elif i + 1 == j:
if dfm.value[i][1] == machine_id:
return dfm.value[i]
elif dfm.value[j][1] == machine_id:
return dfm.value[j]
else:
return None
mid = (i + j) // 2
if dfm.value[mid][1] > machine_id:
return aux(i, mid - 1)
elif dfm.value[mid][1] < machine_id:
return aux(mid + 1, j)
elif dfm.value[mid][0] > time:
return aux(i, mid)
elif dfv.value[mid][0] < time:
return aux(mid, j)
else:
return dfm.value[mid]
return aux(0, len(dfm.value)-1)
def increment_reserv_bucket(bucket, value):
if value < 0:
idx = 0
else:
idx = 40 if value >= 1 else (int(value * 40) + 1)
bucket[idx] += 1
def for_each_joined(x):
task_id = x[0]
ts = x[1]
term = -1
ts = sorted(ts, key=lambda x: x.time)
cpu_util = [0] * 41
ram_util = [0] * 41
cpu_request = [0] * 41
ram_request = [0] * 41
for i, t in enumerate(ts):
machine_log = get_machine_time_resources(mid, t.time)
if machine_log is not None:
util_cpu = t.acpu / machine_log[2]
util_ram = t.aram / machine_log[3]
else:
util_cpu = -1
util_ram = -1
# 8a-b
increment_reserv_bucket(cpu_request, t.rcpu)
increment_reserv_bucket(ram_request, t.rram)
# 8e-f
increment_reserv_bucket(cpu_util, t.acpu)
increment_reserv_bucket(ram_util, t.aram)
if t.type >= 4 and t.type <= 8:
term = t.type
res = {-1: None, 4: None, 5: None, 6: None, 7: None, 8: None}
res[term] = {'rcpu': cpu_request, 'rram': ram_request, 'ucpu': cpu_util, 'uram': ram_util}
return res
def fold_resobjs(ro1, ro2):
if ro1 is None:
return ro2
elif ro2 is None:
return ro1
else:
for k in ro1.keys():
for kk in ro1[k].keys():
if ro1[k][kk] is None:
ro1[k][kk] = ro2[k][kk]
elif ro2[k][kk] is None:
continue
else:
ro1[k][kk] = [sum(x) for x in zip(ro1[k][kk], ro2[k][kk])]
return ro1
result = df.rdd \
.groupBy(lambda x: x.id) \
.map(for_each_joined) \
.fold(None, fold_resobjs)
d = os.path.dirname(os.path.realpath(__file__))
with open(d + "/" + cluster + "_figure8abcd.json", "w") as f:
json.dump(result, f)
# vim: set ts=4 sw=4 et tw=120:

View file

@ -709,7 +709,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.9"
"version": "3.9.2"
}
},
"nbformat": 4,

File diff suppressed because one or more lines are too long

Binary file not shown.

BIN
status.ods Normal file

Binary file not shown.

1
task_slowdown/.gitignore vendored Normal file
View file

@ -0,0 +1 @@
task_slowdown/?_state_changes.json.gz