This commit is contained in:
Claudio Maggioni 2021-04-12 14:12:44 +00:00
parent 3599731bee
commit 6128ec51e1
18891 changed files with 1711 additions and 1 deletions

2
.gitignore vendored
View File

@ -1 +1,3 @@
**/.ipynb_checkpoints/
task_slowdown/?_state_changes.json.gz
**/*.gz

Binary file not shown.

Binary file not shown.

View File

View File

@ -0,0 +1,32 @@
6,0,<1,15931250
6,0,10-60,43594153
4,0,<1,4832
4,0,1-2,10239
6,0,4-10,37023938
5,0,2-4,153744
7,0,>=1d,585952420
7,0,4-10,44599121
5,0,4-10,94793
4,0,10-60,871433
6,0,>=1d,214183666
8,0,<1,581529
7,0,<1,47439916
-1,0,<1,2976745
5,0,1-2,2898784
4,0,4-10,174080
5,0,10-60,2667394
5,0,60-1d,1042564
6,0,1-2,52132565
8,0,60-1d,29
7,0,1-2,32273543
4,0,>=1d,74469275
6,0,60-1d,12314134
8,0,>=1d,414312
4,0,60-1d,6059050
5,0,<1,525809
5,0,>=1d,14986195
4,0,2-4,35855
7,0,2-4,20437332
7,0,10-60,79247427
6,0,2-4,39184016
7,0,60-1d,309129009

Binary file not shown.

Binary file not shown.

View File

View File

@ -0,0 +1,34 @@
6,0,4-10,28788042
4,0,1-2,27192
5,0,4-10,894267
5,0,<1,31846
7,0,>=1d,959768322
5,0,60-1d,5150452
5,0,2-4,1154255
-1,0,<1,2470086
6,0,60-1d,31292653
7,0,<1,66024826
8,0,60-1d,22
7,0,4-10,131293574
4,0,10-60,475870
7,0,2-4,63304732
8,0,4-10,32
4,0,>=1d,39003436
5,0,>=1d,4619261
4,0,2-4,88680
7,0,60-1d,752719676
8,0,>=1d,434174
8,0,<1,382878
6,0,1-2,73380248
6,0,>=1d,139751172
4,0,4-10,242628
4,0,60-1d,13808200
5,0,10-60,2987439
4,0,<1,33079
5,0,1-2,39351
6,0,2-4,46692501
6,0,10-60,63401359
8,0,10-60,56
7,0,1-2,32974421
7,0,10-60,255723593
6,0,<1,108838664

Binary file not shown.

Binary file not shown.

View File

View File

@ -0,0 +1,36 @@
6,0,4-10,199505836
7,0,4-10,755777336
-1,0,<1,3952663
5,0,2-4,495781
7,0,2-4,390478553
6,0,>=1d,292113564
7,0,60-1d,1308416813
4,0,1-2,39982
8,0,4-10,24684
6,0,<1,311177825
4,0,4-10,400599
8,0,10-60,9397
4,0,10-60,976337
5,0,<1,188610
4,0,60-1d,7618699
8,0,>=1d,711578
8,0,1-2,61384
4,0,>=1d,73841164
4,0,2-4,103581
7,0,1-2,132922390
5,0,>=1d,51287123
8,0,<1,664765
5,0,10-60,4193865
7,0,10-60,800096395
5,0,60-1d,1971893
7,0,<1,196256317
4,0,<1,13860
6,0,10-60,197074666
8,0,2-4,53375
6,0,60-1d,91864360
6,0,2-4,244883856
5,0,1-2,226157
7,0,>=1d,1382608594
5,0,4-10,2491187
6,0,1-2,280568210
8,0,60-1d,900

Binary file not shown.

Binary file not shown.

View File

View File

@ -0,0 +1,34 @@
5,0,60-1d,331929
4,0,2-4,50447
6,0,2-4,44265781
7,0,60-1d,799490797
-1,0,<1,4544421
7,0,4-10,519875128
5,0,2-4,333758
7,0,1-2,106909750
5,0,10-60,838209
5,0,4-10,737122
6,0,>=1d,235790332
7,0,10-60,476207710
8,0,<1,797658
4,0,4-10,298479
5,0,1-2,108840
6,0,60-1d,21709967
5,0,>=1d,29145185
5,0,<1,722288
4,0,10-60,430109
6,0,1-2,55841318
4,0,<1,5562
4,0,60-1d,3522738
6,0,4-10,42848563
4,0,>=1d,47657312
6,0,<1,51292750
4,0,1-2,9021
7,0,>=1d,1321688066
8,0,60-1d,182
7,0,<1,128175243
8,0,10-60,560
8,0,2-4,34
8,0,>=1d,31728
7,0,2-4,244210514
6,0,10-60,64400417

Binary file not shown.

Binary file not shown.

View File

View File

@ -0,0 +1,32 @@
5,0,>=1d,24065006
7,0,1-2,16642387
6,0,>=1d,5095832
7,0,>=1d,1333707353
5,0,1-2,51413
8,0,>=1d,59
4,0,<1,5002
6,0,1-2,16051042
4,0,>=1d,7758651
7,0,2-4,86806397
6,0,60-1d,6125245
5,0,10-60,313662
4,0,2-4,28817
6,0,4-10,77888564
5,0,<1,68072
6,0,10-60,41506399
4,0,4-10,347459
6,0,2-4,17211883
4,0,60-1d,1828664
5,0,2-4,208141
4,0,10-60,427813
7,0,10-60,290159810
7,0,<1,34018722
6,0,<1,61095340
5,0,4-10,69461
7,0,4-10,162898256
4,0,1-2,4033
7,0,60-1d,845942465
8,0,<1,38780
8,0,60-1d,38
-1,0,<1,1457876
5,0,60-1d,478983

Binary file not shown.

Binary file not shown.

View File

View File

@ -0,0 +1,36 @@
4,0,10-60,143005
7,0,<1,27095109
5,0,1-2,118612
5,0,>=1d,12449046
6,0,2-4,12286904
8,0,10-60,131690
4,0,4-10,49384
5,0,2-4,498625
7,0,2-4,41209062
8,0,2-4,41676
7,0,10-60,267957130
8,0,>=1d,47829
8,0,<1,503296
4,0,<1,3387
8,0,60-1d,134
4,0,>=1d,39737313
5,0,<1,171247
6,0,>=1d,101626554
5,0,4-10,91257
8,0,4-10,192276
4,0,2-4,15259
-1,0,<1,963888
6,0,<1,45025409
7,0,4-10,104957224
4,0,1-2,2802
7,0,>=1d,885691405
6,0,60-1d,32130223
5,0,10-60,1003509
7,0,60-1d,642935158
7,0,1-2,30708656
6,0,10-60,38209602
6,0,1-2,14727025
8,0,1-2,18432
4,0,60-1d,24102473
6,0,4-10,16160973
5,0,60-1d,139912

137
figure_7/figure7ab.py Executable file
View File

@ -0,0 +1,137 @@
#!/usr/bin/env python3
# coding: utf-8
import json
import pandas
import findspark
findspark.init()
import pyspark
import pyspark.sql
import sys
import gzip
from pyspark import AccumulatorParam
from pyspark.sql.functions import lit
from pyspark.sql import Window
from pyspark.sql.types import ByteType
if len(sys.argv) is not 4:
print(sys.argv[0] + " {cluster} {tmpdir} {maxram}")
sys.exit()
cluster=sys.argv[1]
spark = pyspark.sql.SparkSession.builder \
.appName("task_slowdown") \
.config("spark.driver.maxResultSize", "128g") \
.config("spark.local.dir", sys.argv[2]) \
.config("spark.driver.memory", sys.argv[3]) \
.getOrCreate()
sc = spark.sparkContext
df = spark.read.json("/home/claudio/google_2019/instance_events/" + cluster + "/" + cluster + "_instance_events*.json.gz")
#df = spark.read.json("/home/claudio/google_2019/instance_events/" + cluster + "/" + cluster + "_test.json")
try:
df["collection_type"] = df["collection_type"].cast(ByteType())
except:
df = df.withColumn("collection_type", lit(None).cast(ByteType()))
class NonPriorityAcc(pyspark.AccumulatorParam):
def zero(self, value):
return {}
def addInPlace(self, v1, v2):
for key in v2:
if key in v1:
v1[key] += v2[key]
else:
v1[key] = v2[key]
return v1
non = sc.accumulator({}, NonPriorityAcc())
MICROS = 1000000
def sumrow(l, p, t, c):
t = t // (MICROS * 60)
if t < 1:
t = "<1"
elif t < 2:
t = "1-2"
elif t < 4:
t = "2-4"
elif t < 10:
t = "4-10"
elif t < 60:
t = "10-60"
elif t < 60 * 24:
t = "60-1d"
else:
t = ">=1d"
return (l, p, t, c)
def sumid(sr):
return (sr[0], sr[1], sr[2])
def for_each_task(ts):
global non
ts = sorted(ts, key=lambda x: x["time"])
in_exec = False
exec_start = None
exec_tot = 0
priority = 0
l = len(ts)
last_term = -1
for i,t in enumerate(ts):
if t["priority"] is not -1 and priority is -1:
priority = t["priority"]
if t["type"] >= 4 and t["type"] <= 8:
last_term = t["type"]
if in_exec and (t["type"] == 1 or (t["type"] >= 4 and t["type"] <= 8)):
exec_tot += t["time"] - exec_start
in_exec = False
if (not in_exec) and (t["type"] == 3):
exec_start = t["time"]
in_exec = True
return sumrow(last_term, priority, exec_tot, l)
def cleanup(x):
return {
"time": int(x.time),
"type": 0 if x.type is None else int(x.type),
"id": x.collection_id + "-" + x.instance_index,
"priority": -1 if x.priority is None else int(x.priority)
}
def sum_rows(xs):
csum = 0
for x in xs:
csum += x[3]
return csum
df2 = df.rdd \
.filter(lambda x: x.collection_type is None or x.collection_type == 0) \
.filter(lambda x: x.time is not None and x.instance_index is not None and x.collection_id is not None) \
.map(cleanup) \
.groupBy(lambda x: x["id"]) \
.mapValues(for_each_task) \
.map(lambda x: x[1]) \
.groupBy(lambda x: sumid(x)) \
.mapValues(sum_rows) \
.map(lambda x: str(x[0][0]) + "," + str(x[0][1]) + "," + str(x[0][2]) + "," + str(x[1])) \
.coalesce(1) \
.saveAsTextFile(cluster + "_priority_exectime")
# vim: set ts=4 sw=4 et tw=80:

Binary file not shown.

Binary file not shown.

View File

View File

@ -0,0 +1,32 @@
6,0,<1,5855111
7,0,60-1d,601542196
7,0,10-60,95838227
6,0,>=1d,530133143
4,0,>=1d,36971224
5,0,60-1d,1010045
8,0,<1,3811
8,0,>=1d,263540
5,0,2-4,22902724
5,0,4-10,6661506
7,0,<1,12043755
6,0,60-1d,10739044
7,0,2-4,16985420
4,0,1-2,2714
7,0,1-2,6803910
5,0,>=1d,4610298
4,0,<1,1840
7,0,4-10,36246298
6,0,10-60,19503280
7,0,>=1d,1599925741
6,0,1-2,8607061
5,0,<1,275553
4,0,60-1d,6441519
8,0,60-1d,1244
4,0,2-4,8100
4,0,10-60,141727
5,0,10-60,2046864
6,0,4-10,9704610
5,0,1-2,23954
6,0,2-4,10143359
4,0,4-10,35493
-1,0,<1,397201

Binary file not shown.

Binary file not shown.

View File

View File

@ -0,0 +1,32 @@
8,0,>=1d,5054
-1,0,<1,954975
7,0,<1,24484463
6,0,60-1d,23056333
4,0,2-4,1836
4,0,4-10,8989
8,0,<1,124
7,0,2-4,23796068
7,0,>=1d,1239402690
5,0,4-10,1489254
6,0,>=1d,78233744
7,0,4-10,62455654
6,0,4-10,11418060
6,0,2-4,11039358
4,0,10-60,40994
7,0,60-1d,722926038
5,0,>=1d,8729456
7,0,1-2,4366627
5,0,<1,189088
5,0,60-1d,1497194
8,0,60-1d,1922
5,0,2-4,16174
6,0,1-2,4843246
4,0,<1,779
6,0,10-60,19472894
4,0,1-2,791
7,0,10-60,138882516
4,0,60-1d,953635
4,0,>=1d,4032484
6,0,<1,13791195
5,0,1-2,3721
5,0,10-60,325669

148
figure_8/figure8-join.py Executable file
View File

@ -0,0 +1,148 @@
#!/usr/bin/env python3
# coding: utf-8
import os
import json
import pandas as pd
import findspark
findspark.init()
import pyspark
import pyspark.sql
import sys
import gzip
from pyspark import AccumulatorParam
from pyspark.sql.functions import lit
from pyspark.sql import Window
from pyspark.sql.types import *
from decimal import *
TESTDATA = False
if len(sys.argv) is not 4:
print(sys.argv[0] + " {cluster} {tmpdir} {maxram}")
sys.exit()
cluster=sys.argv[1]
spark = pyspark.sql.SparkSession.builder \
.appName("task_slowdown") \
.config("spark.driver.maxResultSize", "128g") \
.config("spark.local.dir", sys.argv[2]) \
.config("spark.driver.memory", sys.argv[3]) \
.getOrCreate()
sc = spark.sparkContext
#
# READING INSTANCE EVENTS DATA
#
dfepath = "/home/claudio/google_2019/instance_events/" + cluster
dfepath += "/" + cluster + ("_instance_events00000000000?.json.gz" if TESTDATA else "_instance_events*.json.gz")
dfe = spark.read.json(dfepath)
#
# READING INSTANCE USAGE DATA
#
dfupath = "/home/claudio/google_2019/instance_usage/" + cluster
dfupath += "/" + cluster + ("_instance_usage00000000000?.csv.gz" if TESTDATA else "_instance_usage*.csv.gz")
usage_schema = StructType() \
.add("start_time", LongType(), True) \
.add("end_time", LongType(), True) \
.add("collection_id", StringType(), True) \
.add("instance_index", StringType(), True) \
.add("machine_id", StringType(), True) \
.add("alloc_collection_id", LongType(), True) \
.add("alloc_instance_index", StringType(), True) \
.add("collection_type", ByteType(), True) \
.add("average_usage_cpus", DoubleType(), True) \
.add("average_usage_memory", DoubleType(), True) \
.add("maximum_usage_cpus", DoubleType(), True) \
.add("maximum_usage_memory", DoubleType(), True) \
.add("random_sample_usage_cpus", DoubleType(), True) \
.add("random_sample_usage_memory", DoubleType(), True) \
.add("assigned_memory", DoubleType(), True) \
.add("page_cache_memory", DoubleType(), True) \
.add("cycles_per_instruction", DoubleType(), True) \
.add("memory_accLesses_per_instruction", DoubleType(), True) \
.add("sample_rate", DoubleType(), True) \
.add("cpu_usage_dist_00", DoubleType(), True) \
.add("cpu_usage_dist_10", DoubleType(), True) \
.add("cpu_usage_dist_20", DoubleType(), True) \
.add("cpu_usage_dist_30", DoubleType(), True) \
.add("cpu_usage_dist_40", DoubleType(), True) \
.add("cpu_usage_dist_50", DoubleType(), True) \
.add("cpu_usage_dist_60", DoubleType(), True) \
.add("cpu_usage_dist_70", DoubleType(), True) \
.add("cpu_usage_dist_80", DoubleType(), True) \
.add("cpu_usage_dist_90", DoubleType(), True) \
.add("cpu_usage_dist_91", DoubleType(), True) \
.add("cpu_usage_dist_92", DoubleType(), True) \
.add("cpu_usage_dist_93", DoubleType(), True) \
.add("cpu_usage_dist_94", DoubleType(), True) \
.add("cpu_usage_dist_95", DoubleType(), True) \
.add("cpu_usage_dist_96", DoubleType(), True) \
.add("cpu_usage_dist_97", DoubleType(), True) \
.add("cpu_usage_dist_98", DoubleType(), True) \
.add("cpu_usage_dist_99", DoubleType(), True)
dfu = spark.read.format("csv") \
.option("header", False) \
.schema(usage_schema) \
.load(dfupath)
# READING MACHINE EVENTS DATA, sort them and save them as broadcast variable
print("Starting to read machine events...")
dfm = pd.read_csv("~/google_2019/machine_events/" + cluster + "_machine_events.csv")
print("Dropping remove events...")
dfm = dfm[(dfm.type==1)|(dfm.type==3)]
print("Dropping missing data events...")
dfm = dfm[dfm.missing_data_reason.notnull()]
print("Projecting on useful columns...")
dfm = dfm[["time", "machine_id", "capacity.cpus", "capacity.memory"]]
print("Sorting by time...")
dfm = dfm.sort_values(by=["machine_id", "time"])
print("Converting to broadcast variable...")
dfm = sc.broadcast([tuple(r) for r in dfm.to_numpy()])
print("Done with machine events.")
def tabid(x):
return Decimal(x.collection_id) + Decimal(x.instance_index) / Decimal(2**64)
# interpolate machine data by extending each last machine report before a gap to cover it
def clean_usage(x):
return [tabid(x), Decimal(x.average_usage_cpus), Decimal(x.average_usage_memory), int(x.start_time),
int(x.end_time), x.machine_id]
def interpolate_usage(ts):
ts = sorted(ts, key=lambda x: x[3])
l = len(ts)
for i in range(1, l-1):
if ts[i+1][3] > ts[i][4]:
ts[i][4] = ts[i+1][3]
return ts
dfu = dfu.rdd \
.filter(lambda x: x.start_time is not None and x.end_time is not None and
x.instance_index is not None and x.collection_id is not None and
x.machine_id is not None) \
.map(clean_usage).groupBy(lambda x: x[0]) \
.flatMap(lambda x: interpolate_usage(x[1])).toDF(["id", "acpu", "aram", "start", "end", "mid"])
try:
dfe["collection_type"] = dfe["collection_type"].cast(ByteType())
except:
dfe = dfe.withColumn("collection_type", lit(None).cast(ByteType()))
dfe = dfe.rdd \
.filter(lambda x: x.time is not None and x.type is not None and x.machine_id is not None and
x.instance_index is not None and x.collection_id is not None and x.resource_request is not None and
x.resource_request.cpus is not None and x.resource_request.memory is not None) \
.map(lambda x: [tabid(x), int(x.time), int(x.type),
Decimal(x.resource_request.cpus), Decimal(x.resource_request.memory), x.machine_id]) \
.toDF(["id", "time", "type", "rcpu", "rram", "mid"])
df = dfe.alias("dfe").join(dfu.alias("dfu"), [dfe.id == dfu.id, dfe.mid == dfu.mid, dfe.time > dfu.start, dfe.time <= dfu.end]) \
.selectExpr("dfe.id as id", "dfe.mid as mid", "dfe.time as time", "dfe.type as type", "dfe.rcpu as rcpu",
"dfe.rram as rram", "dfu.acpu as acpu", "dfu.aram as aram")
df.write.parquet("/home/claudio/raid0/figure-8-join-" + cluster + ".parquet")
# vim: set ts=4 sw=4 et tw=120:

195
figure_8/figure8-measure.py Executable file
View File

@ -0,0 +1,195 @@
#!/usr/bin/env python3
# coding: utf-8
import os
import json
import pandas as pd
import findspark
findspark.init()
import pyspark
import pyspark.sql
import sys
import gzip
from pyspark import AccumulatorParam
from pyspark.sql.functions import lit
from pyspark.sql import Window
from pyspark.sql.types import *
from decimal import *
if len(sys.argv) is not 5:
print(sys.argv[0] + " {cluster} {tmpdir} {maxram} {joindir}")
sys.exit()
joindir=sys.argv[4]
cluster=sys.argv[1]
spark = pyspark.sql.SparkSession.builder \
.appName("task_slowdown") \
.config("spark.driver.maxResultSize", sys.argv[3]) \
.config("spark.local.dir", sys.argv[2]) \
.config("spark.driver.memory", sys.argv[3]) \
.getOrCreate()
sc = spark.sparkContext
df = spark.read.parquet(joindir + "/figure-8-join-" + cluster + ".parquet")
### TESTING ONLY
#df = df.limit(10000)
# READING MACHINE EVENTS DATA, sort them and save them as broadcast variable
print("Starting to read machine events...")
dfm = pd.read_csv("~/google_2019/machine_events/" + cluster + "_machine_events.csv", converters={
'time': lambda x: -1 if x == '' else int(x),
'machine_id': lambda x: str(x),
'capacity.cpus': lambda x: -1 if x == '' else Decimal(x),
'capacity.memory': lambda x: -1 if x == '' else Decimal(x)})
print("Dropping remove events...")
dfm = dfm[(dfm.type!=2)&(dfm.time!=-1)&(dfm["capacity.cpus"]!=-1)&(dfm["capacity.memory"]!=-1)]
print("Dropping missing data events...")
dfm = dfm[dfm.missing_data_reason.isnull()]
print("Projecting on useful columns...")
dfm = dfm[["time", "machine_id", "capacity.cpus", "capacity.memory"]]
print("Sorting by time...")
dfm = dfm.sort_values(by=["machine_id", "time"])
print("Converting to broadcast variable...")
dfm = sc.broadcast([tuple(r) for r in dfm.to_numpy()])
print("Done with machine events.")
def get_machine_time_resources(machine_id):
def aux(i, j):
mid = (i + j) // 2
if dfm.value[mid][1] > machine_id:
if mid - 1 < 1 or i == j:
return None
return aux(i, mid - 1)
elif dfm.value[mid][1] < machine_id:
if mid + 1 > j or i == j:
return None
return aux(mid + 1, j)
else:
start = mid
while dfm.value[start][1] == machine_id and start >= 0: # once found, search for oldest log for machine
start -= 1
start += 1
end = mid
while end < len(dfm.value) and dfm.value[end][1] == machine_id:
end += 1
return dfm.value[start:end]
return aux(0, len(dfm.value)-1)
def increment_reserv_bucket(bucket, ceils, taskid, reserv, last_term_by_id):
idx = 0
while idx < len(ceils) and ceils[idx] < reserv:
idx += 1
if taskid not in bucket:
bucket[taskid] = [0] * (len(ceils) + 1)
bucket[taskid][idx] += 1
def bucket_sum_per_termination(bucket, last_term_by_id):
result = {-1: None, 4: None, 5: None, 6: None, 7: None, 8: None}
for tid, vs in bucket.items():
term = last_term_by_id[tid]
if result[term] is None:
result[term] = vs
else:
result[term] = [sum(x) for x in zip(result[term], vs)]
return result
def for_each_joined(x):
machine_id = x[0]
ts = x[1]
ts = sorted(ts, key=lambda x: x.time)
last_req_by_id = {} # map taskid -> last known req [cpu, ram] (data removed when task terminates)
reserv_ceils = [0, 0.2, 0.4, 0.6, 0.8, 1]
cpu_reservs_by_id = {} # map taskid -> [n, a, b, c, d, e, f] where:
# n: count of event with unknown machine config
# a: count of event with res. reserv. <0.2
# b: count of event with res. reserv. [0.2, 0.4)
# c: count of event with res. reserv. [0.4, 0.6)
# d: count of event with res. reserv. [0.6, 0.8)
# e: count of event with res. reserv. [0.8, 0.1)
# f: count of event with res. reserv. >=1
ram_reservs_by_id = {}
request_ceils = [0.025, 0.05, 0.075]
cpu_request_by_id = {} # map taskid -> [a, b, c, d] where <0.025, [0.025, 0.05), [0.05,0.075), >=0.075
ram_request_by_id = {}
util_ceils = reserv_ceils
cpu_util_by_id = {}
ram_util_by_id = {}
last_term_by_id = {} # map taskid -> last termination
machine_logs = get_machine_time_resources(machine_id)
for i, t in enumerate(ts):
if machine_logs is not None and len(machine_logs) > 1 and machine_logs[1][0] >= t.time:
machine_logs.pop(0)
if t.id not in last_term_by_id:
last_term_by_id[t.id] = -1
if t.rcpu is not None and t.rram is not None:
last_req_by_id[t.id] = (t.rcpu, t.rram)
# 8b
tot_req = [sum(x) for x in zip(*last_req_by_id.values())]
if machine_logs is not None:
reserv_cpu = tot_req[0] / machine_logs[0][2]
reserv_ram = tot_req[1] / machine_logs[0][3]
else:
reserv_cpu = -1
reserv_ram = -1
increment_reserv_bucket(cpu_reservs_by_id, reserv_ceils, t.id, reserv_cpu, last_term_by_id)
increment_reserv_bucket(ram_reservs_by_id, reserv_ceils, t.id, reserv_ram, last_term_by_id)
# 8a
increment_reserv_bucket(cpu_request_by_id, request_ceils, t.id, t.rcpu, last_term_by_id)
increment_reserv_bucket(ram_request_by_id, request_ceils, t.id, t.rram, last_term_by_id)
# 8c
increment_reserv_bucket(cpu_util_by_id, util_ceils, t.id, t.acpu, last_term_by_id)
increment_reserv_bucket(ram_util_by_id, util_ceils, t.id, t.aram, last_term_by_id)
if t.type >= 4 and t.type <= 8:
last_term_by_id[t.id] = t.type
resobj = {'rcpu': cpu_request_by_id, 'rram': ram_request_by_id, 'rscpu': cpu_reservs_by_id,
'rsram': ram_reservs_by_id, 'ucpu': cpu_util_by_id, 'uram': ram_util_by_id}
for k, v in resobj.items():
resobj[k] = bucket_sum_per_termination(v, last_term_by_id)
return resobj
def fold_resobjs(ro1, ro2):
if ro1 is None:
return ro2
elif ro2 is None:
return ro1
else:
for k in ro1.keys():
for kk in ro1[k].keys():
if ro1[k][kk] is None:
ro1[k][kk] = ro2[k][kk]
elif ro2[k][kk] is None:
continue
else:
ro1[k][kk] = [sum(x) for x in zip(ro1[k][kk], ro2[k][kk])]
return ro1
# TODO: partition by id and in the for-each-row
# function implement lookup to dfm.value to understand its memory capacity
import random
result = df.rdd \
.groupBy(lambda x: x.mid) \
.partitionBy(2000, lambda x: random.randint(0, 2000-1)) \
.map(for_each_joined) \
.fold(None, fold_resobjs)
d = os.path.dirname(os.path.realpath(__file__))
with open(d + "/" + cluster + "_figure8.json", "w") as f:
json.dump(result, f)
# vim: set ts=4 sw=4 et tw=120:

275
figure_8/figure8.py Executable file
View File

@ -0,0 +1,275 @@
#!/usr/bin/env python3
# coding: utf-8
import os
import json
import pandas as pd
import findspark
findspark.init()
import pyspark
import pyspark.sql
import sys
import gzip
from pyspark import AccumulatorParam
from pyspark.sql.functions import lit
from pyspark.sql import Window
from pyspark.sql.types import *
from decimal import *
TESTDATA = True
if len(sys.argv) is not 4:
print(sys.argv[0] + " {cluster} {tmpdir} {maxram}")
sys.exit()
cluster=sys.argv[1]
spark = pyspark.sql.SparkSession.builder \
.appName("task_slowdown") \
.config("spark.driver.maxResultSize", "128g") \
.config("spark.local.dir", sys.argv[2]) \
.config("spark.driver.memory", sys.argv[3]) \
.getOrCreate()
sc = spark.sparkContext
#
# READING INSTANCE EVENTS DATA
#
dfepath = "/home/claudio/google_2019/instance_events/" + cluster
dfepath += "/" + cluster + ("_instance_events00000000000?.json.gz" if TESTDATA else "_instance_events*.json.gz")
dfe = spark.read.json(dfepath)
try:
dfe["collection_type"] = dfe["collection_type"].cast(ByteType())
except:
dfe = dfe.withColumn("collection_type", lit(None).cast(ByteType()))
#
# READING INSTANCE USAGE DATA
#
dfupath = "/home/claudio/google_2019/instance_usage/" + cluster
dfupath += "/" + cluster + ("_instance_usage00000000000?.csv.gz" if TESTDATA else "_instance_usage*.csv.gz")
usage_schema = StructType() \
.add("start_time", LongType(), True) \
.add("end_time", LongType(), True) \
.add("collection_id", StringType(), True) \
.add("instance_index", StringType(), True) \
.add("machine_id", StringType(), True) \
.add("alloc_collection_id", LongType(), True) \
.add("alloc_instance_index", StringType(), True) \
.add("collection_type", ByteType(), True) \
.add("average_usage_cpus", DoubleType(), True) \
.add("average_usage_memory", DoubleType(), True) \
.add("maximum_usage_cpus", DoubleType(), True) \
.add("maximum_usage_memory", DoubleType(), True) \
.add("random_sample_usage_cpus", DoubleType(), True) \
.add("random_sample_usage_memory", DoubleType(), True) \
.add("assigned_memory", DoubleType(), True) \
.add("page_cache_memory", DoubleType(), True) \
.add("cycles_per_instruction", DoubleType(), True) \
.add("memory_accLesses_per_instruction", DoubleType(), True) \
.add("sample_rate", DoubleType(), True) \
.add("cpu_usage_dist_00", DoubleType(), True) \
.add("cpu_usage_dist_10", DoubleType(), True) \
.add("cpu_usage_dist_20", DoubleType(), True) \
.add("cpu_usage_dist_30", DoubleType(), True) \
.add("cpu_usage_dist_40", DoubleType(), True) \
.add("cpu_usage_dist_50", DoubleType(), True) \
.add("cpu_usage_dist_60", DoubleType(), True) \
.add("cpu_usage_dist_70", DoubleType(), True) \
.add("cpu_usage_dist_80", DoubleType(), True) \
.add("cpu_usage_dist_90", DoubleType(), True) \
.add("cpu_usage_dist_91", DoubleType(), True) \
.add("cpu_usage_dist_92", DoubleType(), True) \
.add("cpu_usage_dist_93", DoubleType(), True) \
.add("cpu_usage_dist_94", DoubleType(), True) \
.add("cpu_usage_dist_95", DoubleType(), True) \
.add("cpu_usage_dist_96", DoubleType(), True) \
.add("cpu_usage_dist_97", DoubleType(), True) \
.add("cpu_usage_dist_98", DoubleType(), True) \
.add("cpu_usage_dist_99", DoubleType(), True)
dfu = spark.read.format("csv") \
.option("header", False) \
.schema(usage_schema) \
.load(dfupath)
# READING MACHINE EVENTS DATA, sort them and save them as broadcast variable
print("Starting to read machine events...")
dfm = pd.read_csv("~/google_2019/machine_events/" + cluster + "_machine_events.csv")
print("Dropping remove events...")
dfm = dfm[(dfm.type==1)|(dfm.type==3)]
print("Dropping missing data events...")
dfm = dfm[dfm.missing_data_reason.notnull()]
print("Projecting on useful columns...")
dfm = dfm[["time", "machine_id", "capacity.cpus", "capacity.memory"]]
print("Sorting by time...")
dfm = dfm.sort_values(by=["machine_id", "time"])
print("Converting to broadcast variable...")
dfm = sc.broadcast([tuple(r) for r in dfm.to_numpy()])
print("Done with machine events.")
def tabid(x):
return Decimal(x.collection_id) + Decimal(x.instance_index) / Decimal(2**64)
# interpolate machine data by extending each last machine report before a gap to cover it
def clean_usage(x):
return [tabid(x), Decimal(x.average_usage_cpus), Decimal(x.average_usage_memory), int(x.start_time),
int(x.end_time), x.machine_id]
def interpolate_usage(ts):
ts = sorted(ts, key=lambda x: x[3])
l = len(ts)
for i in range(1, l-1):
if ts[i+1][3] > ts[i][4]:
ts[i][4] = ts[i+1][3]
return ts
dfu = dfu.rdd \
.filter(lambda x: x.start_time is not None and x.end_time is not None and
x.instance_index is not None and x.collection_id is not None and
x.machine_id is not None) \
.map(clean_usage).groupBy(lambda x: x[0]) \
.flatMap(lambda x: interpolate_usage(x[1])).toDF(["id", "acpu", "aram", "start", "end", "mid"])
dfe = dfe.rdd \
.filter(lambda x: x.time is not None and x.type is not None and x.machine_id is not None and
x.instance_index is not None and x.collection_id is not None and x.resource_request is not None and
x.resource_request.cpus is not None and x.resource_request.memory is not None) \
.map(lambda x: [tabid(x), int(x.time), int(x.type),
Decimal(x.resource_request.cpus), Decimal(x.resource_request.memory), x.machine_id]) \
.toDF(["id", "time", "type", "rcpu", "rram", "mid"])
df = dfe.join(dfu, [dfe.id == dfu.id, dfe.mid == dfu.mid, dfe.time >= dfu.start, dfe.time < dfu.end])
def get_machine_time_resources(machine_id):
def aux(i, j):
mid = (i + j) // 2
print(i, j, mid)
print(dfm.value[mid])
if dfm.value[mid][1] > machine_id:
return aux(i, mid - 1)
elif dfm.value[mid][1] < machine_id:
return aux(mid + 1, j)
else:
start = mid
while dfm.value[start][1] == machine_id and start >= 0: # once found, search for oldest log for machine
start -= 1
start += 1
end = mid
while dfm.value[end][1] == machine_id and end < len(dfm.value):
end += 1
return dfm.value[start:end]
return aux(0, len(dfm.value)-1)
def increment_reserv_bucket(bucket, ceils, taskid, reserv):
idx = 0
while ceils[idx] < reserv and idx < len(ceils):
idx += 1
if taskid not in bucket:
bucket[taskid] = [0] * (len(ceils) + 1)
bucket[taskid][idx] += 1
def bucket_sum_per_termination(bucket, last_term_by_id):
result = {-1: None, 4: None, 5: None, 6: None, 7: None, 8: None}
for tid, vs in bucket.items():
term = last_term_by_id[tid]
if result[term] is None:
result[term] = vs
else:
result[term] = [sum(x) for x in zip(result[term], vs)]
return result
def for_each_joined(x):
machine_id = x[0]
ts = x[1]
ts = sorted(ts, key=lambda x: x.time)
last_req_by_id = {} # map taskid -> last known req [cpu, ram] (data removed when task terminates)
reserv_ceils = [0.2, 0.4, 0.6, 0.8, 1]
cpu_reservs_by_id = {} # map taskid -> [a, b, c, d, e, f] where:
# a: count of event with res. reserv. <0.2
# b: count of event with res. reserv. [0.2, 0.4)
# c: count of event with res. reserv. [0.4, 0.6)
# d: count of event with res. reserv. [0.6, 0.8)
# e: count of event with res. reserv. [0.8, 0.1)
# f: count of event with res. reserv. >=1
ram_reservs_by_id = {}
request_ceils = [0.025, 0.05, 0.075]
cpu_request_by_id = {} # map taskid -> [a, b, c, d] where <0.025, [0.025, 0.05), [0.05,0.075), >=0.075
ram_request_by_id = {}
util_ceils = reserv_ceils
cpu_util_by_id = {}
ram_util_by_id = {}
last_term_by_id = {} # map taskid -> last termination
machine_logs = get_machine_time_resources(machine_id)
for i, t in enumerate(ts):
if len(machine_logs) > 1 and machine_logs[1][0] >= t.time:
machine_logs.pop(0)
if t.id not in last_term_by_id:
last_term_by_id[t.id] = -1
if t.type >= 4 and t.type <= 8:
last_term_by_id[t.id] = t.type
del last_req_by_id[t.id]
else:
if t.rcpu is not None and t.rram is not None:
last_req_by_id[t.id] = (t.rcpu, t.rram)
# 8b
tot_req = [sum(x) for x in zip(*last_req_by_id.values())]
reserv_cpu = tot_req[0] / machine_logs[0][2]
reserv_ram = tot_req[1] / machine_logs[0][3]
increment_reserv_bucket(cpu_reservs_by_id, reserv_ceils, t.id, reserv_cpu)
increment_reserv_bucket(ram_reservs_by_id, reserv_ceils, t.id, reserv_ram)
# 8a
increment_reserv_bucket(cpu_request_by_id, request_ceils, t.id, t.rcpu)
increment_reserv_bucket(ram_request_by_id, request_ceils, t.id, t.rram)
# 8c
increment_reserv_bucket(cpu_util_by_id, util_ceils, t.id, t.acpu)
increment_reserv_bucket(ram_util_by_id, util_ceils, t.id, t.aram)
resobj = {'rcpu': cpu_request_by_id, 'rram': ram_request_by_id, 'rscpu': cpu_reservs_by_id,
'rsram': ram_reservs_by_id, 'ucpu': cpu_util_by_id, 'uram': ram_util_by_id}
for k, v in resobj.items():
resobj[k] = bucket_sum_per_termination(v, last_term_by_id)
return resobj
def fold_resobjs(ro1, ro2):
if ro1 is None:
return ro2
elif ro2 is None:
return ro1
else:
for k in ro1.keys():
for kk in ro1[k].keys():
if ro1[k][kk] is None:
ro1[k][kk] = ro2[k][kk]
elif ro2[k][kk] is None:
continue
else:
ro1[k][kk] = [sum(x) for x in zip(ro1[k][kk], ro2[k][kk])]
return ro1
# TODO: partition by id and in the for-each-row
# function implement lookup to dfm.value to understand its memory capacity
result = df.rdd \
.groupBy(lambda x: x.mid) \
.map(for_each_joined) \
.fold(None, fold_resobjs)
d = os.path.dirname(os.path.realpath(__file__))
with open(d + "/" + cluster + "_figure8.json", "w") as f:
json.dump(result, f)
# vim: set ts=4 sw=4 et tw=120:

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1 @@
lost c_res_actual_per_job parts 01816-01824

Binary file not shown.

View File

View File

@ -0,0 +1,6 @@
-1,73165704815830.230712890625000000,73345651566476.821899414062500000
6,146534454172963.142395019531250000,77692052834547.042846679687500000
5,239202281025317.192077636718750000,199303134846755.981445312500000000
4,1410420680006233.215332031250000000,829767989678030.014038085937500000
8,957825860821.723937988281250000,635555632653.236389160156250000
7,8624573251690182.685852050781250000,5840241764609245.300292968750000000
1 -1 73165704815830.230712890625000000 73345651566476.821899414062500000
2 6 146534454172963.142395019531250000 77692052834547.042846679687500000
3 5 239202281025317.192077636718750000 199303134846755.981445312500000000
4 4 1410420680006233.215332031250000000 829767989678030.014038085937500000
5 8 957825860821.723937988281250000 635555632653.236389160156250000
6 7 8624573251690182.685852050781250000 5840241764609245.300292968750000000

Some files were not shown because too many files have changed in this diff Show More