119 lines
3.4 KiB
Python
Executable file
119 lines
3.4 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
# coding: utf-8
|
|
|
|
import os
|
|
import json
|
|
import pandas as pd
|
|
import findspark
|
|
findspark.init()
|
|
import pyspark
|
|
import pyspark.sql
|
|
import sys
|
|
import gzip
|
|
from pyspark import AccumulatorParam
|
|
from pyspark.sql.functions import lit
|
|
from pyspark.sql import Window
|
|
from pyspark.sql.types import *
|
|
from decimal import *
|
|
import random
|
|
|
|
# Usage: script.py {cluster} {tmpdir} {maxram} — bail out on wrong arg count.
# BUG FIX: the original tested `len(sys.argv) is not 4`, an identity check on
# an int literal (a SyntaxWarning in Python 3.8+ and not guaranteed to work);
# value comparison needs `!=`. Also exit nonzero so callers can detect the
# usage error (the original exited 0).
if len(sys.argv) != 4:
    print(sys.argv[0] + " {cluster} {tmpdir} {maxram}")
    sys.exit(1)

cluster = sys.argv[1]  # trace cell name, used to locate the input shards below
|
|
|
|
# Build a Spark session sized for a single big driver: the whole result is
# folded down to the driver, so maxResultSize is raised to 128g; the spill
# directory and driver heap size come from the command line (argv[2], argv[3]).
spark = pyspark.sql.SparkSession.builder \
    .appName("task_slowdown") \
    .config("spark.driver.maxResultSize", "128g") \
    .config("spark.local.dir", sys.argv[2]) \
    .config("spark.driver.memory", sys.argv[3]) \
    .getOrCreate()
sc = spark.sparkContext  # NOTE(review): `sc` is never used later in this file

# READING INSTANCE EVENTS DATA
# Glob over the gzipped JSON shards of the selected cluster's instance_events
# table; Spark infers the schema from the JSON records.
dfepath = "/home/claudio/google_2019/instance_events/" + cluster + "/" + cluster + "_instance_events*.json.gz"
#dfepath = "/home/claudio/google_2019/instance_events/" + cluster + "/" + cluster + "_test.json"
df = spark.read.json(dfepath)
|
|
|
|
def tabid(x):
    """Derive a single Decimal task id from a trace row.

    The integer part is the row's collection_id and the fractional part is
    instance_index / 2**64, so every (collection, instance) pair maps to a
    distinct Decimal usable as a grouping key.
    """
    whole = Decimal(x.collection_id)
    fraction = Decimal(x.instance_index) / Decimal(2 ** 64)
    return whole + fraction
|
|
|
|
def bucket_sum_per_termination(bucket, last_term_by_id):
    """Aggregate per-task value lists by each task's final termination code.

    bucket: {task_id: list of numbers}.
    last_term_by_id: {task_id: termination code} (expected in {-1, 4..8}).
    Returns {code: element-wise sum of the lists of tasks ending with that
    code}, or None for codes that no task in `bucket` maps to.
    """
    totals = dict.fromkeys((-1, 4, 5, 6, 7, 8))
    for task_id, values in bucket.items():
        code = last_term_by_id[task_id]
        current = totals[code]
        if current is None:
            totals[code] = values
        else:
            totals[code] = [a + b for a, b in zip(current, values)]
    return totals
|
|
|
|
def tally_event(bucket, term, nexec):
    """Count one event in the nested histogram `bucket`, in place.

    bucket is {term: {nexec: count}}; missing inner dicts / counters are
    created on demand, then the (term, nexec) cell is incremented by one.
    """
    per_term = bucket.setdefault(term, {})
    per_term[nexec] = per_term.get(nexec, 0) + 1
|
|
|
|
def for_each_joined(x):
    """Histogram one machine's events by termination state and concurrency.

    x: (machine_id, iterable of event dicts produced by mark_next), i.e. one
    group from the groupBy-on-"mid" stage.
    Replays the machine's events in time order, tracking the set of task ids
    currently executing; each event is tallied under its task's final
    termination code and the number of tasks running at that moment.
    Returns a nested dict {term: {n_executing: count}}.

    Fix vs. original: the unused local `machine_id = x[0]` was removed.
    """
    events = sorted(x[1], key=lambda e: e["time"])  # replay chronologically
    in_execution = set()  # task ids currently running on this machine
    histogram = {}
    for ev in events:
        in_execution.add(ev["id"])
        tally_event(histogram, ev["term"], len(in_execution))
        if ev["end"]:  # task's last event on this machine: it stops running
            in_execution.remove(ev["id"])
    return histogram
|
|
|
|
def fold_resobjs(ro1, ro2):
    """Merge two nested {term: {nexec: count}} histograms, summing counts.

    Mutates and returns ro1 (Spark fold-style combiner). If either side is
    empty, the other is returned unchanged.

    BUG FIX: the original had `if not ro1: ... elif ro2: ...` with no final
    else, so a non-empty ro1 folded with an empty ro2 fell through and
    returned None — and Spark's fold({} , ...) does produce empty operands
    (the zero value of element-less partitions), poisoning the accumulator.
    """
    if not ro1:
        return ro2
    if not ro2:
        return ro1  # original returned None here
    for term, counts in ro2.items():
        if term not in ro1:
            ro1[term] = counts
        else:
            merged = ro1[term]
            for nexec, count in counts.items():
                merged[nexec] = merged.get(nexec, 0) + count
    return ro1
|
|
|
|
def mark_next(data):
    """Annotate one task's events, sorted chronologically, for the flatMap.

    data: (task_id, iterable of [id, time, type, machine_id]) from the
    groupBy-on-task-id stage.
    Returns a list of dicts with keys id/time/type/mid plus:
      end  -- True when this is the task's last event on its machine (final
              event overall, or the next event is on a different machine)
      term -- the task's final termination event type (4..8), or -1 if the
              task never produced a termination event in the trace

    BUG FIX: the original condition was `type >= 4 or type <= 8`, which is
    true for every integer, so `term` ended up as the type of the task's
    *last* event rather than its last termination event. Termination event
    types are the inclusive range 4..8, so the test must be a range check.
    """
    events = sorted(data[1], key=lambda e: e[1])  # order by timestamp
    last_term = -1
    n = len(events)
    for i, ev in enumerate(events):
        events[i] = {
            "id": ev[0],
            "time": ev[1],
            "type": ev[2],
            "mid": ev[3],
            # the task leaves this machine if this is its final event or the
            # next event happens on a different machine
            "end": (i == n - 1 or ev[3] != events[i + 1][3]),
        }
        if 4 <= events[i]["type"] <= 8:
            last_term = events[i]["type"]
    for ev in events:
        ev["term"] = last_term
    return events
|
|
|
|
# Build the figure-7c histogram across the whole trace: keep only complete
# records, key each event by its fractional task id, reconstruct each task's
# timeline (mark_next), regroup the annotated events per machine, tally them
# (for_each_joined), and merge every partial histogram into one nested
# {term: {n_executing: count}} dict on the driver.
result = (
    df.rdd
    .filter(lambda x: x.time is not None and x.type is not None and
            x.instance_index is not None and x.collection_id is not None)
    .map(lambda x: [tabid(x), int(x.time), int(x.type), x.machine_id])
    .groupBy(lambda x: x[0])      # one group per task id
    .flatMap(mark_next)           # annotate each event with "end" / "term"
    .groupBy(lambda x: x["mid"])  # one group per machine
    # NOTE(review): random partitioner spreads the load over 10000 partitions
    # but is nondeterministic across runs — confirm this is intentional
    .partitionBy(10000, lambda x: random.randint(0, 10000 - 1))
    .map(for_each_joined)
    .fold({}, fold_resobjs)
)
|
|
|
|
# Persist the merged histogram next to this script as <cluster>_figure7c.json.
d = os.path.dirname(os.path.realpath(__file__))
out_path = d + "/" + cluster + "_figure7c.json"
with open(out_path, "w") as out_file:
    json.dump(result, out_file)
|
|
|
|
# vim: set ts=4 sw=4 et tw=120:
|