More query results
parent 4d1d876b6b
commit 885163bc84
12 changed files with 608 additions and 0 deletions
figure_7/figure7c.py (Executable file, 119 lines)
@@ -0,0 +1,119 @@
#!/usr/bin/env python3
# coding: utf-8

import os
import json
import pandas as pd
import findspark
findspark.init()
import pyspark
import pyspark.sql
import sys
import gzip
from pyspark import AccumulatorParam
from pyspark.sql.functions import lit
from pyspark.sql import Window
from pyspark.sql.types import *
from decimal import *
import random

if len(sys.argv) != 4:
    print(sys.argv[0] + " {cluster} {tmpdir} {maxram}")
    sys.exit()

cluster=sys.argv[1]

spark = pyspark.sql.SparkSession.builder \
    .appName("task_slowdown") \
    .config("spark.driver.maxResultSize", "128g") \
    .config("spark.local.dir", sys.argv[2]) \
    .config("spark.driver.memory", sys.argv[3]) \
    .getOrCreate()
sc = spark.sparkContext

# READING INSTANCE EVENTS DATA
dfepath = "/home/claudio/google_2019/instance_events/" + cluster + "/" + cluster + "_instance_events*.json.gz"
#dfepath = "/home/claudio/google_2019/instance_events/" + cluster + "/" + cluster + "_test.json"
df = spark.read.json(dfepath)

def tabid(x):
    # Unique task key: collection_id in the integer part, instance_index in the fractional part
    return Decimal(x.collection_id) + Decimal(x.instance_index) / Decimal(2**64)

def bucket_sum_per_termination(bucket, last_term_by_id):
    # Aggregate per-task buckets into one bucket per termination class (-1 = unknown, 4-8 = event types)
    result = {-1: None, 4: None, 5: None, 6: None, 7: None, 8: None}
    for tid, vs in bucket.items():
        term = last_term_by_id[tid]
        if result[term] is None:
            result[term] = vs
        else:
            result[term] = [sum(x) for x in zip(result[term], vs)]
    return result

def tally_event(bucket, term, nexec):
    # Count one observation of "nexec tasks concurrently in execution" for termination class `term`
    if term not in bucket:
        bucket[term] = {}
    if nexec not in bucket[term]:
        bucket[term][nexec] = 0
    bucket[term][nexec] += 1

def for_each_joined(x):
    # For one machine, walk its events in time order and tally the concurrency level at each event
    machine_id = x[0]
    ts = x[1]

    ts = sorted(ts, key=lambda x: x["time"])
    in_execution = set()
    chum = {}

    for t in ts:
        in_execution.add(t["id"])
        tally_event(chum, t["term"], len(in_execution))
        if t["end"]:
            in_execution.remove(t["id"])

    return chum

def fold_resobjs(ro1, ro2):
    # Merge two {termination: {concurrency: count}} objects by summing counts
    if not ro1.keys():
        return ro2
    elif ro2.keys():
        for k in set(ro1.keys()).union(set(ro2.keys())):
            if k not in ro1:
                ro1[k] = ro2[k]
            elif k in ro2:
                for kk in set(ro1[k].keys()).union(set(ro2[k].keys())):
                    if kk not in ro1[k]:
                        ro1[k][kk] = ro2[k][kk]
                    elif kk in ro2[k]:
                        ro1[k][kk] += ro2[k][kk]
    return ro1

def mark_next(data):
    # Annotate each event of a task with its machine, whether it is the task's last event on that
    # machine, and the task's final termination type (-1 if no termination event 4-8 was seen)
    ts = data[1]
    ts = sorted(ts, key=lambda z: z[1])
    last_term = -1
    for i in range(0, len(ts)):
        t = ts[i]
        ts[i] = {"id": t[0], "time": t[1], "type": t[2], "mid": t[3], "end": (i == len(ts) - 1 or t[3] != ts[i+1][3])}
        if ts[i]["type"] >= 4 and ts[i]["type"] <= 8:
            last_term = ts[i]["type"]
    for t in ts:
        t["term"] = last_term
    return ts

result = df.rdd \
    .filter(lambda x: x.time is not None and x.type is not None and
        x.instance_index is not None and x.collection_id is not None) \
    .map(lambda x: [tabid(x), int(x.time), int(x.type), x.machine_id]) \
    .groupBy(lambda x: x[0]) \
    .flatMap(mark_next) \
    .groupBy(lambda x: x["mid"]) \
    .partitionBy(10000, lambda x: random.randint(0, 10000-1)) \
    .map(for_each_joined) \
    .fold({}, fold_resobjs)

d = os.path.dirname(os.path.realpath(__file__))

with open(d + "/" + cluster + "_figure7c.json", "w") as f:
    json.dump(result, f)

# vim: set ts=4 sw=4 et tw=120:
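For context, here is a minimal sketch of how the emitted {cluster}_figure7c.json could be inspected; the {termination type: {concurrency level: count}} layout follows from tally_event and fold_resobjs above, while the file name ("a" as an example cluster) and the snippet itself are illustrative and not part of this commit.

# Illustrative reader for the figure7c output (not part of the committed script).
import json

with open("a_figure7c.json") as f:
    per_term = json.load(f)  # {"termination type": {"concurrency level": count}}; keys are JSON strings

for term, counts in sorted(per_term.items()):
    peak = max(int(k) for k in counts)
    print(term, sum(counts.values()), peak)  # events tallied and highest concurrency observed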
BIN  figure_8/a_figure8.json (Stored with Git LFS) Normal file. Binary file not shown.
BIN  figure_8/a_figure8abef.json (Stored with Git LFS) Normal file. Binary file not shown.
BIN  figure_8/b_figure8abef.json (Stored with Git LFS) Normal file. Binary file not shown.
BIN  figure_8/c_figure8abef.json (Stored with Git LFS) Normal file. Binary file not shown.
BIN  figure_8/d_figure8abef.json (Stored with Git LFS) Normal file. Binary file not shown.
BIN  figure_8/e_figure8ab.json (Stored with Git LFS) Normal file. Binary file not shown.
BIN  figure_8/f_figure8ab.json (Stored with Git LFS) Normal file. Binary file not shown.
figure_8/figure8-ab-only.py (Executable file, 118 lines)
@@ -0,0 +1,118 @@
#!/usr/bin/env python3
# coding: utf-8

import os
import json
import pandas as pd
import findspark
findspark.init()
import pyspark
import pyspark.sql
import sys
import gzip
from pyspark import AccumulatorParam
from pyspark.sql.functions import lit
from pyspark.sql import Window
from pyspark.sql.types import *
from decimal import *

if len(sys.argv) != 5:
    print(sys.argv[0] + " {cluster} {tmpdir} {maxram} {basedir}")
    sys.exit()

cluster=sys.argv[1]

spark = pyspark.sql.SparkSession.builder \
    .appName("task_slowdown") \
    .config("spark.driver.maxResultSize", "128g") \
    .config("spark.local.dir", sys.argv[2]) \
    .config("spark.driver.memory", sys.argv[3]) \
    .getOrCreate()
sc = spark.sparkContext

# READING INSTANCE EVENTS DATA
dfepath = sys.argv[4] + "/" + cluster + "/" + cluster + "_instance_events*.json.gz"
#dfepath = sys.argv[4] + "/" + cluster + "/" + cluster + "_test.json"
df = spark.read.json(dfepath)

#df = df.limit(10000)

def tabid(x):
    # Unique task key: collection_id in the integer part, instance_index in the fractional part
    return Decimal(x.collection_id) + Decimal(x.instance_index) / Decimal(2**64)


def increment_reserv_bucket(bucket, value):
    # 41 usable slots: index 0 for negative (unknown) values, 1-40 for [0, 1) in 2.5% steps,
    # with values >= 1 clamped into slot 40
    if value < 0:
        idx = 0
    else:
        idx = 40 if value >= 1 else (int(value * 40) + 1)
    bucket[idx] += 1

def for_each_joined(var):
    # For one task: histogram its CPU and RAM requests across all events and record the
    # final termination type; slot 41 counts leading events that had no timestamp
    task_id = var[0]
    ts = var[1]

    term = -1
    ts = sorted(ts, key=lambda x: -1 if x["time"] is None else x["time"])

    cpu_request = [0] * 42
    ram_request = [0] * 42

    cut = False
    for i in range(0, len(ts)):
        if ts[i]["time"] is not None:
            cut = True
            ts = ts[i:len(ts)]
            cpu_request[41] = i
            ram_request[41] = i
            break

    if not cut:
        raise Exception('all times are none')

    for i, t in enumerate(ts):
        increment_reserv_bucket(cpu_request, t["rcpu"])
        increment_reserv_bucket(ram_request, t["rram"])

        if t["type"] >= 4 and t["type"] <= 8:
            term = t["type"]

    res = {-1: None, 4: None, 5: None, 6: None, 7: None, 8: None}
    res[term] = {'rcpu': cpu_request, 'rram': ram_request}

    return res

def fold_resobjs(ro1, ro2):
    # Merge two per-task result objects by summing histograms per termination class
    if ro1 is None:
        return ro2
    elif ro2 is None:
        return ro1
    else:
        for k in ro1.keys():
            if ro1[k] is None:
                ro1[k] = ro2[k]
            elif ro2[k] is not None:
                for kk in ro1[k].keys():
                    if ro1[k][kk] is None:
                        ro1[k][kk] = ro2[k][kk]
                    elif ro2[k][kk] is not None:
                        ro1[k][kk] = [sum(x) for x in zip(ro1[k][kk], ro2[k][kk])]
        return ro1

result = df.rdd \
    .filter(lambda x: x.time is not None and x.type is not None and x.machine_id is not None and
        x.instance_index is not None and x.collection_id is not None and x.resource_request is not None and
        x.resource_request.cpus is not None and x.resource_request.memory is not None) \
    .map(lambda x: {"id": tabid(x), "time": int(x.time), "type": int(x.type),
        "rcpu": Decimal(x.resource_request.cpus), "rram": Decimal(x.resource_request.memory),
        "mid": x.machine_id}) \
    .groupBy(lambda x: x["id"]) \
    .map(for_each_joined) \
    .fold(None, fold_resobjs)

d = os.path.dirname(os.path.realpath(__file__))

with open(d + "/" + cluster + "_figure8ab.json", "w") as f:
    json.dump(result, f)

# vim: set ts=4 sw=4 et tw=120:
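As a side note, the 41-slot request histograms written by this script can be mapped back to value ranges as sketched below; the bucket edges are inferred from increment_reserv_bucket above, and the helper name is purely illustrative.

# Illustrative only: value range covered by each histogram slot used above
# (slot 0 = negative/unknown, slots 1-40 = 2.5% bins over [0, 1), values >= 1 clamped to slot 40;
# slot 41 in this script holds the count of leading events without a timestamp).
def bucket_range(idx):
    if idx == 0:
        return None  # negative or unknown value
    return ((idx - 1) / 40, idx / 40)

print(bucket_range(1))   # (0.0, 0.025)
print(bucket_range(40))  # (0.975, 1.0), which also receives values >= 1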
figure_8/figure8-abef-only.py (Executable file, 157 lines)
@@ -0,0 +1,157 @@
#!/usr/bin/env python3
# coding: utf-8

import os
import json
import pandas as pd
import findspark
findspark.init()
import pyspark
import pyspark.sql
import sys
import gzip
from pyspark import AccumulatorParam
from pyspark.sql.functions import lit
from pyspark.sql import Window
from pyspark.sql.types import *
from decimal import *

if len(sys.argv) != 5:
    print(sys.argv[0] + " {cluster} {tmpdir} {maxram} {joindir}")
    sys.exit()

joindir=sys.argv[4]

cluster=sys.argv[1]

spark = pyspark.sql.SparkSession.builder \
    .appName("task_slowdown") \
    .config("spark.driver.maxResultSize", sys.argv[3]) \
    .config("spark.local.dir", sys.argv[2]) \
    .config("spark.driver.memory", sys.argv[3]) \
    .getOrCreate()
sc = spark.sparkContext

df = spark.read.parquet(joindir + "/figure-8-join-" + cluster + ".parquet")

# READING MACHINE EVENTS DATA, sort them and save them as broadcast variable
print("Starting to read machine events...")
dfm = pd.read_csv("~/google_2019/machine_events/" + cluster + "_machine_events.csv", converters={
    'time': lambda x: -1 if x == '' else int(x),
    'machine_id': lambda x: str(x),
    'capacity.cpus': lambda x: -1 if x == '' else Decimal(x),
    'capacity.memory': lambda x: -1 if x == '' else Decimal(x)})
print("Dropping remove events...")
dfm = dfm[(dfm.type!=2)&(dfm.time!=-1)&(dfm["capacity.cpus"]!=-1)&(dfm["capacity.memory"]!=-1)]
print("Dropping missing data events...")
dfm = dfm[dfm.missing_data_reason.isnull()]
print("Projecting on useful columns...")
dfm = dfm[["time", "machine_id", "capacity.cpus", "capacity.memory"]]
print("Sorting by time...")
dfm = dfm.sort_values(by=["machine_id", "time"])
print("Converting to broadcast variable...")
dfm = sc.broadcast([tuple(r) for r in dfm.to_numpy()])
print("Done with machine events.")

def get_machine_time_resources(machine_id, time):
    # Binary search over the broadcast machine-event rows (sorted by machine_id, then time)
    # for a capacity record of machine_id around the given time; None if the machine is absent
    def aux(i, j):
        if i == j:
            return dfm.value[i] if dfm.value[i][1] == machine_id else None
        elif i + 1 == j:
            if dfm.value[i][1] == machine_id:
                return dfm.value[i]
            elif dfm.value[j][1] == machine_id:
                return dfm.value[j]
            else:
                return None

        mid = (i + j) // 2

        if dfm.value[mid][1] > machine_id:
            return aux(i, mid - 1)
        elif dfm.value[mid][1] < machine_id:
            return aux(mid + 1, j)
        elif dfm.value[mid][0] > time:
            return aux(i, mid)
        elif dfm.value[mid][0] < time:
            return aux(mid, j)
        else:
            return dfm.value[mid]

    return aux(0, len(dfm.value)-1)

def increment_reserv_bucket(bucket, value):
    # 41 slots: index 0 for negative (unknown) values, 1-40 for [0, 1) in 2.5% steps,
    # with values >= 1 clamped into slot 40
    if value < 0:
        idx = 0
    else:
        idx = 40 if value >= 1 else (int(value * 40) + 1)
    bucket[idx] += 1

def for_each_joined(x):
    # For one task: histogram its requested resources (8a-b) and its measured utilization,
    # both as raw values (old, wrong variant) and normalized by machine capacity (8e-f)
    task_id = x[0]
    ts = x[1]

    term = -1
    ts = sorted(ts, key=lambda x: x.time)

    cpu_util = [0] * 41
    cpu_util_wr = [0] * 41
    ram_util = [0] * 41
    ram_util_wr = [0] * 41
    cpu_request = [0] * 41
    ram_request = [0] * 41

    for i, t in enumerate(ts):
        machine_log = get_machine_time_resources(t.mid, t.time)
        if machine_log is not None:
            util_cpu = t.acpu / machine_log[2]
            util_ram = t.aram / machine_log[3]
        else:
            util_cpu = -1
            util_ram = -1
        # 8a-b
        increment_reserv_bucket(cpu_request, t.rcpu)
        increment_reserv_bucket(ram_request, t.rram)
        # 8e-f (wrong old version)
        increment_reserv_bucket(cpu_util_wr, t.acpu)
        increment_reserv_bucket(ram_util_wr, t.aram)
        # 8e-f
        increment_reserv_bucket(cpu_util, util_cpu)
        increment_reserv_bucket(ram_util, util_ram)

        if t.type >= 4 and t.type <= 8:
            term = t.type

    res = {-1: None, 4: None, 5: None, 6: None, 7: None, 8: None}
    res[term] = {'rcpu': cpu_request, 'rram': ram_request, 'wucpu': cpu_util_wr, 'wuram': ram_util_wr, 'ucpu': cpu_util, 'uram': ram_util}

    return res

def fold_resobjs(ro1, ro2):
    # Merge two per-task result objects by summing histograms per termination class
    if ro1 is None:
        return ro2
    elif ro2 is None:
        return ro1
    else:
        for k in ro1.keys():
            if ro1[k] is None:
                ro1[k] = ro2[k]
            elif ro2[k] is not None:
                for kk in ro1[k].keys():
                    if ro1[k][kk] is None:
                        ro1[k][kk] = ro2[k][kk]
                    elif ro2[k][kk] is not None:
                        ro1[k][kk] = [sum(x) for x in zip(ro1[k][kk], ro2[k][kk])]
        return ro1

result = df.rdd \
    .groupBy(lambda x: x.id) \
    .map(for_each_joined) \
    .fold(None, fold_resobjs)

d = os.path.dirname(os.path.realpath(__file__))

with open(d + "/" + cluster + "_figure8abef.json", "w") as f:
    json.dump(result, f)

# vim: set ts=4 sw=4 et tw=120:
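For what it is worth, the lookup idea behind get_machine_time_resources can be illustrated on a plain list, as in the sketch below; the toy rows stand in for the dfm broadcast (time, machine_id, capacity.cpus, capacity.memory) tuples, and the linear helper is a reference version only, not the committed binary search.

# Illustrative only: reference lookup over toy machine-event rows shaped like dfm.value.
toy = [(0, "m1", 1.0, 1.0), (500, "m1", 0.5, 1.0), (0, "m2", 1.0, 0.5)]

def lookup(rows, machine_id, time):
    # Last capacity row of machine_id at or before `time`, else its earliest row, else None
    mine = [r for r in rows if r[1] == machine_id]
    if not mine:
        return None
    before = [r for r in mine if r[0] <= time]
    return before[-1] if before else mine[0]

print(lookup(toy, "m1", 600))  # (500, 'm1', 0.5, 1.0)
print(lookup(toy, "m3", 0))    # None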
figure_8/figure8-cd-only.py (Executable file, 190 lines)
@@ -0,0 +1,190 @@
#!/usr/bin/env python3
# coding: utf-8

import os
import json
import pandas as pd
import findspark
findspark.init()
import pyspark
import pyspark.sql
import sys
import gzip
from pyspark import AccumulatorParam
from pyspark.sql.functions import lit
from pyspark.sql import Window
from pyspark.sql.types import *
from decimal import *

if len(sys.argv) != 4:
    print(sys.argv[0] + " {cluster} {tmpdir} {maxram}")
    sys.exit()

cluster=sys.argv[1]

spark = pyspark.sql.SparkSession.builder \
    .appName("task_slowdown") \
    .config("spark.driver.maxResultSize", "128g") \
    .config("spark.local.dir", sys.argv[2]) \
    .config("spark.driver.memory", sys.argv[3]) \
    .getOrCreate()
sc = spark.sparkContext

#
# READING INSTANCE EVENTS DATA
#
dfepath = "/home/claudio/google_2019/instance_events/" + cluster + "/" + cluster + "_instance_events*.json.gz"
df = spark.read.json(dfepath)
df = df.limit(10000)

# READING MACHINE EVENTS DATA, sort them and save them as broadcast variable
print("Starting to read machine events...")
dfm = pd.read_csv("~/google_2019/machine_events/" + cluster + "_machine_events.csv", converters={
    'time': lambda x: -1 if x == '' else int(x),
    'machine_id': lambda x: str(x),
    'capacity.cpus': lambda x: -1 if x == '' else Decimal(x),
    'capacity.memory': lambda x: -1 if x == '' else Decimal(x)})
print("Dropping remove events...")
dfm = dfm[(dfm.type!=2)&(dfm.time!=-1)&(dfm["capacity.cpus"]!=-1)&(dfm["capacity.memory"]!=-1)]
print("Dropping missing data events...")
dfm = dfm[dfm.missing_data_reason.isnull()]
print("Projecting on useful columns...")
dfm = dfm[["time", "machine_id", "capacity.cpus", "capacity.memory"]]
print("Sorting by time...")
dfm = dfm.sort_values(by=["machine_id", "time"])
print("Converting to broadcast variable...")
dfm = sc.broadcast([tuple(r) for r in dfm.to_numpy()])
print("Done with machine events.")

def tabid(x):
    # Unique task key: collection_id in the integer part, instance_index in the fractional part
    return Decimal(x.collection_id) + Decimal(x.instance_index) / Decimal(2**64)

df = df.rdd \
    .filter(lambda x: x.time is not None and x.type is not None and x.machine_id is not None and
        x.instance_index is not None and x.collection_id is not None and x.resource_request is not None and
        x.resource_request.cpus is not None and x.resource_request.memory is not None) \
    .map(lambda x: [tabid(x), int(x.time), int(x.type),
        Decimal(x.resource_request.cpus), Decimal(x.resource_request.memory), x.machine_id]) \
    .toDF(["id", "time", "type", "rcpu", "rram", "mid"])


def get_machine_time_resources(machine_id, time):
    # Binary search over the broadcast machine-event rows (sorted by machine_id, then time).
    # Returns an index into dfm.value so the caller can slice out a per-machine window,
    # or None if the machine does not appear in the list.
    def aux(i, j):
        if i == j:
            return i if dfm.value[i][1] == machine_id else None
        elif i + 1 == j:
            if dfm.value[i][1] == machine_id:
                return i
            elif dfm.value[j][1] == machine_id:
                return j
            else:
                return None

        mid = (i + j) // 2

        if dfm.value[mid][1] > machine_id:
            return aux(i, mid - 1)
        elif dfm.value[mid][1] < machine_id:
            return aux(mid + 1, j)
        elif dfm.value[mid][0] > time:
            return aux(i, mid)
        elif dfm.value[mid][0] < time:
            return aux(mid, j)
        else:
            return mid

    return aux(0, len(dfm.value)-1)

def increment_reserv_bucket(bucket, taskid, value):
    # Per-task histogram: slot 0 for negative (unknown) values, 1-40 for [0, 1) in 2.5% steps,
    # with values >= 1 clamped into slot 40
    if value < 0:
        idx = 0
    else:
        idx = 40 if value >= 1 else (int(value * 40) + 1)

    if taskid not in bucket:
        bucket[taskid] = [0] * 41
    bucket[taskid][idx] += 1

def bucket_sum_per_termination(bucket, last_term_by_id):
    # Aggregate per-task histograms into one histogram per termination class
    result = {-1: None, 4: None, 5: None, 6: None, 7: None, 8: None}
    for tid, vs in bucket.items():
        term = last_term_by_id[tid]
        if result[term] is None:
            result[term] = vs
        else:
            result[term] = [sum(x) for x in zip(result[term], vs)]
    return result

def for_each_joined(x):
    # For one machine, replay its events in time order and histogram the total CPU/RAM
    # reservation (sum of the last known requests of its tasks) relative to machine capacity
    machine_id = x[0]
    ts = x[1]

    ts = sorted(ts, key=lambda x: x.time)
    last_req_by_id = {} # map taskid -> last known req [cpu, ram] (data removed when task terminates)

    reserv_ceils = [0]
    cpu_reservs_by_id = {}
    ram_reservs_by_id = {}

    last_term_by_id = {} # map taskid -> last termination
    start = get_machine_time_resources(machine_id, 0)
    end = get_machine_time_resources(machine_id, 6_000_000_000_000)
    machine_logs = None if start is None or end is None else dfm.value[start:(end+1)]

    for i, t in enumerate(ts):
        if machine_logs is not None and len(machine_logs) > 1 and machine_logs[1][0] >= t.time:
            machine_logs.pop(0)
        if t.id not in last_term_by_id:
            last_term_by_id[t.id] = -1
        if t.rcpu is not None and t.rram is not None:
            last_req_by_id[t.id] = (t.rcpu, t.rram)
        # 8b
        tot_req = [sum(x) for x in zip(*last_req_by_id.values())]
        if machine_logs is not None:
            reserv_cpu = tot_req[0] / machine_logs[0][2]
            reserv_ram = tot_req[1] / machine_logs[0][3]
        else:
            reserv_cpu = -1
            reserv_ram = -1
        increment_reserv_bucket(cpu_reservs_by_id, t.id, reserv_cpu)
        increment_reserv_bucket(ram_reservs_by_id, t.id, reserv_ram)
        if t.type >= 4 and t.type <= 8:
            last_term_by_id[t.id] = t.type

    resobj = {'rscpu': cpu_reservs_by_id, 'rsram': ram_reservs_by_id}

    for k, v in resobj.items():
        resobj[k] = bucket_sum_per_termination(v, last_term_by_id)

    return resobj

def fold_resobjs(ro1, ro2):
    # Merge two per-machine result objects by summing histograms per termination class
    if ro1 is None:
        return ro2
    elif ro2 is None:
        return ro1
    else:
        for k in ro1.keys():
            for kk in ro1[k].keys():
                if ro1[k][kk] is None:
                    ro1[k][kk] = ro2[k][kk]
                elif ro2[k][kk] is None:
                    continue
                else:
                    ro1[k][kk] = [sum(x) for x in zip(ro1[k][kk], ro2[k][kk])]
        return ro1

# TODO: partition by id and in the for-each-row
# function implement lookup to dfm.value to understand its memory capacity

import random

result = df.rdd \
    .groupBy(lambda x: x.mid) \
    .partitionBy(10000, lambda x: random.randint(0, 10000-1)) \
    .map(for_each_joined) \
    .fold(None, fold_resobjs)

d = os.path.dirname(os.path.realpath(__file__))

with open(d + "/" + cluster + "_figure8.json", "w") as f:
    json.dump(result, f)

# vim: set ts=4 sw=4 et tw=120:
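As a quick illustration of the reduction step, the snippet below mirrors how the .fold(None, fold_resobjs) call above combines two partial per-machine results; the inputs are toy values trimmed to two termination classes and three-slot histograms.

# Illustrative only: merging two partial {metric: {termination: histogram}} objects.
ro1 = {"rscpu": {4: [1, 0, 2], 5: None}, "rsram": {4: [0, 1, 1], 5: None}}
ro2 = {"rscpu": {4: [2, 2, 0], 5: [1, 0, 0]}, "rsram": {4: None, 5: None}}

for k in ro1:
    for kk in ro1[k]:
        if ro1[k][kk] is None:
            ro1[k][kk] = ro2[k][kk]
        elif ro2[k][kk] is not None:
            ro1[k][kk] = [sum(x) for x in zip(ro1[k][kk], ro2[k][kk])]

print(ro1["rscpu"][4])  # [3, 2, 2]
print(ro1["rscpu"][5])  # [1, 0, 0]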
BIN  figure_8/h_figure8ab.json (Stored with Git LFS) Normal file. Binary file not shown.