update
parent 7350f43af7
commit f2b35fad63
18891 changed files with 1743 additions and 1 deletions
Changed paths:
.gitignore
figure_7/
  a_priority_exectime/
  b_priority_exectime/
  c_priority_exectime/
  d_priority_exectime/
  e_priority_exectime/
  f_priority_exectime/
  figure7ab.py
  g_priority_exectime/
  h_priority_exectime/
figure_8/
machine_configs/
machine_time_waste/
spatial_resource_waste/
  CORRUPTION.txt
  a_actual/
    ._SUCCESS.crc
    .part-00000-447ca1ed-c2ea-4f1e-88e7-de81d43b29eb-c000.csv.crc
    _SUCCESS
    part-00000-447ca1ed-c2ea-4f1e-88e7-de81d43b29eb-c000.csv
  a_res_micros_actual_per_job/
    ._SUCCESS.crc
    .part-00000.gz.crc through .part-00054.gz.crc (55 files)

2  .gitignore  (vendored)
@@ -1 +1,3 @@
**/.ipynb_checkpoints/
task_slowdown/?_state_changes.json.gz
**/*.gz
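
The "?" wildcard in the new task_slowdown pattern matches exactly one character, so a single pattern covers the per-cluster files a through h. A quick sketch of the idea using Python's fnmatch (an approximation only: Git's matcher has extra rules, and the file name below is illustrative):

from fnmatch import fnmatch

# '?' stands for one character, so the per-cluster letter is matched.
print(fnmatch("task_slowdown/a_state_changes.json.gz",
              "task_slowdown/?_state_changes.json.gz"))  # True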

BIN  figure_7/a_priority_exectime/._SUCCESS.crc  (new file, binary not shown)
BIN  figure_7/a_priority_exectime/.part-00000.crc  (new file, binary not shown)
0  figure_7/a_priority_exectime/_SUCCESS  (new file)
32  figure_7/a_priority_exectime/part-00000  (new file)
@@ -0,0 +1,32 @@
6,0,<1,15931250
6,0,10-60,43594153
4,0,<1,4832
4,0,1-2,10239
6,0,4-10,37023938
5,0,2-4,153744
7,0,>=1d,585952420
7,0,4-10,44599121
5,0,4-10,94793
4,0,10-60,871433
6,0,>=1d,214183666
8,0,<1,581529
7,0,<1,47439916
-1,0,<1,2976745
5,0,1-2,2898784
4,0,4-10,174080
5,0,10-60,2667394
5,0,60-1d,1042564
6,0,1-2,52132565
8,0,60-1d,29
7,0,1-2,32273543
4,0,>=1d,74469275
6,0,60-1d,12314134
8,0,>=1d,414312
4,0,60-1d,6059050
5,0,<1,525809
5,0,>=1d,14986195
4,0,2-4,35855
7,0,2-4,20437332
7,0,10-60,79247427
6,0,2-4,39184016
7,0,60-1d,309129009
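
Each *_priority_exectime/part-00000 in this commit is a headerless CSV written by figure_7/figure7ab.py (shown later in this diff): per row, the task's last termination event type (-1 when none was seen), its priority, an execution-time bucket in minutes, and the summed per-task event count for that group. A minimal loading sketch; the column names are mine, not part of the data:

import pandas as pd

cols = ["last_term", "priority", "exectime_bucket", "count"]
df = pd.read_csv("figure_7/a_priority_exectime/part-00000", names=cols)
# e.g. total event counts per execution-time bucket
print(df.groupby("exectime_bucket")["count"].sum())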

BIN  figure_7/b_priority_exectime/._SUCCESS.crc  (new file, binary not shown)
BIN  figure_7/b_priority_exectime/.part-00000.crc  (new file, binary not shown)
0  figure_7/b_priority_exectime/_SUCCESS  (new file)
34  figure_7/b_priority_exectime/part-00000  (new file)
@@ -0,0 +1,34 @@
6,0,4-10,28788042
4,0,1-2,27192
5,0,4-10,894267
5,0,<1,31846
7,0,>=1d,959768322
5,0,60-1d,5150452
5,0,2-4,1154255
-1,0,<1,2470086
6,0,60-1d,31292653
7,0,<1,66024826
8,0,60-1d,22
7,0,4-10,131293574
4,0,10-60,475870
7,0,2-4,63304732
8,0,4-10,32
4,0,>=1d,39003436
5,0,>=1d,4619261
4,0,2-4,88680
7,0,60-1d,752719676
8,0,>=1d,434174
8,0,<1,382878
6,0,1-2,73380248
6,0,>=1d,139751172
4,0,4-10,242628
4,0,60-1d,13808200
5,0,10-60,2987439
4,0,<1,33079
5,0,1-2,39351
6,0,2-4,46692501
6,0,10-60,63401359
8,0,10-60,56
7,0,1-2,32974421
7,0,10-60,255723593
6,0,<1,108838664

BIN  figure_7/c_priority_exectime/._SUCCESS.crc  (new file, binary not shown)
BIN  figure_7/c_priority_exectime/.part-00000.crc  (new file, binary not shown)
0  figure_7/c_priority_exectime/_SUCCESS  (new file)
36  figure_7/c_priority_exectime/part-00000  (new file)
@@ -0,0 +1,36 @@
6,0,4-10,199505836
7,0,4-10,755777336
-1,0,<1,3952663
5,0,2-4,495781
7,0,2-4,390478553
6,0,>=1d,292113564
7,0,60-1d,1308416813
4,0,1-2,39982
8,0,4-10,24684
6,0,<1,311177825
4,0,4-10,400599
8,0,10-60,9397
4,0,10-60,976337
5,0,<1,188610
4,0,60-1d,7618699
8,0,>=1d,711578
8,0,1-2,61384
4,0,>=1d,73841164
4,0,2-4,103581
7,0,1-2,132922390
5,0,>=1d,51287123
8,0,<1,664765
5,0,10-60,4193865
7,0,10-60,800096395
5,0,60-1d,1971893
7,0,<1,196256317
4,0,<1,13860
6,0,10-60,197074666
8,0,2-4,53375
6,0,60-1d,91864360
6,0,2-4,244883856
5,0,1-2,226157
7,0,>=1d,1382608594
5,0,4-10,2491187
6,0,1-2,280568210
8,0,60-1d,900

BIN  figure_7/d_priority_exectime/._SUCCESS.crc  (new file, binary not shown)
BIN  figure_7/d_priority_exectime/.part-00000.crc  (new file, binary not shown)
0  figure_7/d_priority_exectime/_SUCCESS  (new file)
34  figure_7/d_priority_exectime/part-00000  (new file)
@@ -0,0 +1,34 @@
5,0,60-1d,331929
4,0,2-4,50447
6,0,2-4,44265781
7,0,60-1d,799490797
-1,0,<1,4544421
7,0,4-10,519875128
5,0,2-4,333758
7,0,1-2,106909750
5,0,10-60,838209
5,0,4-10,737122
6,0,>=1d,235790332
7,0,10-60,476207710
8,0,<1,797658
4,0,4-10,298479
5,0,1-2,108840
6,0,60-1d,21709967
5,0,>=1d,29145185
5,0,<1,722288
4,0,10-60,430109
6,0,1-2,55841318
4,0,<1,5562
4,0,60-1d,3522738
6,0,4-10,42848563
4,0,>=1d,47657312
6,0,<1,51292750
4,0,1-2,9021
7,0,>=1d,1321688066
8,0,60-1d,182
7,0,<1,128175243
8,0,10-60,560
8,0,2-4,34
8,0,>=1d,31728
7,0,2-4,244210514
6,0,10-60,64400417

BIN  figure_7/e_priority_exectime/._SUCCESS.crc  (new file, binary not shown)
BIN  figure_7/e_priority_exectime/.part-00000.crc  (new file, binary not shown)
0  figure_7/e_priority_exectime/_SUCCESS  (new file)
32  figure_7/e_priority_exectime/part-00000  (new file)
@@ -0,0 +1,32 @@
5,0,>=1d,24065006
7,0,1-2,16642387
6,0,>=1d,5095832
7,0,>=1d,1333707353
5,0,1-2,51413
8,0,>=1d,59
4,0,<1,5002
6,0,1-2,16051042
4,0,>=1d,7758651
7,0,2-4,86806397
6,0,60-1d,6125245
5,0,10-60,313662
4,0,2-4,28817
6,0,4-10,77888564
5,0,<1,68072
6,0,10-60,41506399
4,0,4-10,347459
6,0,2-4,17211883
4,0,60-1d,1828664
5,0,2-4,208141
4,0,10-60,427813
7,0,10-60,290159810
7,0,<1,34018722
6,0,<1,61095340
5,0,4-10,69461
7,0,4-10,162898256
4,0,1-2,4033
7,0,60-1d,845942465
8,0,<1,38780
8,0,60-1d,38
-1,0,<1,1457876
5,0,60-1d,478983

BIN  figure_7/f_priority_exectime/._SUCCESS.crc  (new file, binary not shown)
BIN  figure_7/f_priority_exectime/.part-00000.crc  (new file, binary not shown)
0  figure_7/f_priority_exectime/_SUCCESS  (new file)
36  figure_7/f_priority_exectime/part-00000  (new file)
@@ -0,0 +1,36 @@
4,0,10-60,143005
7,0,<1,27095109
5,0,1-2,118612
5,0,>=1d,12449046
6,0,2-4,12286904
8,0,10-60,131690
4,0,4-10,49384
5,0,2-4,498625
7,0,2-4,41209062
8,0,2-4,41676
7,0,10-60,267957130
8,0,>=1d,47829
8,0,<1,503296
4,0,<1,3387
8,0,60-1d,134
4,0,>=1d,39737313
5,0,<1,171247
6,0,>=1d,101626554
5,0,4-10,91257
8,0,4-10,192276
4,0,2-4,15259
-1,0,<1,963888
6,0,<1,45025409
7,0,4-10,104957224
4,0,1-2,2802
7,0,>=1d,885691405
6,0,60-1d,32130223
5,0,10-60,1003509
7,0,60-1d,642935158
7,0,1-2,30708656
6,0,10-60,38209602
6,0,1-2,14727025
8,0,1-2,18432
4,0,60-1d,24102473
6,0,4-10,16160973
5,0,60-1d,139912

137  figure_7/figure7ab.py  (new executable file)
@@ -0,0 +1,137 @@
#!/usr/bin/env python3
# coding: utf-8

import json
import pandas
import findspark
findspark.init()
import pyspark
import pyspark.sql
import sys
import gzip

from pyspark import AccumulatorParam
from pyspark.sql.functions import lit
from pyspark.sql import Window
from pyspark.sql.types import ByteType

if len(sys.argv) != 4:
    print(sys.argv[0] + " {cluster} {tmpdir} {maxram}")
    sys.exit()

cluster = sys.argv[1]

spark = pyspark.sql.SparkSession.builder \
    .appName("task_slowdown") \
    .config("spark.driver.maxResultSize", "128g") \
    .config("spark.local.dir", sys.argv[2]) \
    .config("spark.driver.memory", sys.argv[3]) \
    .getOrCreate()
sc = spark.sparkContext

df = spark.read.json("/home/claudio/google_2019/instance_events/" + cluster + "/" + cluster + "_instance_events*.json.gz")
#df = spark.read.json("/home/claudio/google_2019/instance_events/" + cluster + "/" + cluster + "_test.json")

# Cast collection_type to a byte if the column exists; otherwise add it as null.
try:
    df = df.withColumn("collection_type", df["collection_type"].cast(ByteType()))
except Exception:
    df = df.withColumn("collection_type", lit(None).cast(ByteType()))


class NonPriorityAcc(pyspark.AccumulatorParam):

    def zero(self, value):
        return {}

    def addInPlace(self, v1, v2):
        for key in v2:
            if key in v1:
                v1[key] += v2[key]
            else:
                v1[key] = v2[key]
        return v1

non = sc.accumulator({}, NonPriorityAcc())

MICROS = 1000000

def sumrow(l, p, t, c):
    # Bucket the total execution time t (microseconds) into minute ranges.
    t = t // (MICROS * 60)
    if t < 1:
        t = "<1"
    elif t < 2:
        t = "1-2"
    elif t < 4:
        t = "2-4"
    elif t < 10:
        t = "4-10"
    elif t < 60:
        t = "10-60"
    elif t < 60 * 24:
        t = "60-1d"
    else:
        t = ">=1d"
    return (l, p, t, c)


def sumid(sr):
    return (sr[0], sr[1], sr[2])


def for_each_task(ts):
    global non

    ts = sorted(ts, key=lambda x: x["time"])

    in_exec = False
    exec_start = None
    exec_tot = 0
    priority = -1  # start unknown so the first real priority below gets recorded
    l = len(ts)
    last_term = -1

    for i, t in enumerate(ts):
        if t["priority"] != -1 and priority == -1:
            priority = t["priority"]
        if t["type"] >= 4 and t["type"] <= 8:
            last_term = t["type"]
        if in_exec and (t["type"] == 1 or (t["type"] >= 4 and t["type"] <= 8)):
            exec_tot += t["time"] - exec_start
            in_exec = False
        if (not in_exec) and (t["type"] == 3):
            exec_start = t["time"]
            in_exec = True

    return sumrow(last_term, priority, exec_tot, l)


def cleanup(x):
    return {
        "time": int(x.time),
        "type": 0 if x.type is None else int(x.type),
        "id": x.collection_id + "-" + x.instance_index,
        "priority": -1 if x.priority is None else int(x.priority)
    }


def sum_rows(xs):
    csum = 0
    for x in xs:
        csum += x[3]
    return csum


df2 = df.rdd \
    .filter(lambda x: x.collection_type is None or x.collection_type == 0) \
    .filter(lambda x: x.time is not None and x.instance_index is not None and x.collection_id is not None) \
    .map(cleanup) \
    .groupBy(lambda x: x["id"]) \
    .mapValues(for_each_task) \
    .map(lambda x: x[1]) \
    .groupBy(lambda x: sumid(x)) \
    .mapValues(sum_rows) \
    .map(lambda x: str(x[0][0]) + "," + str(x[0][1]) + "," + str(x[0][2]) + "," + str(x[1])) \
    .coalesce(1) \
    .saveAsTextFile(cluster + "_priority_exectime")

# vim: set ts=4 sw=4 et tw=80:
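
To make the minute bucketing in sumrow concrete, here is a standalone sketch that reproduces its thresholds (bucket_label is my name, not part of the script):

MICROS = 1000000

def bucket_label(exec_tot_micros):
    # Same thresholds as sumrow above: minute ceilings, then the one-day boundary.
    minutes = exec_tot_micros // (MICROS * 60)
    for ceil, label in [(1, "<1"), (2, "1-2"), (4, "2-4"), (10, "4-10"),
                        (60, "10-60"), (60 * 24, "60-1d")]:
        if minutes < ceil:
            return label
    return ">=1d"

# 43594153 microseconds is about 43.6 seconds, i.e. under one minute.
print(bucket_label(43594153))  # -> <1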

BIN  figure_7/g_priority_exectime/._SUCCESS.crc  (new file, binary not shown)
BIN  figure_7/g_priority_exectime/.part-00000.crc  (new file, binary not shown)
0  figure_7/g_priority_exectime/_SUCCESS  (new file)
32  figure_7/g_priority_exectime/part-00000  (new file)
@@ -0,0 +1,32 @@
6,0,<1,5855111
7,0,60-1d,601542196
7,0,10-60,95838227
6,0,>=1d,530133143
4,0,>=1d,36971224
5,0,60-1d,1010045
8,0,<1,3811
8,0,>=1d,263540
5,0,2-4,22902724
5,0,4-10,6661506
7,0,<1,12043755
6,0,60-1d,10739044
7,0,2-4,16985420
4,0,1-2,2714
7,0,1-2,6803910
5,0,>=1d,4610298
4,0,<1,1840
7,0,4-10,36246298
6,0,10-60,19503280
7,0,>=1d,1599925741
6,0,1-2,8607061
5,0,<1,275553
4,0,60-1d,6441519
8,0,60-1d,1244
4,0,2-4,8100
4,0,10-60,141727
5,0,10-60,2046864
6,0,4-10,9704610
5,0,1-2,23954
6,0,2-4,10143359
4,0,4-10,35493
-1,0,<1,397201

BIN  figure_7/h_priority_exectime/._SUCCESS.crc  (new file, binary not shown)
BIN  figure_7/h_priority_exectime/.part-00000.crc  (new file, binary not shown)
0  figure_7/h_priority_exectime/_SUCCESS  (new file)
32  figure_7/h_priority_exectime/part-00000  (new file)
@@ -0,0 +1,32 @@
8,0,>=1d,5054
-1,0,<1,954975
7,0,<1,24484463
6,0,60-1d,23056333
4,0,2-4,1836
4,0,4-10,8989
8,0,<1,124
7,0,2-4,23796068
7,0,>=1d,1239402690
5,0,4-10,1489254
6,0,>=1d,78233744
7,0,4-10,62455654
6,0,4-10,11418060
6,0,2-4,11039358
4,0,10-60,40994
7,0,60-1d,722926038
5,0,>=1d,8729456
7,0,1-2,4366627
5,0,<1,189088
5,0,60-1d,1497194
8,0,60-1d,1922
5,0,2-4,16174
6,0,1-2,4843246
4,0,<1,779
6,0,10-60,19472894
4,0,1-2,791
7,0,10-60,138882516
4,0,60-1d,953635
4,0,>=1d,4032484
6,0,<1,13791195
5,0,1-2,3721
5,0,10-60,325669

148  figure_8/figure8-join.py  (new executable file)
@@ -0,0 +1,148 @@
#!/usr/bin/env python3
# coding: utf-8

import os
import json
import pandas as pd
import findspark
findspark.init()
import pyspark
import pyspark.sql
import sys
import gzip
from pyspark import AccumulatorParam
from pyspark.sql.functions import lit
from pyspark.sql import Window
from pyspark.sql.types import *
from decimal import *

TESTDATA = False

if len(sys.argv) != 4:
    print(sys.argv[0] + " {cluster} {tmpdir} {maxram}")
    sys.exit()

cluster = sys.argv[1]

spark = pyspark.sql.SparkSession.builder \
    .appName("task_slowdown") \
    .config("spark.driver.maxResultSize", "128g") \
    .config("spark.local.dir", sys.argv[2]) \
    .config("spark.driver.memory", sys.argv[3]) \
    .getOrCreate()
sc = spark.sparkContext

#
# READING INSTANCE EVENTS DATA
#
dfepath = "/home/claudio/google_2019/instance_events/" + cluster
dfepath += "/" + cluster + ("_instance_events00000000000?.json.gz" if TESTDATA else "_instance_events*.json.gz")
dfe = spark.read.json(dfepath)
#
# READING INSTANCE USAGE DATA
#
dfupath = "/home/claudio/google_2019/instance_usage/" + cluster
dfupath += "/" + cluster + ("_instance_usage00000000000?.csv.gz" if TESTDATA else "_instance_usage*.csv.gz")
usage_schema = StructType() \
    .add("start_time", LongType(), True) \
    .add("end_time", LongType(), True) \
    .add("collection_id", StringType(), True) \
    .add("instance_index", StringType(), True) \
    .add("machine_id", StringType(), True) \
    .add("alloc_collection_id", LongType(), True) \
    .add("alloc_instance_index", StringType(), True) \
    .add("collection_type", ByteType(), True) \
    .add("average_usage_cpus", DoubleType(), True) \
    .add("average_usage_memory", DoubleType(), True) \
    .add("maximum_usage_cpus", DoubleType(), True) \
    .add("maximum_usage_memory", DoubleType(), True) \
    .add("random_sample_usage_cpus", DoubleType(), True) \
    .add("random_sample_usage_memory", DoubleType(), True) \
    .add("assigned_memory", DoubleType(), True) \
    .add("page_cache_memory", DoubleType(), True) \
    .add("cycles_per_instruction", DoubleType(), True) \
    .add("memory_accesses_per_instruction", DoubleType(), True) \
    .add("sample_rate", DoubleType(), True) \
    .add("cpu_usage_dist_00", DoubleType(), True) \
    .add("cpu_usage_dist_10", DoubleType(), True) \
    .add("cpu_usage_dist_20", DoubleType(), True) \
    .add("cpu_usage_dist_30", DoubleType(), True) \
    .add("cpu_usage_dist_40", DoubleType(), True) \
    .add("cpu_usage_dist_50", DoubleType(), True) \
    .add("cpu_usage_dist_60", DoubleType(), True) \
    .add("cpu_usage_dist_70", DoubleType(), True) \
    .add("cpu_usage_dist_80", DoubleType(), True) \
    .add("cpu_usage_dist_90", DoubleType(), True) \
    .add("cpu_usage_dist_91", DoubleType(), True) \
    .add("cpu_usage_dist_92", DoubleType(), True) \
    .add("cpu_usage_dist_93", DoubleType(), True) \
    .add("cpu_usage_dist_94", DoubleType(), True) \
    .add("cpu_usage_dist_95", DoubleType(), True) \
    .add("cpu_usage_dist_96", DoubleType(), True) \
    .add("cpu_usage_dist_97", DoubleType(), True) \
    .add("cpu_usage_dist_98", DoubleType(), True) \
    .add("cpu_usage_dist_99", DoubleType(), True)

dfu = spark.read.format("csv") \
    .option("header", False) \
    .schema(usage_schema) \
    .load(dfupath)

# READING MACHINE EVENTS DATA, sort them and save them as broadcast variable
print("Starting to read machine events...")
dfm = pd.read_csv("~/google_2019/machine_events/" + cluster + "_machine_events.csv")
print("Dropping remove events...")
dfm = dfm[(dfm.type==1)|(dfm.type==3)]
print("Dropping missing data events...")
dfm = dfm[dfm.missing_data_reason.isnull()]  # keep only rows without a missing-data reason
print("Projecting on useful columns...")
dfm = dfm[["time", "machine_id", "capacity.cpus", "capacity.memory"]]
print("Sorting by time...")
dfm = dfm.sort_values(by=["machine_id", "time"])
print("Converting to broadcast variable...")
dfm = sc.broadcast([tuple(r) for r in dfm.to_numpy()])
print("Done with machine events.")

def tabid(x):
    return Decimal(x.collection_id) + Decimal(x.instance_index) / Decimal(2**64)

# interpolate machine data by extending each last machine report before a gap to cover it
def clean_usage(x):
    return [tabid(x), Decimal(x.average_usage_cpus), Decimal(x.average_usage_memory), int(x.start_time),
        int(x.end_time), x.machine_id]

def interpolate_usage(ts):
    ts = sorted(ts, key=lambda x: x[3])
    l = len(ts)
    for i in range(1, l-1):
        if ts[i+1][3] > ts[i][4]:
            ts[i][4] = ts[i+1][3]
    return ts

dfu = dfu.rdd \
    .filter(lambda x: x.start_time is not None and x.end_time is not None and
        x.instance_index is not None and x.collection_id is not None and
        x.machine_id is not None) \
    .map(clean_usage).groupBy(lambda x: x[0]) \
    .flatMap(lambda x: interpolate_usage(x[1])).toDF(["id", "acpu", "aram", "start", "end", "mid"])

# Cast collection_type to a byte if the column exists; otherwise add it as null.
try:
    dfe = dfe.withColumn("collection_type", dfe["collection_type"].cast(ByteType()))
except Exception:
    dfe = dfe.withColumn("collection_type", lit(None).cast(ByteType()))

dfe = dfe.rdd \
    .filter(lambda x: x.time is not None and x.type is not None and x.machine_id is not None and
        x.instance_index is not None and x.collection_id is not None and x.resource_request is not None and
        x.resource_request.cpus is not None and x.resource_request.memory is not None) \
    .map(lambda x: [tabid(x), int(x.time), int(x.type),
        Decimal(x.resource_request.cpus), Decimal(x.resource_request.memory), x.machine_id]) \
    .toDF(["id", "time", "type", "rcpu", "rram", "mid"])

df = dfe.alias("dfe").join(dfu.alias("dfu"), [dfe.id == dfu.id, dfe.mid == dfu.mid, dfe.time > dfu.start, dfe.time <= dfu.end]) \
    .selectExpr("dfe.id as id", "dfe.mid as mid", "dfe.time as time", "dfe.type as type", "dfe.rcpu as rcpu",
        "dfe.rram as rram", "dfu.acpu as acpu", "dfu.aram as aram")

df.write.parquet("/home/claudio/raid0/figure-8-join-" + cluster + ".parquet")

# vim: set ts=4 sw=4 et tw=120:
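
To sanity-check the join output before running the measure step, a sketch that reads the Parquet file back; the cluster letter "a" and the bare Spark session below are assumptions for illustration, not part of the script:

import pyspark.sql

spark = pyspark.sql.SparkSession.builder.appName("inspect_join").getOrCreate()
df = spark.read.parquet("/home/claudio/raid0/figure-8-join-a.parquet")
df.printSchema()  # expect: id, mid, time, type, rcpu, rram, acpu, aram
df.show(5)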

195  figure_8/figure8-measure.py  (new executable file)
@@ -0,0 +1,195 @@
#!/usr/bin/env python3
# coding: utf-8

import os
import json
import pandas as pd
import findspark
findspark.init()
import pyspark
import pyspark.sql
import sys
import gzip
from pyspark import AccumulatorParam
from pyspark.sql.functions import lit
from pyspark.sql import Window
from pyspark.sql.types import *
from decimal import *

if len(sys.argv) != 5:
    print(sys.argv[0] + " {cluster} {tmpdir} {maxram} {joindir}")
    sys.exit()

joindir = sys.argv[4]

cluster = sys.argv[1]

spark = pyspark.sql.SparkSession.builder \
    .appName("task_slowdown") \
    .config("spark.driver.maxResultSize", sys.argv[3]) \
    .config("spark.local.dir", sys.argv[2]) \
    .config("spark.driver.memory", sys.argv[3]) \
    .getOrCreate()
sc = spark.sparkContext

df = spark.read.parquet(joindir + "/figure-8-join-" + cluster + ".parquet")

### TESTING ONLY
#df = df.limit(10000)

# READING MACHINE EVENTS DATA, sort them and save them as broadcast variable
print("Starting to read machine events...")
dfm = pd.read_csv("~/google_2019/machine_events/" + cluster + "_machine_events.csv", converters={
    'time': lambda x: -1 if x == '' else int(x),
    'machine_id': lambda x: str(x),
    'capacity.cpus': lambda x: -1 if x == '' else Decimal(x),
    'capacity.memory': lambda x: -1 if x == '' else Decimal(x)})
print("Dropping remove events...")
dfm = dfm[(dfm.type!=2)&(dfm.time!=-1)&(dfm["capacity.cpus"]!=-1)&(dfm["capacity.memory"]!=-1)]
print("Dropping missing data events...")
dfm = dfm[dfm.missing_data_reason.isnull()]
print("Projecting on useful columns...")
dfm = dfm[["time", "machine_id", "capacity.cpus", "capacity.memory"]]
print("Sorting by time...")
dfm = dfm.sort_values(by=["machine_id", "time"])
print("Converting to broadcast variable...")
dfm = sc.broadcast([tuple(r) for r in dfm.to_numpy()])
print("Done with machine events.")

def get_machine_time_resources(machine_id):
    # Binary search over the broadcast machine-event list (sorted by machine_id, time);
    # returns the slice of all log entries for machine_id, or None if absent.
    def aux(i, j):
        mid = (i + j) // 2
        if dfm.value[mid][1] > machine_id:
            if mid - 1 < i or i == j:
                return None
            return aux(i, mid - 1)
        elif dfm.value[mid][1] < machine_id:
            if mid + 1 > j or i == j:
                return None
            return aux(mid + 1, j)
        else:
            start = mid
            while start >= 0 and dfm.value[start][1] == machine_id: # once found, search for oldest log for machine
                start -= 1
            start += 1
            end = mid
            while end < len(dfm.value) and dfm.value[end][1] == machine_id:
                end += 1
            return dfm.value[start:end]
    return aux(0, len(dfm.value)-1)

def increment_reserv_bucket(bucket, ceils, taskid, reserv, last_term_by_id):
    # Find the first ceiling not below reserv; the final slot catches everything else.
    idx = 0
    while idx < len(ceils) and ceils[idx] < reserv:
        idx += 1

    if taskid not in bucket:
        bucket[taskid] = [0] * (len(ceils) + 1)
    bucket[taskid][idx] += 1

def bucket_sum_per_termination(bucket, last_term_by_id):
    result = {-1: None, 4: None, 5: None, 6: None, 7: None, 8: None}
    for tid, vs in bucket.items():
        term = last_term_by_id[tid]
        if result[term] is None:
            result[term] = vs
        else:
            result[term] = [sum(x) for x in zip(result[term], vs)]
    return result

def for_each_joined(x):
    machine_id = x[0]
    ts = x[1]

    ts = sorted(ts, key=lambda x: x.time)
    last_req_by_id = {} # map taskid -> last known req [cpu, ram] (data removed when task terminates)

    reserv_ceils = [0, 0.2, 0.4, 0.6, 0.8, 1]
    cpu_reservs_by_id = {} # map taskid -> [n, a, b, c, d, e, f] where:
    # n: count of events with unknown machine config
    # a: count of events with res. reserv. <0.2
    # b: count of events with res. reserv. [0.2, 0.4)
    # c: count of events with res. reserv. [0.4, 0.6)
    # d: count of events with res. reserv. [0.6, 0.8)
    # e: count of events with res. reserv. [0.8, 1)
    # f: count of events with res. reserv. >=1
    ram_reservs_by_id = {}

    request_ceils = [0.025, 0.05, 0.075]
    cpu_request_by_id = {} # map taskid -> [a, b, c, d] where <0.025, [0.025, 0.05), [0.05, 0.075), >=0.075
    ram_request_by_id = {}

    util_ceils = reserv_ceils
    cpu_util_by_id = {}
    ram_util_by_id = {}

    last_term_by_id = {} # map taskid -> last termination
    machine_logs = get_machine_time_resources(machine_id)

    for i, t in enumerate(ts):
        if machine_logs is not None and len(machine_logs) > 1 and machine_logs[1][0] >= t.time:
            machine_logs.pop(0)
        if t.id not in last_term_by_id:
            last_term_by_id[t.id] = -1
        if t.rcpu is not None and t.rram is not None:
            last_req_by_id[t.id] = (t.rcpu, t.rram)
        # 8b
        tot_req = [sum(x) for x in zip(*last_req_by_id.values())]
        if machine_logs is not None:
            reserv_cpu = tot_req[0] / machine_logs[0][2]
            reserv_ram = tot_req[1] / machine_logs[0][3]
        else:
            reserv_cpu = -1
            reserv_ram = -1
        increment_reserv_bucket(cpu_reservs_by_id, reserv_ceils, t.id, reserv_cpu, last_term_by_id)
        increment_reserv_bucket(ram_reservs_by_id, reserv_ceils, t.id, reserv_ram, last_term_by_id)
        # 8a
        increment_reserv_bucket(cpu_request_by_id, request_ceils, t.id, t.rcpu, last_term_by_id)
        increment_reserv_bucket(ram_request_by_id, request_ceils, t.id, t.rram, last_term_by_id)
        # 8c
        increment_reserv_bucket(cpu_util_by_id, util_ceils, t.id, t.acpu, last_term_by_id)
        increment_reserv_bucket(ram_util_by_id, util_ceils, t.id, t.aram, last_term_by_id)
        if t.type >= 4 and t.type <= 8:
            last_term_by_id[t.id] = t.type

    resobj = {'rcpu': cpu_request_by_id, 'rram': ram_request_by_id, 'rscpu': cpu_reservs_by_id,
        'rsram': ram_reservs_by_id, 'ucpu': cpu_util_by_id, 'uram': ram_util_by_id}

    for k, v in resobj.items():
        resobj[k] = bucket_sum_per_termination(v, last_term_by_id)

    return resobj

def fold_resobjs(ro1, ro2):
    if ro1 is None:
        return ro2
    elif ro2 is None:
        return ro1
    else:
        for k in ro1.keys():
            for kk in ro1[k].keys():
                if ro1[k][kk] is None:
                    ro1[k][kk] = ro2[k][kk]
                elif ro2[k][kk] is None:
                    continue
                else:
                    ro1[k][kk] = [sum(x) for x in zip(ro1[k][kk], ro2[k][kk])]
        return ro1

# TODO: partition by id and in the for-each-row
# function implement lookup to dfm.value to understand its memory capacity

import random

result = df.rdd \
    .groupBy(lambda x: x.mid) \
    .partitionBy(2000, lambda x: random.randint(0, 2000-1)) \
    .map(for_each_joined) \
    .fold(None, fold_resobjs)

d = os.path.dirname(os.path.realpath(__file__))

with open(d + "/" + cluster + "_figure8.json", "w") as f:
    json.dump(result, f)

# vim: set ts=4 sw=4 et tw=120:
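
The bucketing helper places a value in the first bucket whose ceiling is not below it, with the final slot catching everything at or above the last ceiling. A standalone sketch of the index arithmetic on toy values (bucket_index is my name, not part of the script):

# Same ceiling logic as increment_reserv_bucket above, with the measure script's
# reservation ceilings (the leading 0 absorbs the -1 "unknown machine" sentinel).
ceils = [0, 0.2, 0.4, 0.6, 0.8, 1]

def bucket_index(reserv):
    idx = 0
    while idx < len(ceils) and ceils[idx] < reserv:
        idx += 1
    return idx

print(bucket_index(-1))    # 0: the "unknown machine config" bucket
print(bucket_index(0.35))  # 2: falls in [0.2, 0.4)
print(bucket_index(1.2))   # 6: >=1, i.e. overcommitted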

275  figure_8/figure8.py  (new executable file)
@@ -0,0 +1,275 @@
#!/usr/bin/env python3
# coding: utf-8

import os
import json
import pandas as pd
import findspark
findspark.init()
import pyspark
import pyspark.sql
import sys
import gzip
from pyspark import AccumulatorParam
from pyspark.sql.functions import lit
from pyspark.sql import Window
from pyspark.sql.types import *
from decimal import *

TESTDATA = True

if len(sys.argv) != 4:
    print(sys.argv[0] + " {cluster} {tmpdir} {maxram}")
    sys.exit()

cluster = sys.argv[1]

spark = pyspark.sql.SparkSession.builder \
    .appName("task_slowdown") \
    .config("spark.driver.maxResultSize", "128g") \
    .config("spark.local.dir", sys.argv[2]) \
    .config("spark.driver.memory", sys.argv[3]) \
    .getOrCreate()
sc = spark.sparkContext

#
# READING INSTANCE EVENTS DATA
#
dfepath = "/home/claudio/google_2019/instance_events/" + cluster
dfepath += "/" + cluster + ("_instance_events00000000000?.json.gz" if TESTDATA else "_instance_events*.json.gz")
dfe = spark.read.json(dfepath)
# Cast collection_type to a byte if the column exists; otherwise add it as null.
try:
    dfe = dfe.withColumn("collection_type", dfe["collection_type"].cast(ByteType()))
except Exception:
    dfe = dfe.withColumn("collection_type", lit(None).cast(ByteType()))

#
# READING INSTANCE USAGE DATA
#
dfupath = "/home/claudio/google_2019/instance_usage/" + cluster
dfupath += "/" + cluster + ("_instance_usage00000000000?.csv.gz" if TESTDATA else "_instance_usage*.csv.gz")
usage_schema = StructType() \
    .add("start_time", LongType(), True) \
    .add("end_time", LongType(), True) \
    .add("collection_id", StringType(), True) \
    .add("instance_index", StringType(), True) \
    .add("machine_id", StringType(), True) \
    .add("alloc_collection_id", LongType(), True) \
    .add("alloc_instance_index", StringType(), True) \
    .add("collection_type", ByteType(), True) \
    .add("average_usage_cpus", DoubleType(), True) \
    .add("average_usage_memory", DoubleType(), True) \
    .add("maximum_usage_cpus", DoubleType(), True) \
    .add("maximum_usage_memory", DoubleType(), True) \
    .add("random_sample_usage_cpus", DoubleType(), True) \
    .add("random_sample_usage_memory", DoubleType(), True) \
    .add("assigned_memory", DoubleType(), True) \
    .add("page_cache_memory", DoubleType(), True) \
    .add("cycles_per_instruction", DoubleType(), True) \
    .add("memory_accesses_per_instruction", DoubleType(), True) \
    .add("sample_rate", DoubleType(), True) \
    .add("cpu_usage_dist_00", DoubleType(), True) \
    .add("cpu_usage_dist_10", DoubleType(), True) \
    .add("cpu_usage_dist_20", DoubleType(), True) \
    .add("cpu_usage_dist_30", DoubleType(), True) \
    .add("cpu_usage_dist_40", DoubleType(), True) \
    .add("cpu_usage_dist_50", DoubleType(), True) \
    .add("cpu_usage_dist_60", DoubleType(), True) \
    .add("cpu_usage_dist_70", DoubleType(), True) \
    .add("cpu_usage_dist_80", DoubleType(), True) \
    .add("cpu_usage_dist_90", DoubleType(), True) \
    .add("cpu_usage_dist_91", DoubleType(), True) \
    .add("cpu_usage_dist_92", DoubleType(), True) \
    .add("cpu_usage_dist_93", DoubleType(), True) \
    .add("cpu_usage_dist_94", DoubleType(), True) \
    .add("cpu_usage_dist_95", DoubleType(), True) \
    .add("cpu_usage_dist_96", DoubleType(), True) \
    .add("cpu_usage_dist_97", DoubleType(), True) \
    .add("cpu_usage_dist_98", DoubleType(), True) \
    .add("cpu_usage_dist_99", DoubleType(), True)

dfu = spark.read.format("csv") \
    .option("header", False) \
    .schema(usage_schema) \
    .load(dfupath)

# READING MACHINE EVENTS DATA, sort them and save them as broadcast variable
print("Starting to read machine events...")
dfm = pd.read_csv("~/google_2019/machine_events/" + cluster + "_machine_events.csv")
print("Dropping remove events...")
dfm = dfm[(dfm.type==1)|(dfm.type==3)]
print("Dropping missing data events...")
dfm = dfm[dfm.missing_data_reason.isnull()]  # keep only rows without a missing-data reason
print("Projecting on useful columns...")
dfm = dfm[["time", "machine_id", "capacity.cpus", "capacity.memory"]]
print("Sorting by time...")
dfm = dfm.sort_values(by=["machine_id", "time"])
print("Converting to broadcast variable...")
dfm = sc.broadcast([tuple(r) for r in dfm.to_numpy()])
print("Done with machine events.")

def tabid(x):
    return Decimal(x.collection_id) + Decimal(x.instance_index) / Decimal(2**64)

# interpolate machine data by extending each last machine report before a gap to cover it
def clean_usage(x):
    return [tabid(x), Decimal(x.average_usage_cpus), Decimal(x.average_usage_memory), int(x.start_time),
        int(x.end_time), x.machine_id]

def interpolate_usage(ts):
    ts = sorted(ts, key=lambda x: x[3])
    l = len(ts)
    for i in range(1, l-1):
        if ts[i+1][3] > ts[i][4]:
            ts[i][4] = ts[i+1][3]
    return ts

dfu = dfu.rdd \
    .filter(lambda x: x.start_time is not None and x.end_time is not None and
        x.instance_index is not None and x.collection_id is not None and
        x.machine_id is not None) \
    .map(clean_usage).groupBy(lambda x: x[0]) \
    .flatMap(lambda x: interpolate_usage(x[1])).toDF(["id", "acpu", "aram", "start", "end", "mid"])

dfe = dfe.rdd \
    .filter(lambda x: x.time is not None and x.type is not None and x.machine_id is not None and
        x.instance_index is not None and x.collection_id is not None and x.resource_request is not None and
        x.resource_request.cpus is not None and x.resource_request.memory is not None) \
    .map(lambda x: [tabid(x), int(x.time), int(x.type),
        Decimal(x.resource_request.cpus), Decimal(x.resource_request.memory), x.machine_id]) \
    .toDF(["id", "time", "type", "rcpu", "rram", "mid"])

df = dfe.join(dfu, [dfe.id == dfu.id, dfe.mid == dfu.mid, dfe.time >= dfu.start, dfe.time < dfu.end])

def get_machine_time_resources(machine_id):
    def aux(i, j):
        if i > j:  # guard: machine_id not present in the broadcast list
            return None
        mid = (i + j) // 2
        print(i, j, mid)
        print(dfm.value[mid])
        if dfm.value[mid][1] > machine_id:
            return aux(i, mid - 1)
        elif dfm.value[mid][1] < machine_id:
            return aux(mid + 1, j)
        else:
            start = mid
            while start >= 0 and dfm.value[start][1] == machine_id: # once found, search for oldest log for machine
                start -= 1
            start += 1
            end = mid
            while end < len(dfm.value) and dfm.value[end][1] == machine_id:
                end += 1
            return dfm.value[start:end]
    return aux(0, len(dfm.value)-1)

def increment_reserv_bucket(bucket, ceils, taskid, reserv):
    idx = 0
    while idx < len(ceils) and ceils[idx] < reserv:  # bounds check first to avoid IndexError
        idx += 1

    if taskid not in bucket:
        bucket[taskid] = [0] * (len(ceils) + 1)
    bucket[taskid][idx] += 1

def bucket_sum_per_termination(bucket, last_term_by_id):
    result = {-1: None, 4: None, 5: None, 6: None, 7: None, 8: None}
    for tid, vs in bucket.items():
        term = last_term_by_id[tid]
        if result[term] is None:
            result[term] = vs
        else:
            result[term] = [sum(x) for x in zip(result[term], vs)]
    return result

def for_each_joined(x):
    machine_id = x[0]
    ts = x[1]

    ts = sorted(ts, key=lambda x: x.time)
    last_req_by_id = {} # map taskid -> last known req [cpu, ram] (data removed when task terminates)

    reserv_ceils = [0.2, 0.4, 0.6, 0.8, 1]
    cpu_reservs_by_id = {} # map taskid -> [a, b, c, d, e, f] where:
    # a: count of events with res. reserv. <0.2
    # b: count of events with res. reserv. [0.2, 0.4)
    # c: count of events with res. reserv. [0.4, 0.6)
    # d: count of events with res. reserv. [0.6, 0.8)
    # e: count of events with res. reserv. [0.8, 1)
    # f: count of events with res. reserv. >=1
    ram_reservs_by_id = {}

    request_ceils = [0.025, 0.05, 0.075]
    cpu_request_by_id = {} # map taskid -> [a, b, c, d] where <0.025, [0.025, 0.05), [0.05, 0.075), >=0.075
    ram_request_by_id = {}

    util_ceils = reserv_ceils
    cpu_util_by_id = {}
    ram_util_by_id = {}

    last_term_by_id = {} # map taskid -> last termination
    machine_logs = get_machine_time_resources(machine_id)

    for i, t in enumerate(ts):
        if machine_logs is not None and len(machine_logs) > 1 and machine_logs[1][0] >= t.time:
            machine_logs.pop(0)
        if t.id not in last_term_by_id:
            last_term_by_id[t.id] = -1
        if t.type >= 4 and t.type <= 8:
            last_term_by_id[t.id] = t.type
            last_req_by_id.pop(t.id, None)  # task terminated: forget its request (entry may be absent)
        else:
            if t.rcpu is not None and t.rram is not None:
                last_req_by_id[t.id] = (t.rcpu, t.rram)

        # 8b
        tot_req = [sum(x) for x in zip(*last_req_by_id.values())] if last_req_by_id else [0, 0]
        if machine_logs is not None:
            reserv_cpu = tot_req[0] / machine_logs[0][2]
            reserv_ram = tot_req[1] / machine_logs[0][3]
        else:
            # unknown machine config; figure8-measure.py adds a dedicated bucket for this case
            reserv_cpu = -1
            reserv_ram = -1
        increment_reserv_bucket(cpu_reservs_by_id, reserv_ceils, t.id, reserv_cpu)
        increment_reserv_bucket(ram_reservs_by_id, reserv_ceils, t.id, reserv_ram)

        # 8a
        increment_reserv_bucket(cpu_request_by_id, request_ceils, t.id, t.rcpu)
        increment_reserv_bucket(ram_request_by_id, request_ceils, t.id, t.rram)

        # 8c
        increment_reserv_bucket(cpu_util_by_id, util_ceils, t.id, t.acpu)
        increment_reserv_bucket(ram_util_by_id, util_ceils, t.id, t.aram)

    resobj = {'rcpu': cpu_request_by_id, 'rram': ram_request_by_id, 'rscpu': cpu_reservs_by_id,
        'rsram': ram_reservs_by_id, 'ucpu': cpu_util_by_id, 'uram': ram_util_by_id}

    for k, v in resobj.items():
        resobj[k] = bucket_sum_per_termination(v, last_term_by_id)

    return resobj

def fold_resobjs(ro1, ro2):
    if ro1 is None:
        return ro2
    elif ro2 is None:
        return ro1
    else:
        for k in ro1.keys():
            for kk in ro1[k].keys():
                if ro1[k][kk] is None:
                    ro1[k][kk] = ro2[k][kk]
                elif ro2[k][kk] is None:
                    continue
                else:
                    ro1[k][kk] = [sum(x) for x in zip(ro1[k][kk], ro2[k][kk])]
        return ro1

# TODO: partition by id and in the for-each-row
# function implement lookup to dfm.value to understand its memory capacity

result = df.rdd \
    .groupBy(lambda x: x.mid) \
    .map(for_each_joined) \
    .fold(None, fold_resobjs)

d = os.path.dirname(os.path.realpath(__file__))

with open(d + "/" + cluster + "_figure8.json", "w") as f:
    json.dump(result, f)

# vim: set ts=4 sw=4 et tw=120:
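
The final .fold(None, fold_resobjs) merges the per-machine dictionaries pairwise and must therefore treat None as an identity element, which the first two branches of fold_resobjs provide. A toy illustration reusing fold_resobjs as defined above (shapes cut down to one metric and two termination codes):

# Toy inputs: one metric ('ucpu'), termination codes 4 and 5.
ro1 = {'ucpu': {4: [1, 0, 2], 5: None}}
ro2 = {'ucpu': {4: [0, 3, 1], 5: [7, 0, 0]}}

merged = fold_resobjs(ro1, ro2)
print(merged)  # {'ucpu': {4: [1, 3, 3], 5: [7, 0, 0]}}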

BIN  machine_configs/machine_configs.pdf  (new file, binary not shown)
BIN  machine_time_waste/statuses_total_time.pdf  (new file, binary not shown)

1  spatial_resource_waste/CORRUPTION.txt  (new file)
@@ -0,0 +1 @@
lost c_res_actual_per_job parts 01816-01824

BIN  spatial_resource_waste/a_actual/._SUCCESS.crc  (new file, binary not shown)
BIN  spatial_resource_waste/a_actual/.part-00000-447ca1ed-c2ea-4f1e-88e7-de81d43b29eb-c000.csv.crc  (new file, binary not shown)
0  spatial_resource_waste/a_actual/_SUCCESS  (new file)
6  spatial_resource_waste/a_actual/part-00000-447ca1ed-c2ea-4f1e-88e7-de81d43b29eb-c000.csv  (new file)
@@ -0,0 +1,6 @@
-1,73165704815830.230712890625000000,73345651566476.821899414062500000
6,146534454172963.142395019531250000,77692052834547.042846679687500000
5,239202281025317.192077636718750000,199303134846755.981445312500000000
4,1410420680006233.215332031250000000,829767989678030.014038085937500000
8,957825860821.723937988281250000,635555632653.236389160156250000
7,8624573251690182.685852050781250000,5840241764609245.300292968750000000
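
The a_actual rows use the same termination codes (-1, 4 to 8) as the figure_7 outputs, followed by two high-precision totals whose exact meaning is not stated in this commit. A cautious loading sketch; the column names are guesses, and Decimal preserves the full fractional digits that float would truncate:

from decimal import Decimal
import pandas as pd

df = pd.read_csv(
    "spatial_resource_waste/a_actual/part-00000-447ca1ed-c2ea-4f1e-88e7-de81d43b29eb-c000.csv",
    names=["last_term", "total_1", "total_2"],       # hypothetical names
    converters={"total_1": Decimal, "total_2": Decimal},
)
print(df)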

BIN  spatial_resource_waste/a_res_micros_actual_per_job/._SUCCESS.crc  (new file, binary not shown)
BIN  spatial_resource_waste/a_res_micros_actual_per_job/.part-00000.gz.crc through .part-00054.gz.crc  (55 new files, binaries not shown)

Some files were not shown because too many files have changed in this diff.