#!/usr/bin/env python3
# coding: utf-8
# bachelorThesis/figure_7/figure7c.py
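#
# Usage: figure7c.py {cluster} {tmpdir} {maxram}
#   cluster - trace cluster name (used to locate the input files and name the output)
#   tmpdir  - Spark scratch directory (spark.local.dir)
#   maxram  - Spark driver memory (spark.driver.memory)
# Illustrative invocation (argument values are hypothetical):
#   ./figure7c.py a /tmp/spark 128g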
import os
import json
import pandas as pd
import findspark
findspark.init()
import pyspark
import pyspark.sql
import sys
import gzip
from pyspark import AccumulatorParam
from pyspark.sql.functions import lit
from pyspark.sql import Window
from pyspark.sql.types import *
from decimal import *
import random
CHECKDIR = "/home/claudio/google_2019/thesis_queries/figure_7/"
if len(sys.argv) != 4:
    print(sys.argv[0] + " {cluster} {tmpdir} {maxram}")
    sys.exit()
cluster = sys.argv[1]
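
# Skip if this cluster's result already exists or another run left a
# "_working" marker; the marker is created here and removed once the CSV
# has been written.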
if os.path.exists(CHECKDIR + cluster + "_figure7c.csv"):
    print("already computed")
    sys.exit()
if os.path.exists(CHECKDIR + cluster + "_figure7c_working"):
    print("already in execution")
    sys.exit()
os.system("touch " + CHECKDIR + cluster + "_figure7c_working")
spark = pyspark.sql.SparkSession.builder \
    .appName("task_slowdown") \
    .config("spark.driver.maxResultSize", "128g") \
    .config("spark.local.dir", sys.argv[2]) \
    .config("spark.driver.memory", sys.argv[3]) \
    .getOrCreate()
sc = spark.sparkContext
# READING INSTANCE EVENTS DATA
dfepath = "/home/claudio/google_2019/instance_events/" + cluster + "/" + cluster + "_instance_events*.json.gz"
#dfepath = "/home/claudio/google_2019/instance_events/" + cluster + "/" + cluster + "_instance_events00000000000?.json.gz"
df = spark.read.json(dfepath)
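
# Unique per-task key: collection_id in the integer part, instance_index
# packed into the fractional part.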
def tabid(x):
    return Decimal(x.collection_id) + Decimal(x.instance_index) / Decimal(2**64)
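
# Sum per-task value lists grouped by each task's final termination type
# (-1 = no termination observed, 4-8 = the trace's termination event types).
# Note: this helper is never called in this script.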
def bucket_sum_per_termination(bucket, last_term_by_id):
    result = {-1: None, 4: None, 5: None, 6: None, 7: None, 8: None}
    for tid, vs in bucket.items():
        term = last_term_by_id[tid]
        if result[term] is None:
            result[term] = vs
        else:
            result[term] = [sum(x) for x in zip(result[term], vs)]
    return result
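
# Count one event under bucket[term][nexec], creating the nested keys on
# demand.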
def tally_event(bucket, term, nexec):
    if term not in bucket:
        bucket[term] = {}
    if nexec not in bucket[term]:
        bucket[term][nexec] = 0
    bucket[term][nexec] += 1
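
# Replay one machine's events in time order, tracking how many tasks are
# concurrently in execution on it, and tally each event under its task's
# final termination type and the current concurrency level.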
def for_each_joined(x):
    machine_id = x[0]
    if machine_id is None:
        return {}
    ts = sorted(x[1], key=lambda e: e["time"] or -1)
    in_execution = set()
    chum = {}
    for t in ts:
        in_execution.add(t["id"])
        tally_event(chum, t["term"], len(in_execution))
        if t["end"]:
            in_execution.remove(t["id"])
    return chum
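
# Merge two nested {term: {n_exec: count}} dicts by summing counts; used as
# the combine function for RDD.fold() below.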
def fold_resobjs(ro1, ro2):
    if not ro1:
        return ro2
    if ro2:
        for k in set(ro1.keys()).union(ro2.keys()):
            if k not in ro1:
                ro1[k] = ro2[k]
            elif k in ro2:
                for kk in set(ro1[k].keys()).union(ro2[k].keys()):
                    if kk not in ro1[k]:
                        ro1[k][kk] = ro2[k][kk]
                    elif kk in ro2[k]:
                        ro1[k][kk] += ro2[k][kk]
    return ro1
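
# For one task (grouped by tabid): sort its events by time, mark the last
# event of each run on a machine ("end" = last event overall, or the next
# event is on a different machine), find the task's final termination type
# (event types 4-8), and attach it to every event as "term".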
def mark_next(data):
    ts = sorted(list(data[1]), key=lambda z: z[1] or -1)
    last_term = -1
    for i in range(len(ts)):
        t = ts[i]
        ts[i] = {"id": t[0], "time": t[1], "type": t[2], "mid": t[3],
                 "end": (i == len(ts) - 1 or t[3] != ts[i + 1][3])}
        if 4 <= ts[i]["type"] <= 8:
            last_term = ts[i]["type"]
    for t in ts:
        t["term"] = last_term
    return ts
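
# Flatten the nested {term: {n_exec: count}} result into CSV rows.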
def to_csv(result):
    out = "term,n_exec,count\n"
    for term, counts in result.items():
        for n_exec, count in counts.items():
            out += str(term) + "," + str(n_exec) + "," + str(count) + "\n"
    return out
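
# Pipeline: keep events with complete time/type/task information, key each
# event by task, derive per-event "end"/"term" fields (mark_next), drop
# events without a machine id, regroup by machine, spread the machine groups
# across 1000 random partitions to balance the replay work, tally concurrency
# per machine (for_each_joined), and fold all partial tallies into a single
# {term: {n_exec: count}} dict.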
result = df.rdd \
    .filter(lambda x: x.time is not None and x.type is not None and
            x.instance_index is not None and x.collection_id is not None) \
    .map(lambda x: [tabid(x), int(x.time), int(x.type), x.machine_id]) \
    .groupBy(lambda x: x[0]) \
    .flatMap(mark_next) \
    .filter(lambda x: x["mid"] is not None) \
    .groupBy(lambda x: x["mid"]) \
    .partitionBy(1000, lambda x: random.randint(0, 1000 - 1)) \
    .map(for_each_joined) \
    .fold({}, fold_resobjs)
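
# Write the CSV next to this script and release the "_working" marker.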
d = os.path.dirname(os.path.realpath(__file__))
with open(d + "/" + cluster + "_figure7c.csv", "w") as f:
    f.write(to_csv(result))
os.system("rm " + CHECKDIR + cluster + "_figure7c_working")
# vim: set ts=4 sw=4 et tw=120: