bachelorThesis/figure_8/figure8-measure.py

#!/usr/bin/env python3
# coding: utf-8
import os
import json
import pandas as pd
import findspark
findspark.init()
import pyspark
import pyspark.sql
import sys
import gzip
from pyspark import AccumulatorParam
from pyspark.sql.functions import lit
from pyspark.sql import Window
from pyspark.sql.types import *
from decimal import *
if len(sys.argv) != 5:
    print(sys.argv[0] + " {cluster} {tmpdir} {maxram} {joindir}")
    sys.exit()
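# Example invocation (hypothetical paths, for illustration only):
#   ./figure8-measure.py a /tmp/spark-local 256g ~/figure-8-joins
# {cluster} is the trace cluster name, {tmpdir} is Spark's scratch directory,
# {maxram} bounds driver memory and result size, and {joindir} holds the
# pre-joined parquet files read below.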
joindir = sys.argv[4]
cluster = sys.argv[1]
spark = pyspark.sql.SparkSession.builder \
.appName("task_slowdown") \
.config("spark.driver.maxResultSize", sys.argv[3]) \
.config("spark.local.dir", sys.argv[2]) \
.config("spark.driver.memory", sys.argv[3]) \
.getOrCreate()
sc = spark.sparkContext
df = spark.read.parquet(joindir + "/figure-8-join-" + cluster + ".parquet")
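# The joined parquet is expected to provide, per task event: the machine id
# (mid), the event time and type, the task id (id), the requested resources
# (rcpu, rram) and the measured usage (acpu, aram); these are the columns
# accessed in for_each_joined() below.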
### TESTING ONLY
#df = df.limit(10000)
# READING MACHINE EVENTS DATA, sort them and save them as broadcast variable
print("Starting to read machine events...")
dfm = pd.read_csv("~/google_2019/machine_events/" + cluster + "_machine_events.csv", converters={
    'time': lambda x: -1 if x == '' else int(x),
    'machine_id': lambda x: str(x),
    'capacity.cpus': lambda x: -1 if x == '' else Decimal(x),
    'capacity.memory': lambda x: -1 if x == '' else Decimal(x)})
print("Dropping remove events...")
dfm = dfm[(dfm.type!=2)&(dfm.time!=-1)&(dfm["capacity.cpus"]!=-1)&(dfm["capacity.memory"]!=-1)]
print("Dropping missing data events...")
dfm = dfm[dfm.missing_data_reason.isnull()]
print("Projecting on useful columns...")
dfm = dfm[["time", "machine_id", "capacity.cpus", "capacity.memory"]]
print("Sorting by time...")
dfm = dfm.sort_values(by=["machine_id", "time"])
print("Converting to broadcast variable...")
dfm = sc.broadcast([tuple(r) for r in dfm.to_numpy()])
print("Done with machine events.")
def get_machine_time_resources(machine_id):
    # Binary search over the broadcast machine-event list (sorted by machine id,
    # then time) returning all configuration logs for the given machine.
    def aux(i, j):
        mid = (i + j) // 2
        if dfm.value[mid][1] > machine_id:
            if mid - 1 < i or i == j:
                return None
            return aux(i, mid - 1)
        elif dfm.value[mid][1] < machine_id:
            if mid + 1 > j or i == j:
                return None
            return aux(mid + 1, j)
        else:
            start = mid
            while start >= 0 and dfm.value[start][1] == machine_id:  # once found, search for oldest log for machine
                start -= 1
            start += 1
            end = mid
            while end < len(dfm.value) and dfm.value[end][1] == machine_id:
                end += 1
            return dfm.value[start:end]
    return aux(0, len(dfm.value) - 1)
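# Increment the per-task histogram bucket that `reserv` falls into: the final
# index is the number of ceilings strictly below `reserv`, so bucket 0 collects
# values at or below the first ceiling (including the -1 "unknown machine"
# sentinel) and the last bucket collects values above the last ceiling.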
def increment_reserv_bucket(bucket, ceils, taskid, reserv, last_term_by_id):
    if reserv is None:
        # defensive guard: a missing value cannot be compared to the ceilings
        return
    idx = 0
    while idx < len(ceils) and ceils[idx] < reserv:
        idx += 1
    if taskid not in bucket:
        bucket[taskid] = [0] * (len(ceils) + 1)
    bucket[taskid][idx] += 1
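# Collapse per-task buckets into per-termination-state totals: keys are the
# last observed termination event type of the task (-1 when none was seen;
# 4-8 correspond to the trace's EVICT, FAIL, FINISH, KILL and LOST events),
# values are element-wise sums of the tasks' bucket vectors.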
def bucket_sum_per_termination(bucket, last_term_by_id):
    result = {-1: None, 4: None, 5: None, 6: None, 7: None, 8: None}
    for tid, vs in bucket.items():
        term = last_term_by_id[tid]
        if result[term] is None:
            result[term] = vs
        else:
            result[term] = [sum(x) for x in zip(result[term], vs)]
    return result
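# Process all events of one machine (x = (machine_id, iterable of rows)) in
# time order, maintaining per-task histograms of resource requests (8a),
# cumulative reservations relative to the machine's capacity (8b) and measured
# utilization (8c), and finally reduce each histogram per termination state.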
def for_each_joined(x):
    machine_id = x[0]
    ts = x[1]
    ts = sorted(ts, key=lambda e: e.time)
    last_req_by_id = {}  # map taskid -> last known req [cpu, ram] (data removed when task terminates)
    reserv_ceils = [0, 0.2, 0.4, 0.6, 0.8, 1]
    cpu_reservs_by_id = {}  # map taskid -> [n, a, b, c, d, e, f] where:
    # n: count of events with unknown machine config
    # a: count of events with res. reserv. <0.2
    # b: count of events with res. reserv. [0.2, 0.4)
    # c: count of events with res. reserv. [0.4, 0.6)
    # d: count of events with res. reserv. [0.6, 0.8)
    # e: count of events with res. reserv. [0.8, 1)
    # f: count of events with res. reserv. >=1
    ram_reservs_by_id = {}
    request_ceils = [0.025, 0.05, 0.075]
    cpu_request_by_id = {}  # map taskid -> [a, b, c, d] where <0.025, [0.025, 0.05), [0.05, 0.075), >=0.075
    ram_request_by_id = {}
    util_ceils = reserv_ceils
    cpu_util_by_id = {}
    ram_util_by_id = {}
    last_term_by_id = {}  # map taskid -> last termination
    machine_logs = get_machine_time_resources(machine_id)
    for t in ts:
        # Advance to the machine configuration in effect at this event's time
        if machine_logs is not None and len(machine_logs) > 1 and machine_logs[1][0] <= t.time:
            machine_logs.pop(0)
        if t.id not in last_term_by_id:
            last_term_by_id[t.id] = -1
        if t.rcpu is not None and t.rram is not None:
            last_req_by_id[t.id] = (t.rcpu, t.rram)
        # 8b
        tot_req = [sum(x) for x in zip(*last_req_by_id.values())]
        if machine_logs is not None and len(tot_req) == 2:  # need a machine config and at least one known request
            reserv_cpu = tot_req[0] / machine_logs[0][2]
            reserv_ram = tot_req[1] / machine_logs[0][3]
        else:
            reserv_cpu = -1
            reserv_ram = -1
        increment_reserv_bucket(cpu_reservs_by_id, reserv_ceils, t.id, reserv_cpu, last_term_by_id)
        increment_reserv_bucket(ram_reservs_by_id, reserv_ceils, t.id, reserv_ram, last_term_by_id)
        # 8a
        increment_reserv_bucket(cpu_request_by_id, request_ceils, t.id, t.rcpu, last_term_by_id)
        increment_reserv_bucket(ram_request_by_id, request_ceils, t.id, t.rram, last_term_by_id)
        # 8c
        increment_reserv_bucket(cpu_util_by_id, util_ceils, t.id, t.acpu, last_term_by_id)
        increment_reserv_bucket(ram_util_by_id, util_ceils, t.id, t.aram, last_term_by_id)
        if 4 <= t.type <= 8:
            last_term_by_id[t.id] = t.type
    resobj = {'rcpu': cpu_request_by_id, 'rram': ram_request_by_id, 'rscpu': cpu_reservs_by_id,
              'rsram': ram_reservs_by_id, 'ucpu': cpu_util_by_id, 'uram': ram_util_by_id}
    for k, v in resobj.items():
        resobj[k] = bucket_sum_per_termination(v, last_term_by_id)
    return resobj
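# Merge two per-machine result objects produced by for_each_joined by summing
# their bucket vectors key by key; None acts as the identity element, so this
# can be used directly as the fold operator below.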
def fold_resobjs(ro1, ro2):
    if ro1 is None:
        return ro2
    elif ro2 is None:
        return ro1
    else:
        for k in ro1.keys():
            for kk in ro1[k].keys():
                if ro1[k][kk] is None:
                    ro1[k][kk] = ro2[k][kk]
                elif ro2[k][kk] is None:
                    continue
                else:
                    ro1[k][kk] = [sum(x) for x in zip(ro1[k][kk], ro2[k][kk])]
        return ro1
# TODO: partition by id and in the for-each-row
# function implement lookup to dfm.value to understand its memory capacity
import random
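# Group events by machine id, spread the (machine, events) groups over 2000
# partitions with a random partitioner (the key argument is ignored),
# presumably to balance partition sizes, then compute the per-machine
# histograms and fold them into a single result object.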
result = df.rdd \
    .groupBy(lambda x: x.mid) \
    .partitionBy(2000, lambda x: random.randint(0, 2000 - 1)) \
    .map(for_each_joined) \
    .fold(None, fold_resobjs)
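# The folded result maps each measure ('rcpu', 'rram', 'rscpu', 'rsram',
# 'ucpu', 'uram') to a dict from termination state to its summed bucket
# vector; it is written as JSON next to this script (json.dump converts the
# integer termination keys to strings).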
d = os.path.dirname(os.path.realpath(__file__))
with open(d + "/" + cluster + "_figure8.json", "w") as f:
    json.dump(result, f)
# vim: set ts=4 sw=4 et tw=120: