148 lines
5.9 KiB
Python
Executable file
148 lines
5.9 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
# coding: utf-8
|
|
|
|
import os
|
|
import json
|
|
import pandas as pd
|
|
import findspark
|
|
findspark.init()
|
|
import pyspark
|
|
import pyspark.sql
|
|
import sys
|
|
import gzip
|
|
from pyspark import AccumulatorParam
|
|
from pyspark.sql.functions import lit
|
|
from pyspark.sql import Window
|
|
from pyspark.sql.types import *
|
|
from decimal import *
|
|
|
|
TESTDATA = False
|
|
|
|
if len(sys.argv) is not 4:
|
|
print(sys.argv[0] + " {cluster} {tmpdir} {maxram}")
|
|
sys.exit()
|
|
|
|
cluster=sys.argv[1]
|
|
|
|
spark = pyspark.sql.SparkSession.builder \
|
|
.appName("task_slowdown") \
|
|
.config("spark.driver.maxResultSize", "128g") \
|
|
.config("spark.local.dir", sys.argv[2]) \
|
|
.config("spark.driver.memory", sys.argv[3]) \
|
|
.getOrCreate()
|
|
sc = spark.sparkContext
|
|
|
|
#
|
|
# READING INSTANCE EVENTS DATA
|
|
#
|
|
dfepath = "/home/claudio/google_2019/instance_events/" + cluster
|
|
dfepath += "/" + cluster + ("_instance_events00000000000?.json.gz" if TESTDATA else "_instance_events*.json.gz")
|
|
dfe = spark.read.json(dfepath)
|
|
#
|
|
# READING INSTANCE USAGE DATA
|
|
#
|
|
dfupath = "/home/claudio/google_2019/instance_usage/" + cluster
|
|
dfupath += "/" + cluster + ("_instance_usage00000000000?.csv.gz" if TESTDATA else "_instance_usage*.json.gz")
|
|
usage_schema = StructType() \
|
|
.add("start_time", LongType(), True) \
|
|
.add("end_time", LongType(), True) \
|
|
.add("collection_id", StringType(), True) \
|
|
.add("instance_index", StringType(), True) \
|
|
.add("machine_id", StringType(), True) \
|
|
.add("alloc_collection_id", LongType(), True) \
|
|
.add("alloc_instance_index", StringType(), True) \
|
|
.add("collection_type", ByteType(), True) \
|
|
.add("average_usage_cpus", DoubleType(), True) \
|
|
.add("average_usage_memory", DoubleType(), True) \
|
|
.add("maximum_usage_cpus", DoubleType(), True) \
|
|
.add("maximum_usage_memory", DoubleType(), True) \
|
|
.add("random_sample_usage_cpus", DoubleType(), True) \
|
|
.add("random_sample_usage_memory", DoubleType(), True) \
|
|
.add("assigned_memory", DoubleType(), True) \
|
|
.add("page_cache_memory", DoubleType(), True) \
|
|
.add("cycles_per_instruction", DoubleType(), True) \
|
|
.add("memory_accLesses_per_instruction", DoubleType(), True) \
|
|
.add("sample_rate", DoubleType(), True) \
|
|
.add("cpu_usage_dist_00", DoubleType(), True) \
|
|
.add("cpu_usage_dist_10", DoubleType(), True) \
|
|
.add("cpu_usage_dist_20", DoubleType(), True) \
|
|
.add("cpu_usage_dist_30", DoubleType(), True) \
|
|
.add("cpu_usage_dist_40", DoubleType(), True) \
|
|
.add("cpu_usage_dist_50", DoubleType(), True) \
|
|
.add("cpu_usage_dist_60", DoubleType(), True) \
|
|
.add("cpu_usage_dist_70", DoubleType(), True) \
|
|
.add("cpu_usage_dist_80", DoubleType(), True) \
|
|
.add("cpu_usage_dist_90", DoubleType(), True) \
|
|
.add("cpu_usage_dist_91", DoubleType(), True) \
|
|
.add("cpu_usage_dist_92", DoubleType(), True) \
|
|
.add("cpu_usage_dist_93", DoubleType(), True) \
|
|
.add("cpu_usage_dist_94", DoubleType(), True) \
|
|
.add("cpu_usage_dist_95", DoubleType(), True) \
|
|
.add("cpu_usage_dist_96", DoubleType(), True) \
|
|
.add("cpu_usage_dist_97", DoubleType(), True) \
|
|
.add("cpu_usage_dist_98", DoubleType(), True) \
|
|
.add("cpu_usage_dist_99", DoubleType(), True)
|
|
|
|
dfu = spark.read.format("json") \
|
|
.option("header", False) \
|
|
.schema(usage_schema) \
|
|
.load(dfupath)
|
|
|
|
# READING MACHINE EVENTS DATA, sort them and save them as broadcast variable
|
|
print("Starting to read machine events...")
|
|
dfm = pd.read_csv("~/google_2019/machine_events/" + cluster + "_machine_events.csv")
|
|
print("Dropping remove events...")
|
|
dfm = dfm[(dfm.type==1)|(dfm.type==3)]
|
|
print("Dropping missing data events...")
|
|
dfm = dfm[dfm.missing_data_reason.notnull()]
|
|
print("Projecting on useful columns...")
|
|
dfm = dfm[["time", "machine_id", "capacity.cpus", "capacity.memory"]]
|
|
print("Sorting by time...")
|
|
dfm = dfm.sort_values(by=["machine_id", "time"])
|
|
print("Converting to broadcast variable...")
|
|
dfm = sc.broadcast([tuple(r) for r in dfm.to_numpy()])
|
|
print("Done with machine events.")
|
|
|
|
def tabid(x):
|
|
return Decimal(x.collection_id) + Decimal(x.instance_index) / Decimal(2**64)
|
|
|
|
# interpolate machine data by extending each last machine report before a gap to cover it
|
|
def clean_usage(x):
|
|
return [tabid(x), Decimal(x.average_usage_cpus), Decimal(x.average_usage_memory), int(x.start_time),
|
|
int(x.end_time), x.machine_id]
|
|
|
|
def interpolate_usage(ts):
|
|
ts = sorted(ts, key=lambda x: x[3])
|
|
l = len(ts)
|
|
for i in range(1, l-1):
|
|
if ts[i+1][3] > ts[i][4]:
|
|
ts[i][4] = ts[i+1][3]
|
|
return ts
|
|
|
|
dfu = dfu.rdd \
|
|
.filter(lambda x: x.start_time is not None and x.end_time is not None and
|
|
x.instance_index is not None and x.collection_id is not None and
|
|
x.machine_id is not None) \
|
|
.map(clean_usage).groupBy(lambda x: x[0]) \
|
|
.flatMap(lambda x: interpolate_usage(x[1])).toDF(["id", "acpu", "aram", "start", "end", "mid"])
|
|
|
|
try:
|
|
dfe["collection_type"] = dfe["collection_type"].cast(ByteType())
|
|
except:
|
|
dfe = dfe.withColumn("collection_type", lit(None).cast(ByteType()))
|
|
|
|
dfe = dfe.rdd \
|
|
.filter(lambda x: x.time is not None and x.type is not None and x.machine_id is not None and
|
|
x.instance_index is not None and x.collection_id is not None and x.resource_request is not None and
|
|
x.resource_request.cpus is not None and x.resource_request.memory is not None) \
|
|
.map(lambda x: [tabid(x), int(x.time), int(x.type),
|
|
Decimal(x.resource_request.cpus), Decimal(x.resource_request.memory), x.machine_id]) \
|
|
.toDF(["id", "time", "type", "rcpu", "rram", "mid"])
|
|
|
|
df = dfe.alias("dfe").join(dfu.alias("dfu"), [dfe.id == dfu.id, dfe.mid == dfu.mid, dfe.time > dfu.start, dfe.time <= dfu.end]) \
|
|
.selectExpr("dfe.id as id", "dfe.mid as mid", "dfe.time as time", "dfe.type as type", "dfe.rcpu as rcpu",
|
|
"dfe.rram as rram", "dfu.acpu as acpu", "dfu.aram as aram")
|
|
|
|
df.write.parquet("/home/claudio/raid0/figure-8-join-" + cluster + ".parquet")
|
|
|
|
# vim: set ts=4 sw=4 et tw=120:
|