corrections to abef only
This commit is contained in:
parent
a7515bfee6
commit
43a082fdf0
1 changed files with 24 additions and 43 deletions
|
@ -16,10 +16,12 @@ from pyspark.sql import Window
|
||||||
from pyspark.sql.types import *
|
from pyspark.sql.types import *
|
||||||
from decimal import *
|
from decimal import *
|
||||||
|
|
||||||
if len(sys.argv) is not 4:
|
if len(sys.argv) is not 5:
|
||||||
print(sys.argv[0] + " {cluster} {tmpdir} {maxram}")
|
print(sys.argv[0] + " {cluster} {tmpdir} {maxram} {joindir}")
|
||||||
sys.exit()
|
sys.exit()
|
||||||
|
|
||||||
|
joindir=sys.argv[4]
|
||||||
|
|
||||||
cluster=sys.argv[1]
|
cluster=sys.argv[1]
|
||||||
|
|
||||||
spark = pyspark.sql.SparkSession.builder \
|
spark = pyspark.sql.SparkSession.builder \
|
||||||
|
@ -30,25 +32,7 @@ spark = pyspark.sql.SparkSession.builder \
|
||||||
.getOrCreate()
|
.getOrCreate()
|
||||||
sc = spark.sparkContext
|
sc = spark.sparkContext
|
||||||
|
|
||||||
# READING INSTANCE EVENTS DATA
|
df = spark.read.parquet(joindir + "/figure-8-join-" + cluster + ".parquet")
|
||||||
dfpath = "/home/claudio/google_2019/instance_events/" + cluster + "/" + cluster + "_instance_events*.json.gz"
|
|
||||||
df = spark.read.json(dfpath)
|
|
||||||
|
|
||||||
### TESTING ONLY!
|
|
||||||
df = df.limit(50000)
|
|
||||||
|
|
||||||
try:
|
|
||||||
df["collection_type"] = df["collection_type"].cast(ByteType())
|
|
||||||
except:
|
|
||||||
df = df.withColumn("collection_type", lit(None).cast(ByteType()))
|
|
||||||
|
|
||||||
df = df.rdd \
|
|
||||||
.filter(lambda x: x.time is not None and x.type is not None and x.machine_id is not None and
|
|
||||||
x.instance_index is not None and x.collection_id is not None and x.resource_request is not None and
|
|
||||||
x.resource_request.cpus is not None and x.resource_request.memory is not None) \
|
|
||||||
.map(lambda x: [tabid(x), int(x.time), int(x.type),
|
|
||||||
Decimal(x.resource_request.cpus), Decimal(x.resource_request.memory), x.machine_id]) \
|
|
||||||
.toDF(["id", "time", "type", "rcpu", "rram", "mid"])
|
|
||||||
|
|
||||||
# READING MACHINE EVENTS DATA, sort them and save them as broadcast variable
|
# READING MACHINE EVENTS DATA, sort them and save them as broadcast variable
|
||||||
print("Starting to read machine events...")
|
print("Starting to read machine events...")
|
||||||
|
@ -96,11 +80,11 @@ def get_machine_time_resources(machine_id, time):
|
||||||
|
|
||||||
return aux(0, len(dfm.value)-1)
|
return aux(0, len(dfm.value)-1)
|
||||||
|
|
||||||
def increment_reserv_bucket(bucket, ceils, reserv, last_term_by_id):
|
def increment_reserv_bucket(bucket, value):
|
||||||
|
if value < 0:
|
||||||
idx = 0
|
idx = 0
|
||||||
while idx < len(ceils) and ceils[idx] < reserv:
|
else:
|
||||||
idx += 1
|
idx = 40 if value >= 1 else (int(value * 40) + 1)
|
||||||
|
|
||||||
bucket[idx] += 1
|
bucket[idx] += 1
|
||||||
|
|
||||||
def for_each_joined(x):
|
def for_each_joined(x):
|
||||||
|
@ -110,34 +94,31 @@ def for_each_joined(x):
|
||||||
term = -1
|
term = -1
|
||||||
ts = sorted(ts, key=lambda x: x.time)
|
ts = sorted(ts, key=lambda x: x.time)
|
||||||
|
|
||||||
request_ceils = [0.025, 0.05, 0.075]
|
cpu_util = [0] * 41
|
||||||
cpu_request = [0] * 4 # [a, b, c, d] where <0.025, [0.025, 0.05), [0.05,0.075), >=0.075
|
ram_util = [0] * 41
|
||||||
ram_request = [0] * 4 # [a, b, c, d] where <0.025, [0.025, 0.05), [0.05,0.075), >=0.075
|
cpu_request = [0] * 41
|
||||||
|
ram_request = [0] * 41
|
||||||
reserv_ceils = [0, 0.2, 0.4, 0.6, 0.8, 1]
|
|
||||||
cpu_reservs = [0] * 7 # [n, a, b, c, d, e, f] where:
|
|
||||||
ram_reservs = [0] * 7
|
|
||||||
|
|
||||||
for i, t in enumerate(ts):
|
for i, t in enumerate(ts):
|
||||||
machine_log = get_machine_time_resources(mid, t.time)
|
machine_log = get_machine_time_resources(mid, t.time)
|
||||||
if machine_log is not None:
|
if machine_log is not None:
|
||||||
reserv_cpu = tot_req[0] / machine_logs[0][2]
|
util_cpu = t.acpu / machine_log[2]
|
||||||
reserv_ram = tot_req[1] / machine_logs[0][3]
|
util_ram = t.aram / machine_log[3]
|
||||||
else:
|
else:
|
||||||
reserv_cpu = -1
|
util_cpu = -1
|
||||||
reserv_ram = -1
|
util_ram = -1
|
||||||
# 8a-b
|
# 8a-b
|
||||||
increment_reserv_bucket(cpu_request, request_ceils, t.id, t.rcpu)
|
increment_reserv_bucket(cpu_request, t.rcpu)
|
||||||
increment_reserv_bucket(ram_request, request_ceils, t.id, t.rram)
|
increment_reserv_bucket(ram_request, t.rram)
|
||||||
# 8c-d
|
# 8e-f
|
||||||
increment_reserv_bucket(cpu_reservs, reserv_ceils, t.id, reserv_cpu)
|
increment_reserv_bucket(cpu_util, t.acpu)
|
||||||
increment_reserv_bucket(ram_reservs, reserv_ceils, t.id, reserv_ram)
|
increment_reserv_bucket(ram_util, t.aram)
|
||||||
|
|
||||||
if t.type >= 4 and t.type <= 8:
|
if t.type >= 4 and t.type <= 8:
|
||||||
term = t.type
|
term = t.type
|
||||||
|
|
||||||
res = {-1: None, 4: None, 5: None, 6: None, 7: None, 8: None}
|
res = {-1: None, 4: None, 5: None, 6: None, 7: None, 8: None}
|
||||||
res[term] = {'rcpu': cpu_request, 'rram': ram_request, 'rscpu': cpu_reservs, 'rsram': ram_reservs}
|
res[term] = {'rcpu': cpu_request, 'rram': ram_request, 'ucpu': cpu_util, 'uram': ram_util}
|
||||||
|
|
||||||
return res
|
return res
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue