More query results
This commit is contained in:
parent
2f83419985
commit
9367c293ae
|
@ -0,0 +1,119 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
# coding: utf-8
|
||||||
|
|
||||||
|
import os
|
||||||
|
import json
|
||||||
|
import pandas as pd
|
||||||
|
import findspark
|
||||||
|
findspark.init()
|
||||||
|
import pyspark
|
||||||
|
import pyspark.sql
|
||||||
|
import sys
|
||||||
|
import gzip
|
||||||
|
from pyspark import AccumulatorParam
|
||||||
|
from pyspark.sql.functions import lit
|
||||||
|
from pyspark.sql import Window
|
||||||
|
from pyspark.sql.types import *
|
||||||
|
from decimal import *
|
||||||
|
import random
|
||||||
|
|
||||||
|
# Validate CLI arguments: expect cluster name, scratch dir, and driver RAM.
# BUG FIX: the original used "is not 4", an identity comparison with an int
# literal that only works by CPython small-int interning and raises a
# SyntaxWarning on Python 3.8+; value comparison (!=) is the correct test.
if len(sys.argv) != 4:
    print(sys.argv[0] + " {cluster} {tmpdir} {maxram}")
    sys.exit()
|
||||||
|
|
||||||
|
cluster=sys.argv[1]
|
||||||
|
|
||||||
|
spark = pyspark.sql.SparkSession.builder \
|
||||||
|
.appName("task_slowdown") \
|
||||||
|
.config("spark.driver.maxResultSize", "128g") \
|
||||||
|
.config("spark.local.dir", sys.argv[2]) \
|
||||||
|
.config("spark.driver.memory", sys.argv[3]) \
|
||||||
|
.getOrCreate()
|
||||||
|
sc = spark.sparkContext
|
||||||
|
|
||||||
|
# READING INSTANCE EVENTS DATA
|
||||||
|
dfepath = "/home/claudio/google_2019/instance_events/" + cluster + "/" + cluster + "_instance_events*.json.gz"
|
||||||
|
#dfepath = "/home/claudio/google_2019/instance_events/" + cluster + "/" + cluster + "_test.json"
|
||||||
|
df = spark.read.json(dfepath)
|
||||||
|
|
||||||
|
def tabid(x):
    """Return a unique Decimal task id for a row: the collection id plus
    the instance index packed into the fractional part (index / 2**64)."""
    fractional = Decimal(x.instance_index) / Decimal(2 ** 64)
    return Decimal(x.collection_id) + fractional
|
||||||
|
|
||||||
|
def bucket_sum_per_termination(bucket, last_term_by_id):
    """Regroup per-task counter lists by each task's final termination code.

    `bucket` maps task id -> list of counters; `last_term_by_id` maps task
    id -> termination code (-1 or 4..8).  Lists that fall under the same
    code are summed element-wise; codes with no tasks remain None.
    """
    totals = dict.fromkeys((-1, 4, 5, 6, 7, 8))
    for task_id, values in bucket.items():
        code = last_term_by_id[task_id]
        current = totals[code]
        if current is None:
            totals[code] = values
        else:
            totals[code] = [a + b for a, b in zip(current, values)]
    return totals
|
||||||
|
|
||||||
|
def tally_event(bucket, term, nexec):
    """Count one event in the nested histogram:
    bucket[termination code][concurrency level] += 1, creating missing
    inner dicts and slots on first use."""
    per_term = bucket.setdefault(term, {})
    per_term[nexec] = per_term.get(nexec, 0) + 1
|
||||||
|
|
||||||
|
def for_each_joined(x):
    """Replay one machine's event stream in time order and histogram, per
    termination code, how many tasks were concurrently in execution when
    each event occurred.

    `x` is (machine_id, iterable of event dicts with "id", "time", "term",
    "end" keys).  Returns {term: {concurrency level: event count}}.
    """
    _machine_id, events = x
    events = sorted(events, key=lambda e: e["time"])

    running = set()   # task ids currently executing on this machine
    counts = {}       # term -> {concurrency level -> event count}
    for ev in events:
        running.add(ev["id"])
        tally_event(counts, ev["term"], len(running))
        # An "end" event means the task's next event is on another machine
        # (or there is none), so it stops occupying this machine.
        if ev["end"]:
            running.discard(ev["id"])
    return counts
|
||||||
|
|
||||||
|
def fold_resobjs(ro1, ro2):
    """Merge two nested {term: {concurrency: count}} histograms.

    Counts under matching (term, concurrency) keys are added; keys present
    in only one operand are copied through.  `ro1` is mutated and returned
    (or `ro2` is returned when `ro1` is empty).
    """
    if not ro1:
        return ro2
    if ro2:
        for term, inner in ro2.items():
            if term not in ro1:
                ro1[term] = inner
            else:
                mine = ro1[term]
                for level, count in inner.items():
                    mine[level] = mine.get(level, 0) + count
    return ro1
|
||||||
|
|
||||||
|
def mark_next(data):
    """Normalize one task's event list.

    `data` is (task_id, events) where each event is a [id, time, type,
    machine_id] row.  Sorts by time, converts rows to dicts, marks an event
    as "end" when it is the last one or the next event runs on a different
    machine, and tags every event with the task's final termination code
    (the last event of type 4..8; -1 if none occurred).
    """
    ts = sorted(data[1], key=lambda row: row[1])
    last_term = -1
    for i, row in enumerate(ts):
        # ts[i+1] is still a raw row here; only ts[0..i] have been converted.
        is_end = i == len(ts) - 1 or row[3] != ts[i + 1][3]
        ts[i] = {"id": row[0], "time": row[1], "type": row[2],
                 "mid": row[3], "end": is_end}
        # BUG FIX: the original tested `type >= 4 or type <= 8`, which is
        # always true, so *every* event type clobbered last_term.  Only
        # types 4..8 are termination events (the sibling scripts in this
        # commit consistently use `and` here).
        if 4 <= ts[i]["type"] <= 8:
            last_term = ts[i]["type"]
    for t in ts:
        t["term"] = last_term
    return ts
|
||||||
|
|
||||||
|
# Build the figure-7c histogram:
#   1. keep only rows where time/type/instance_index/collection_id are set,
#   2. project each event to [task id, time, type, machine id],
#   3. group by task id and tag termination + end-of-machine-run (mark_next),
#   4. regroup by machine id and tally concurrency per termination
#      (for_each_joined),
#   5. fold all per-partition histograms into a single dict.
# NOTE(review): partitionBy uses a random partitioner, which ignores the
# groupBy key — presumably only meant to spread load evenly; confirm intent.
result = df.rdd \
    .filter(lambda x: x.time is not None and x.type is not None and
            x.instance_index is not None and x.collection_id is not None) \
    .map(lambda x: [tabid(x), int(x.time), int(x.type), x.machine_id]) \
    .groupBy(lambda x: x[0]) \
    .flatMap(mark_next) \
    .groupBy(lambda x: x["mid"]) \
    .partitionBy(10000, lambda x: random.randint(0, 10000-1)) \
    .map(for_each_joined) \
    .fold({}, fold_resobjs)
|
||||||
|
|
||||||
|
d = os.path.dirname(os.path.realpath(__file__))
|
||||||
|
|
||||||
|
with open(d + "/" + cluster + "_figure7c.json", "w") as f:
|
||||||
|
json.dump(result, f)
|
||||||
|
|
||||||
|
# vim: set ts=4 sw=4 et tw=120:
|
|
@ -0,0 +1 @@
|
||||||
|
{"rcpu": {"-1": [11792934, 100581, 119504, 33602], "4": [149466039, 3954384, 522279, 319171], "5": [30461383, 705982, 138442, 333102], "6": [152111586, 740741, 53960, 66869], "7": [273451772, 9013332, 1598241, 1251452], "8": [3637379, 106001, 59062, 23583]}, "rram": {"-1": [11916974, 84471, 11523, 33653], "4": [152162293, 1463695, 481913, 153972], "5": [31343967, 108823, 112227, 73892], "6": [136880563, 16017698, 62739, 12156], "7": [278881639, 3888697, 1611197, 933264], "8": [3727840, 74738, 6063, 17384]}, "rscpu": {"-1": [17686, 33931, 24226, 19646, 19606, 23959, 11907567], "4": [345377, 76462, 78162, 75465, 80602, 92738, 153513067], "5": [98104, 17584, 19982, 19529, 20228, 20879, 31442603], "6": [385085, 39535, 69156, 95234, 123505, 157547, 152103094], "7": [856729, 139265, 154106, 162797, 168047, 182857, 283650996], "8": [3169, 2773, 2464, 2334, 2164, 2065, 3811056]}, "rsram": {"-1": [17686, 27788, 27824, 22687, 21497, 20384, 11908755], "4": [345377, 64842, 88600, 90874, 106968, 121787, 153443425], "5": [98104, 19833, 24948, 25351, 27876, 27829, 31414968], "6": [385085, 38938, 83458, 160369, 311025, 355386, 151638895], "7": [856729, 147165, 190726, 204662, 218966, 232164, 283464385], "8": [3169, 2997, 2561, 2433, 2460, 2717, 3809688]}, "ucpu": {"-1": [65359, 11980186, 1059, 17, 0, 0, 0], "4": [20079611, 134178471, 3727, 60, 3, 0, 1], "5": [12413418, 19219959, 2058, 1262, 989, 586, 637], "6": [7877670, 145094583, 807, 8, 13, 10, 65], "7": [21046407, 264204898, 61360, 1623, 252, 126, 131], "8": [224410, 3598907, 2570, 53, 47, 15, 23]}, "uram": {"-1": [11681, 12034890, 50, 0, 0, 0, 0], "4": [936757, 153325001, 114, 1, 0, 0, 0], "5": [349268, 31289585, 56, 0, 0, 0, 0], "6": [175056, 152797241, 859, 0, 0, 0, 0], "7": [2281373, 283033270, 154, 0, 0, 0, 0], "8": [27953, 3798067, 5, 0, 0, 0, 0]}}
|
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
|
@ -0,0 +1 @@
|
||||||
|
{"-1": {"rcpu": [0, 12076, 1006, 490, 838, 219, 40, 11, 0, 0, 36, 55, 36, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], "rram": [0, 12973, 1572, 168, 19, 20, 3, 4, 32, 16, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]}, "4": {"rcpu": [0, 20347796, 902125, 766341, 26926, 104874, 13456, 2521, 0, 0, 11389, 956, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], "rram": [0, 22005994, 148318, 19943, 1521, 48, 2, 560, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]}, "5": {"rcpu": [0, 15679371, 716841, 595324, 1126, 40010, 2465, 117, 61, 0, 0, 995, 491604, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], "rram": [0, 16429871, 1078605, 6649, 7031, 4944, 763, 2, 14, 0, 0, 14, 0, 1, 0, 0, 20, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]}, "6": {"rcpu": [0, 10124332, 15166033, 3453386, 596140, 255252, 70171, 10774, 540, 565, 11824, 973367, 31434, 33764, 36, 0, 24, 3, 21, 0, 0, 78, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], "rram": [0, 28182669, 1112336, 1290233, 63083, 11158, 905, 14117, 51153, 1982, 28, 47, 0, 0, 0, 0, 33, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]}, "7": {"rcpu": [0, 1319563158, 65452181, 16179658, 4864588, 1982307, 338136, 223834, 618, 1525, 10678, 504897, 367755, 48, 0, 1829, 389, 0, 0, 0, 0, 178, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], "rram": [0, 1377875241, 25806542, 2397591, 2730903, 554881, 38638, 3962, 19351, 19349, 39671, 3808, 1724, 0, 0, 0, 11, 0, 0, 0, 0, 61, 0, 0, 9, 20, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 17, 0, 0, 0, 0]}, "8": {"rcpu": [0, 4, 1411, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], "rram": [0, 1415, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]}}
|
|
@ -0,0 +1 @@
|
||||||
|
{"-1": {"rcpu": [0, 27311, 2244, 438, 53, 423, 13, 22, 1, 0, 10, 29, 4, 1, 0, 19, 0, 0, 0, 0, 0, 0, 0, 0, 57, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], "rram": [0, 30210, 221, 47, 26, 74, 0, 2, 21, 0, 0, 0, 20, 0, 0, 0, 0, 0, 0, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]}, "4": {"rcpu": [0, 72654607, 2035364, 2266839, 32152, 211157, 68608, 61548, 24893, 24946, 10, 165, 75, 54, 1346, 21, 12, 14, 4, 4, 2, 0, 0, 2, 55, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], "rram": [0, 76307431, 605581, 279503, 81292, 8860, 25947, 69369, 1647, 1, 1126, 1097, 22, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]}, "5": {"rcpu": [0, 10972613, 1084557, 407277, 19766, 120268, 566, 11370, 109, 3736, 77, 17, 3261, 80, 0, 0, 39, 3, 7, 11, 2, 2, 35, 21, 722, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], "rram": [0, 12340674, 259595, 4623, 3996, 5073, 1030, 1616, 3156, 1553, 368, 97, 2602, 152, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]}, "6": {"rcpu": [0, 71728547, 3842669, 738222, 152925, 244881, 33316, 39901, 31102, 35512, 28782, 36532, 49808, 24386, 20584, 21132, 24488, 30146, 6415, 1838, 1066, 862, 1062, 411, 3347, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], "rram": [0, 66248694, 10293282, 504401, 21727, 8463, 7719, 3685, 2322, 2341, 720, 48, 2347, 1372, 644, 0, 66, 7, 6, 0, 26, 38, 9, 3, 0, 0, 0, 14, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]}, "7": {"rcpu": [0, 921464185, 80650116, 11069588, 8057696, 2113207, 278409, 300672, 209361, 216900, 121828, 281065, 290973, 27617, 72253, 1859, 2603, 552, 3511, 1317, 7703, 354, 751, 275, 4215, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], "rram": [0, 992546443, 25626422, 4909105, 709766, 407857, 224788, 300932, 122026, 50524, 37312, 29839, 197998, 219, 7432, 6283, 9, 2, 5, 1, 0, 33, 0, 4, 0, 0, 0, 13, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]}, "8": {"rcpu": [0, 5721037, 0, 22602, 20945, 0, 0, 0, 0, 0, 
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], "rram": [0, 5028956, 735628, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]}}
|
|
@ -0,0 +1,118 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
# coding: utf-8
|
||||||
|
|
||||||
|
import os
|
||||||
|
import json
|
||||||
|
import pandas as pd
|
||||||
|
import findspark
|
||||||
|
findspark.init()
|
||||||
|
import pyspark
|
||||||
|
import pyspark.sql
|
||||||
|
import sys
|
||||||
|
import gzip
|
||||||
|
from pyspark import AccumulatorParam
|
||||||
|
from pyspark.sql.functions import lit
|
||||||
|
from pyspark.sql import Window
|
||||||
|
from pyspark.sql.types import *
|
||||||
|
from decimal import *
|
||||||
|
|
||||||
|
# Validate CLI arguments: cluster name, scratch dir, driver RAM, data base dir.
# BUG FIX: "is not 5" is an identity comparison with an int literal — it only
# works by CPython interning and warns on Python 3.8+; use != for values.
if len(sys.argv) != 5:
    print(sys.argv[0] + " {cluster} {tmpdir} {maxram} {basedir}")
    sys.exit()
|
||||||
|
|
||||||
|
cluster=sys.argv[1]
|
||||||
|
|
||||||
|
spark = pyspark.sql.SparkSession.builder \
|
||||||
|
.appName("task_slowdown") \
|
||||||
|
.config("spark.driver.maxResultSize", "128g") \
|
||||||
|
.config("spark.local.dir", sys.argv[2]) \
|
||||||
|
.config("spark.driver.memory", sys.argv[3]) \
|
||||||
|
.getOrCreate()
|
||||||
|
sc = spark.sparkContext
|
||||||
|
|
||||||
|
# READING INSTANCE EVENTS DATA
|
||||||
|
dfepath = sys.argv[4] + "/" + cluster + "/" + cluster + "_instance_events*.json.gz"
|
||||||
|
#dfepath = sys.argv[4] + "/" + cluster + "/" + cluster + "_test.json"
|
||||||
|
df = spark.read.json(dfepath)
|
||||||
|
|
||||||
|
#df = df.limit(10000)
|
||||||
|
|
||||||
|
def tabid(x):
    """Build a unique Decimal task id: integer part is the collection id,
    fractional part encodes the instance index (index / 2**64)."""
    return Decimal(x.collection_id) + (Decimal(x.instance_index) / Decimal(2 ** 64))
|
||||||
|
|
||||||
|
|
||||||
|
def increment_reserv_bucket(bucket, value):
    """Add one observation to a histogram list.

    Slot 0 holds negative (missing) values; slots 1..40 split [0, 1) into
    40 equal bins, with values >= 1 folded into slot 40.
    """
    if value < 0:
        slot = 0
    elif value >= 1:
        slot = 40
    else:
        slot = int(value * 40) + 1
    bucket[slot] += 1
|
||||||
|
|
||||||
|
def for_each_joined(var):
    """Per-task reducer for figure 8a/b: `var` is (task_id, event dicts).

    Sorts the task's events by time (None times first), drops the leading
    timestamp-less events (recording how many in slot 41), histograms the
    cpu/ram requests of the remaining events, and returns the two
    histograms keyed by the task's final termination type.
    """
    task_id = var[0]
    ts = var[1]

    term = -1
    # None timestamps sort to the front so they can be trimmed off below.
    ts = sorted(ts, key=lambda x: -1 if x["time"] is None else x["time"])

    # 42 slots: 0 = negative value, 1..40 = [0, 1) bins (>= 1 folded into
    # slot 40), 41 = number of events discarded for having no timestamp.
    cpu_request = [0] * 42
    ram_request = [0] * 42

    cut = False
    for i in range(0, len(ts)):
        if ts[i]["time"] is not None:
            cut = True
            # Keep only timestamped events; i is how many were dropped.
            ts = ts[i:len(ts)]
            cpu_request[41] = i
            ram_request[41] = i
            break

    if not cut:
        raise Exception('all times are none')

    for i, t in enumerate(ts):
        increment_reserv_bucket(cpu_request, t["rcpu"])
        increment_reserv_bucket(ram_request, t["rram"])

        # Types 4..8 are termination events; remember the last one seen.
        if t["type"] >= 4 and t["type"] <= 8:
            term = t["type"]

    # One slot per possible termination code; only the observed one is set.
    res = {-1: None, 4: None, 5: None, 6: None, 7: None, 8: None}
    res[term] = {'rcpu': cpu_request, 'rram': ram_request}

    return res
|
||||||
|
|
||||||
|
def fold_resobjs(ro1, ro2):
    """Merge two {term: {metric: histogram-list} | None} result objects.

    A None operand or entry defers to the other side; histogram lists under
    the same (term, metric) are summed element-wise.  `ro1` is mutated and
    returned.
    """
    if ro1 is None:
        return ro2
    if ro2 is None:
        return ro1
    for term in ro1:
        theirs = ro2[term]
        if ro1[term] is None:
            ro1[term] = theirs
        elif theirs is not None:
            mine = ro1[term]
            for metric in mine:
                if mine[metric] is None:
                    mine[metric] = theirs[metric]
                elif theirs[metric] is not None:
                    mine[metric] = [a + b for a, b in zip(mine[metric], theirs[metric])]
    return ro1
|
||||||
|
|
||||||
|
result = df.rdd \
|
||||||
|
.filter(lambda x: x.time is not None and x.type is not None and x.machine_id is not None and
|
||||||
|
x.instance_index is not None and x.collection_id is not None and x.resource_request is not None and
|
||||||
|
x.resource_request.cpus is not None and x.resource_request.memory is not None) \
|
||||||
|
.map(lambda x: {"id": tabid(x), "time": int(x.time), "type": int(x.type),
|
||||||
|
"rcpu": Decimal(x.resource_request.cpus), "rram": Decimal(x.resource_request.memory),
|
||||||
|
"mid": x.machine_id}) \
|
||||||
|
.groupBy(lambda x: x["id"]) \
|
||||||
|
.map(for_each_joined) \
|
||||||
|
.fold(None, fold_resobjs)
|
||||||
|
|
||||||
|
d = os.path.dirname(os.path.realpath(__file__))
|
||||||
|
|
||||||
|
with open(d + "/" + cluster + "_figure8ab.json", "w") as f:
|
||||||
|
json.dump(result, f)
|
||||||
|
|
||||||
|
# vim: set ts=4 sw=4 et tw=120:
|
|
@ -0,0 +1,157 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
# coding: utf-8
|
||||||
|
|
||||||
|
import os
|
||||||
|
import json
|
||||||
|
import pandas as pd
|
||||||
|
import findspark
|
||||||
|
findspark.init()
|
||||||
|
import pyspark
|
||||||
|
import pyspark.sql
|
||||||
|
import sys
|
||||||
|
import gzip
|
||||||
|
from pyspark import AccumulatorParam
|
||||||
|
from pyspark.sql.functions import lit
|
||||||
|
from pyspark.sql import Window
|
||||||
|
from pyspark.sql.types import *
|
||||||
|
from decimal import *
|
||||||
|
|
||||||
|
# Validate CLI arguments: cluster name, scratch dir, driver RAM, join dir.
# BUG FIX: "is not 5" is an identity comparison with an int literal — it only
# works by CPython interning and warns on Python 3.8+; use != for values.
if len(sys.argv) != 5:
    print(sys.argv[0] + " {cluster} {tmpdir} {maxram} {joindir}")
    sys.exit()
|
||||||
|
|
||||||
|
joindir=sys.argv[4]
|
||||||
|
|
||||||
|
cluster=sys.argv[1]
|
||||||
|
|
||||||
|
spark = pyspark.sql.SparkSession.builder \
|
||||||
|
.appName("task_slowdown") \
|
||||||
|
.config("spark.driver.maxResultSize", sys.argv[3]) \
|
||||||
|
.config("spark.local.dir", sys.argv[2]) \
|
||||||
|
.config("spark.driver.memory", sys.argv[3]) \
|
||||||
|
.getOrCreate()
|
||||||
|
sc = spark.sparkContext
|
||||||
|
|
||||||
|
df = spark.read.parquet(joindir + "/figure-8-join-" + cluster + ".parquet")
|
||||||
|
|
||||||
|
# READING MACHINE EVENTS DATA, sort them and save them as broadcast variable
|
||||||
|
print("Starting to read machine events...")
|
||||||
|
dfm = pd.read_csv("~/google_2019/machine_events/" + cluster + "_machine_events.csv", converters={
|
||||||
|
'time': lambda x: -1 if x == '' else int(x),
|
||||||
|
'machine_id': lambda x: str(x),
|
||||||
|
'capacity.cpus': lambda x: -1 if x == '' else Decimal(x),
|
||||||
|
'capacity.memory': lambda x: -1 if x == '' else Decimal(x)})
|
||||||
|
print("Dropping remove events...")
|
||||||
|
dfm = dfm[(dfm.type!=2)&(dfm.time!=-1)&(dfm["capacity.cpus"]!=-1)&(dfm["capacity.memory"]!=-1)]
|
||||||
|
print("Dropping missing data events...")
|
||||||
|
dfm = dfm[dfm.missing_data_reason.isnull()]
|
||||||
|
print("Projecting on useful columns...")
|
||||||
|
dfm = dfm[["time", "machine_id", "capacity.cpus", "capacity.memory"]]
|
||||||
|
print("Sorting by time...")
|
||||||
|
dfm = dfm.sort_values(by=["machine_id", "time"])
|
||||||
|
print("Converting to broadcast variable...")
|
||||||
|
dfm = sc.broadcast([tuple(r) for r in dfm.to_numpy()])
|
||||||
|
print("Done with machine events.")
|
||||||
|
|
||||||
|
def get_machine_time_resources(machine_id, time):
    """Look up the machine-events row for `machine_id` nearest `time`.

    dfm.value is a broadcast list of (time, machine_id, cpus, memory)
    tuples sorted by (machine_id, time).  A recursive binary search first
    narrows to the matching machine id, then to the timestamp.  Returns
    the tuple, or None when the machine id is absent.
    NOTE(review): on an inexact time match this returns a nearby row on
    the same machine, not necessarily the one with time <= `time` — the
    two-candidate base case prefers the earlier row; confirm acceptable.
    """
    def aux(i, j):
        # Base case: one candidate row left.
        if i == j:
            return dfm.value[i] if dfm.value[i][1] == machine_id else None
        # Base case: two candidates; prefer the earlier one.
        elif i + 1 == j:
            if dfm.value[i][1] == machine_id:
                return dfm.value[i]
            elif dfm.value[j][1] == machine_id:
                return dfm.value[j]
            else:
                return None

        mid = (i + j) // 2

        # Compare by machine id first, then by event time.
        if dfm.value[mid][1] > machine_id:
            return aux(i, mid - 1)
        elif dfm.value[mid][1] < machine_id:
            return aux(mid + 1, j)
        elif dfm.value[mid][0] > time:
            return aux(i, mid)
        elif dfm.value[mid][0] < time:
            return aux(mid, j)
        else:
            return dfm.value[mid]

    return aux(0, len(dfm.value)-1)
|
||||||
|
|
||||||
|
def increment_reserv_bucket(bucket, value):
    """Record one observation in a histogram list: slot 0 for negative
    (missing) values, slots 1..40 for [0, 1) in 40 equal bins, and any
    value >= 1 folded into slot 40."""
    if value < 0:
        slot = 0
    else:
        slot = 40 if value >= 1 else int(value * 40) + 1
    bucket[slot] += 1
|
||||||
|
|
||||||
|
def for_each_joined(x):
    """Per-task reducer for figure 8a/b/e/f: `x` is (task_id, event Rows).

    For each event, histograms the requested cpu/ram, the raw average
    usage, and the usage normalized by the owning machine's capacity
    (looked up via get_machine_time_resources).  Returns the six
    histograms keyed by the task's final termination type.
    """
    task_id = x[0]
    ts = x[1]

    term = -1
    ts = sorted(ts, key=lambda x: x.time)

    # 41-slot histograms: 0 = negative/missing, 1..40 = [0, 1) bins
    # (values >= 1 fold into slot 40).  _wr variants keep the unnormalized
    # ("wrong old version") usage for comparison with earlier results.
    cpu_util = [0] * 41
    cpu_util_wr = [0] * 41
    ram_util = [0] * 41
    ram_util_wr = [0] * 41
    cpu_request = [0] * 41
    ram_request = [0] * 41

    for i, t in enumerate(ts):
        machine_log = get_machine_time_resources(t.mid, t.time)
        if machine_log is not None:
            # Normalize average usage by machine capacity (cpus, memory).
            # NOTE(review): assumes t.acpu/t.aram are averages on the same
            # scale as the machine-event capacities — confirm upstream join.
            util_cpu = t.acpu / machine_log[2]
            util_ram = t.aram / machine_log[3]
        else:
            # Unknown machine: -1 lands in the "missing" slot (0).
            util_cpu = -1
            util_ram = -1
        # 8a-b
        increment_reserv_bucket(cpu_request, t.rcpu)
        increment_reserv_bucket(ram_request, t.rram)
        # 8e-f (wrong old version)
        increment_reserv_bucket(cpu_util_wr, t.acpu)
        increment_reserv_bucket(ram_util_wr, t.aram)
        # 8e-f
        increment_reserv_bucket(cpu_util, util_cpu)
        increment_reserv_bucket(ram_util, util_ram)

        # Types 4..8 are termination events; remember the last one seen.
        if t.type >= 4 and t.type <= 8:
            term = t.type

    res = {-1: None, 4: None, 5: None, 6: None, 7: None, 8: None}
    res[term] = {'rcpu': cpu_request, 'rram': ram_request, 'wucpu': cpu_util_wr, 'wuram': ram_util_wr, 'ucpu': cpu_util, 'uram': ram_util}

    return res
|
||||||
|
|
||||||
|
def fold_resobjs(ro1, ro2):
    """Combine two {term: {metric: histogram-list} | None} result objects.

    None values defer to the other operand; matching histogram lists are
    summed element-wise.  Mutates and returns `ro1`.
    """
    if ro1 is None:
        return ro2
    if ro2 is None:
        return ro1
    for term, mine in ro1.items():
        theirs = ro2[term]
        if mine is None:
            ro1[term] = theirs
        elif theirs is not None:
            for metric, hist in mine.items():
                if hist is None:
                    mine[metric] = theirs[metric]
                elif theirs[metric] is not None:
                    mine[metric] = [a + b for a, b in zip(hist, theirs[metric])]
    return ro1
|
||||||
|
|
||||||
|
result = df.rdd \
|
||||||
|
.groupBy(lambda x: x.id) \
|
||||||
|
.map(for_each_joined) \
|
||||||
|
.fold(None, fold_resobjs)
|
||||||
|
|
||||||
|
d = os.path.dirname(os.path.realpath(__file__))
|
||||||
|
|
||||||
|
with open(d + "/" + cluster + "_figure8abef.json", "w") as f:
|
||||||
|
json.dump(result, f)
|
||||||
|
|
||||||
|
# vim: set ts=4 sw=4 et tw=120:
|
|
@ -0,0 +1,190 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
# coding: utf-8
|
||||||
|
|
||||||
|
import os
|
||||||
|
import json
|
||||||
|
import pandas as pd
|
||||||
|
import findspark
|
||||||
|
findspark.init()
|
||||||
|
import pyspark
|
||||||
|
import pyspark.sql
|
||||||
|
import sys
|
||||||
|
import gzip
|
||||||
|
from pyspark import AccumulatorParam
|
||||||
|
from pyspark.sql.functions import lit
|
||||||
|
from pyspark.sql import Window
|
||||||
|
from pyspark.sql.types import *
|
||||||
|
from decimal import *
|
||||||
|
|
||||||
|
# Validate CLI arguments: cluster name, scratch dir, and driver RAM.
# BUG FIX: "is not 4" is an identity comparison with an int literal — it only
# works by CPython interning and warns on Python 3.8+; use != for values.
if len(sys.argv) != 4:
    print(sys.argv[0] + " {cluster} {tmpdir} {maxram}")
    sys.exit()
|
||||||
|
|
||||||
|
cluster=sys.argv[1]
|
||||||
|
|
||||||
|
spark = pyspark.sql.SparkSession.builder \
|
||||||
|
.appName("task_slowdown") \
|
||||||
|
.config("spark.driver.maxResultSize", "128g") \
|
||||||
|
.config("spark.local.dir", sys.argv[2]) \
|
||||||
|
.config("spark.driver.memory", sys.argv[3]) \
|
||||||
|
.getOrCreate()
|
||||||
|
sc = spark.sparkContext
|
||||||
|
|
||||||
|
#
|
||||||
|
# READING INSTANCE EVENTS DATA
|
||||||
|
#
|
||||||
|
dfepath = "/home/claudio/google_2019/instance_events/" + cluster + "/" + cluster + "_instance_events*.json.gz"
|
||||||
|
df = spark.read.json(dfepath)
|
||||||
|
df = df.limit(10000)
|
||||||
|
|
||||||
|
# READING MACHINE EVENTS DATA, sort them and save them as broadcast variable
|
||||||
|
print("Starting to read machine events...")
|
||||||
|
dfm = pd.read_csv("~/google_2019/machine_events/" + cluster + "_machine_events.csv", converters={
|
||||||
|
'time': lambda x: -1 if x == '' else int(x),
|
||||||
|
'machine_id': lambda x: str(x),
|
||||||
|
'capacity.cpus': lambda x: -1 if x == '' else Decimal(x),
|
||||||
|
'capacity.memory': lambda x: -1 if x == '' else Decimal(x)})
|
||||||
|
print("Dropping remove events...")
|
||||||
|
dfm = dfm[(dfm.type!=2)&(dfm.time!=-1)&(dfm["capacity.cpus"]!=-1)&(dfm["capacity.memory"]!=-1)]
|
||||||
|
print("Dropping missing data events...")
|
||||||
|
dfm = dfm[dfm.missing_data_reason.isnull()]
|
||||||
|
print("Projecting on useful columns...")
|
||||||
|
dfm = dfm[["time", "machine_id", "capacity.cpus", "capacity.memory"]]
|
||||||
|
print("Sorting by time...")
|
||||||
|
dfm = dfm.sort_values(by=["machine_id", "time"])
|
||||||
|
print("Converting to broadcast variable...")
|
||||||
|
dfm = sc.broadcast([tuple(r) for r in dfm.to_numpy()])
|
||||||
|
print("Done with machine events.")
|
||||||
|
|
||||||
|
# Project the instance events onto the typed columns the reducers use below.
# BUG FIX: the original read `dfe.rdd`, but no `dfe` exists in this script —
# the events frame was read into `df` above, so this was a NameError.
# NOTE(review): `tabid` is not defined in this script either — presumably
# copied from the sibling scripts; confirm it is meant to be included here.
df = df.rdd \
    .filter(lambda x: x.time is not None and x.type is not None and x.machine_id is not None and
            x.instance_index is not None and x.collection_id is not None and x.resource_request is not None and
            x.resource_request.cpus is not None and x.resource_request.memory is not None) \
    .map(lambda x: [tabid(x), int(x.time), int(x.type),
                    Decimal(x.resource_request.cpus), Decimal(x.resource_request.memory), x.machine_id]) \
    .toDF(["id", "time", "type", "rcpu", "rram", "mid"])
|
||||||
|
|
||||||
|
|
||||||
|
def get_machine_time_resources(machine_id, time):
    """Binary-search dfm.value — a broadcast list of (time, machine_id,
    cpus, memory) tuples sorted by (machine_id, time) — and return the
    *index* of the row for `machine_id` nearest `time`, or None when the
    machine id is absent.

    BUG FIX: the original returned the tuple itself from every branch
    except the exact-timestamp match, which returned the index `mid`.
    The caller slices `dfm.value[start:(end+1)]`, so indices are what is
    required; all branches now return indices consistently.
    """
    def aux(i, j):
        # Base case: one candidate row left.
        if i == j:
            return i if dfm.value[i][1] == machine_id else None
        # Base case: two candidates; prefer the earlier one.
        if i + 1 == j:
            if dfm.value[i][1] == machine_id:
                return i
            if dfm.value[j][1] == machine_id:
                return j
            return None

        mid = (i + j) // 2

        # Compare by machine id first, then by event time.
        if dfm.value[mid][1] > machine_id:
            return aux(i, mid - 1)
        elif dfm.value[mid][1] < machine_id:
            return aux(mid + 1, j)
        elif dfm.value[mid][0] > time:
            return aux(i, mid)
        elif dfm.value[mid][0] < time:
            return aux(mid, j)
        else:
            return mid

    return aux(0, len(dfm.value) - 1)
|
||||||
|
|
||||||
|
def increment_reserv_bucket(bucket, taskid, value):
    """Add one observation for `taskid` to its 41-slot histogram in
    `bucket` (dict: task id -> list), creating the list on first use.

    Slot 0 counts negative (missing) values; slots 1..40 split [0, 1)
    into 40 equal bins, with values >= 1 folded into slot 40.
    """
    if value < 0:
        idx = 0
    else:
        idx = 40 if value >= 1 else (int(value * 40) + 1)

    if taskid not in bucket:
        # BUG FIX: the original allocated `[0] * (len(ceils) + 1)`, but no
        # `ceils` exists anywhere in this script, so the first new task id
        # raised NameError.  idx ranges over 0..40, so 41 slots are needed
        # (matching the sibling scripts' fixed-size histograms).
        bucket[taskid] = [0] * 41
    bucket[taskid][idx] += 1
|
||||||
|
|
||||||
|
def bucket_sum_per_termination(bucket, last_term_by_id):
    """Collapse per-task counter lists into per-termination-code totals.

    `bucket` maps task id -> counter list; `last_term_by_id` maps task id
    -> termination code (-1 or 4..8).  Lists sharing a code are summed
    element-wise; unused codes stay None.
    """
    totals = dict.fromkeys((-1, 4, 5, 6, 7, 8))
    for task_id, counters in bucket.items():
        code = last_term_by_id[task_id]
        if totals[code] is None:
            totals[code] = counters
        else:
            totals[code] = [a + b for a, b in zip(totals[code], counters)]
    return totals
|
||||||
|
|
||||||
|
def for_each_joined(x):
    """Per-machine reducer for figure 8: `x` is (machine_id, event Rows).

    Replays the machine's events in time order, tracking each task's last
    known resource request, and histograms the machine-wide reservation
    ratio (sum of outstanding requests / machine capacity) per event,
    keyed by task id.  Histograms are then regrouped by each task's final
    termination code.
    """
    machine_id = x[0]
    ts = x[1]

    ts = sorted(ts, key=lambda x: x.time)
    last_req_by_id = {} # map taskid -> last known req [cpu, ram] (data removed when task terminates)

    reserv_ceils = [0]
    cpu_reservs_by_id = {}
    ram_reservs_by_id = {}

    last_term_by_id = {} # map taskid -> last termination
    # Bracket this machine's rows in the broadcast machine-events list.
    # NOTE(review): this assumes get_machine_time_resources returns list
    # *indices*; the version in this script returns tuples from some
    # branches, which would break the slice below — confirm which is used.
    start = get_machine_time_resources(machine_id, 0)
    end = get_machine_time_resources(machine_id, 6_000_000_000_000)
    machine_logs = None if start is None or end is None else dfm.value[start:(end+1)]

    for i, t in enumerate(ts):
        # Advance the capacity log to the entry in effect at t.time.
        # NOTE(review): `>=` pops the next entry when its time is *ahead*
        # of t.time — this looks inverted (should advance only once the
        # next entry's time has been reached); confirm intended direction.
        if machine_logs is not None and len(machine_logs) > 1 and machine_logs[1][0] >= t.time:
            machine_logs.pop(0)
        if t.id not in last_term_by_id:
            last_term_by_id[t.id] = -1
        if t.rcpu is not None and t.rram is not None:
            last_req_by_id[t.id] = (t.rcpu, t.rram)
        # 8b
        # NOTE(review): if no request has been recorded yet, tot_req is []
        # and indexing below would raise IndexError — confirm unreachable.
        tot_req = [sum(x) for x in zip(*last_req_by_id.values())]
        if machine_logs is not None:
            reserv_cpu = tot_req[0] / machine_logs[0][2]
            reserv_ram = tot_req[1] / machine_logs[0][3]
        else:
            reserv_cpu = -1
            reserv_ram = -1
        increment_reserv_bucket(cpu_reservs_by_id, t.id, reserv_cpu)
        increment_reserv_bucket(ram_reservs_by_id, t.id, reserv_ram)
        # Types 4..8 are termination events; remember the last per task.
        if t.type >= 4 and t.type <= 8:
            last_term_by_id[t.id] = t.type

    resobj = {'rscpu': cpu_reservs_by_id, 'rsram': ram_reservs_by_id}

    # Collapse the per-task histograms into per-termination-code totals.
    for k, v in resobj.items():
        resobj[k] = bucket_sum_per_termination(v, last_term_by_id)

    return resobj
|
||||||
|
|
||||||
|
def fold_resobjs(ro1, ro2):
    """Merge two {metric: {term: histogram-list | None}} result objects.

    A None operand defers to the other; for matching (metric, term) slots
    the histogram lists are summed element-wise, with None entries taking
    the other side's value.  `ro1` is mutated and returned.
    """
    if ro1 is None:
        return ro2
    if ro2 is None:
        return ro1
    for metric, per_term in ro1.items():
        for term, hist in per_term.items():
            other = ro2[metric][term]
            if hist is None:
                per_term[term] = other
            elif other is not None:
                per_term[term] = [a + b for a, b in zip(hist, other)]
    return ro1
|
||||||
|
|
||||||
|
# TODO: partition by id and in the for-each-row
|
||||||
|
# function implement lookup to dfm.value to understand its memory capacity
|
||||||
|
|
||||||
|
import random
|
||||||
|
|
||||||
|
result = df.rdd \
|
||||||
|
.groupBy(lambda x: x.mid) \
|
||||||
|
.partitionBy(10000, lambda x: random.randint(0, 10000-1)) \
|
||||||
|
.map(for_each_joined) \
|
||||||
|
.fold(None, fold_resobjs)
|
||||||
|
|
||||||
|
d = os.path.dirname(os.path.realpath(__file__))
|
||||||
|
|
||||||
|
with open(d + "/" + cluster + "_figure8.json", "w") as f:
|
||||||
|
json.dump(result, f)
|
||||||
|
|
||||||
|
# vim: set ts=4 sw=4 et tw=120:
|
|
@ -0,0 +1 @@
|
||||||
|
{"-1": {"rcpu": [0, 37134, 7823, 1095, 705, 254, 405, 68, 86, 0, 5, 16, 44, 32, 0, 0, 0, 0, 0, 0, 8, 2, 0, 0, 155, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], "rram": [0, 41623, 2064, 3246, 443, 170, 86, 8, 42, 0, 0, 150, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]}, "4": {"rcpu": [0, 8052123, 1800398, 247696, 45236, 86246, 21765, 5071, 636, 7, 2, 14, 35, 2, 162, 2368, 1233, 4, 2, 2, 3, 0, 2, 10, 45, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], "rram": [0, 9508643, 252810, 65861, 40368, 391949, 491, 1653, 897, 251, 33, 60, 40, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 4, 2, 0, 0, 0, 0, 0, 0, 0]}, "5": {"rcpu": [0, 3795007, 148577, 622218, 3839, 1547, 131, 2295, 60, 21, 0, 0, 2, 0, 0, 0, 0, 2, 0, 0, 64, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], "rram": [0, 3901983, 649474, 10598, 9401, 1142, 270, 729, 4, 10, 9, 9, 134, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]}, "6": {"rcpu": [0, 60804001, 3479863, 795860, 798587, 244544, 77010, 88312, 13604, 1556, 1233, 9895, 8786, 2826, 14, 12, 4, 2, 0, 0, 146, 27, 0, 333, 22448, 0, 0, 27, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], "rram": [0, 52755625, 13102031, 328036, 90326, 30961, 6605, 996, 9483, 2, 0, 22523, 1854, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 27, 18, 234, 336, 16, 9, 4, 0, 2, 0, 0]}, "7": {"rcpu": [0, 1044640256, 52093234, 8435166, 16531024, 1822904, 118770, 256291, 125781, 93327, 18627, 152169, 502207, 222, 2065, 1849, 1332, 1320, 1481, 77, 169, 42, 4, 1668, 37, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], "rram": [0, 1096664639, 19126164, 5041851, 2122806, 1223639, 118066, 101040, 97793, 55379, 16593, 18182, 213667, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 106, 18, 0, 12, 35, 31, 1, 0, 0, 0, 0, 0]}, "8": {"rcpu": [0, 3687040, 71509, 887, 1004, 63718, 20, 10, 6, 0, 0, 8, 6, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 
0, 0, 0, 0, 0, 0, 0], "rram": [0, 3768719, 8504, 46889, 52, 20, 6, 6, 8, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]}}
|
Loading…
Reference in New Issue