bachelorThesis/figure_8/figure8-join.py

149 lines
5.9 KiB
Python
Executable File

#!/usr/bin/env python3
# coding: utf-8
import os
import json
import pandas as pd
import findspark
findspark.init()
import pyspark
import pyspark.sql
import sys
import gzip
from pyspark import AccumulatorParam
from pyspark.sql.functions import lit
from pyspark.sql import Window
from pyspark.sql.types import *
from decimal import *
# When True, the input globs defined further down only match a small subset of
# the trace files.
TESTDATA = False

# Exactly three positional arguments are required. The count is compared with
# `!=`: `is not` tests object identity, which is not a reliable way to compare
# integers and triggers a SyntaxWarning on Python 3.8+.
if len(sys.argv) != 4:
    print(sys.argv[0] + " {cluster} {tmpdir} {maxram}")
    sys.exit()

# Name of the cluster whose trace files are processed.
cluster = sys.argv[1]

# Spark session: argv[2] is used as Spark's local directory and argv[3] as the
# driver memory setting.
spark = pyspark.sql.SparkSession.builder \
    .appName("task_slowdown") \
    .config("spark.driver.maxResultSize", "128g") \
    .config("spark.local.dir", sys.argv[2]) \
    .config("spark.driver.memory", sys.argv[3]) \
    .getOrCreate()
sc = spark.sparkContext
#
# READING INSTANCE EVENTS DATA
#
# With TESTDATA set, the glob only varies the last character of the numeric
# file suffix; otherwise every instance events file of the cluster is matched.
_events_suffix = "_instance_events*.json.gz"
if TESTDATA:
    _events_suffix = "_instance_events00000000000?.json.gz"
dfepath = "/".join(["/home/claudio/google_2019/instance_events/" + cluster, cluster + _events_suffix])
dfe = spark.read.json(dfepath)
#
# READING INSTANCE USAGE DATA
#
dfupath = "/home/claudio/google_2019/instance_usage/" + cluster
# NOTE(review): the TESTDATA glob ends in ".csv.gz" while the full-data glob and
# the reader below use JSON -- confirm which extension the test files have.
dfupath += "/" + cluster + ("_instance_usage00000000000?.csv.gz" if TESTDATA else "_instance_usage*.json.gz")

# Name and type of every usage column, in schema order; all are nullable.
_usage_fields = [
    ("start_time", LongType()),
    ("end_time", LongType()),
    ("collection_id", StringType()),
    ("instance_index", StringType()),
    ("machine_id", StringType()),
    ("alloc_collection_id", LongType()),
    ("alloc_instance_index", StringType()),
    ("collection_type", ByteType()),
    ("average_usage_cpus", DoubleType()),
    ("average_usage_memory", DoubleType()),
    ("maximum_usage_cpus", DoubleType()),
    ("maximum_usage_memory", DoubleType()),
    ("random_sample_usage_cpus", DoubleType()),
    ("random_sample_usage_memory", DoubleType()),
    ("assigned_memory", DoubleType()),
    ("page_cache_memory", DoubleType()),
    ("cycles_per_instruction", DoubleType()),
    # Previously misspelled as "memory_accLesses_per_instruction"; presumably
    # the misspelled name never matched a field of the input records.
    ("memory_accesses_per_instruction", DoubleType()),
    ("sample_rate", DoubleType()),
]
# The cpu_usage_dist_NN columns: steps of 10 up to 90, then steps of 1 up to 99.
_usage_fields += [
    ("cpu_usage_dist_" + _suffix, DoubleType())
    for _suffix in ("00", "10", "20", "30", "40", "50", "60", "70", "80", "90",
                    "91", "92", "93", "94", "95", "96", "97", "98", "99")
]

usage_schema = StructType()
for _field_name, _field_type in _usage_fields:
    usage_schema.add(_field_name, _field_type, True)

# NOTE(review): "header" looks like a leftover CSV reader option; it is kept
# unchanged here -- confirm whether it has any effect on the JSON reader.
dfu = spark.read.format("json") \
    .option("header", False) \
    .schema(usage_schema) \
    .load(dfupath)
# READING MACHINE EVENTS DATA: filter them, sort them and publish them as a broadcast variable
print("Starting to read machine events...")
dfm = pd.read_csv("~/google_2019/machine_events/" + cluster + "_machine_events.csv")

print("Dropping remove events...")
# Only events whose type is 1 or 3 are kept.
_kept_types = (dfm["type"] == 1) | (dfm["type"] == 3)
dfm = dfm[_kept_types]

print("Dropping missing data events...")
# NOTE(review): this keeps the rows whose missing_data_reason is NOT null, which
# looks like the opposite of what the message above announces -- confirm intent.
_has_reason = dfm["missing_data_reason"].notnull()
dfm = dfm[_has_reason]

print("Projecting on useful columns...")
_useful_columns = ["time", "machine_id", "capacity.cpus", "capacity.memory"]
dfm = dfm[_useful_columns]

print("Sorting by time...")
# Rows are ordered by machine first and by time within each machine.
dfm = dfm.sort_values(by=["machine_id", "time"])

print("Converting to broadcast variable...")
# One plain tuple per row, in the column order selected above.
_machine_rows = list(map(tuple, dfm.to_numpy()))
dfm = sc.broadcast(_machine_rows)
print("Done with machine events.")
def tabid(x):
    """Build a single Decimal identifier for the task instance described by *x*.

    The collection id forms the integer part and ``instance_index / 2**64``
    is added to it as the fractional part.

    NOTE(review): no Decimal context is configured in this file, so the default
    precision (28 significant digits) presumably applies; for large collection
    ids and small instance indexes the fractional part may be rounded away.
    Confirm that ids stay distinct per instance.
    """
    collection_part = Decimal(x.collection_id)
    index_fraction = Decimal(x.instance_index) / Decimal(2**64)
    return collection_part + index_fraction
# Interpolate usage data: every usage report that is followed by a gap is extended so that it covers the gap
# (done by interpolate_usage, applied to the records produced by clean_usage).
def clean_usage(x):
    """Convert one usage row into a flat, mutable record.

    Returns the list ``[id, avg cpu, avg memory, start, end, machine_id]``:
    the id comes from ``tabid``, both averages are converted to ``Decimal``
    and both timestamps to ``int``. The machine id is passed through as is.
    """
    task_id = tabid(x)
    avg_cpu = Decimal(x.average_usage_cpus)
    avg_ram = Decimal(x.average_usage_memory)
    start = int(x.start_time)
    end = int(x.end_time)
    return [task_id, avg_cpu, avg_ram, start, end, x.machine_id]
def interpolate_usage(ts):
    """Close the time gaps between consecutive usage records.

    ``ts`` is an iterable of mutable records laid out as produced by
    ``clean_usage``: index 3 holds the start time and index 4 the end time.

    The records are sorted by start time. Whenever a record ends before the
    next one starts, its end time is moved forward to that next start time.
    Records that already touch or overlap their successor are left unchanged.
    The records are modified in place and returned as a new sorted list.

    Every consecutive pair is examined, including the first one: the previous
    index loop started at 1, so the gap after the first record was never
    filled and series of two records were never processed at all.
    """
    ts = sorted(ts, key=lambda x: x[3])
    for current, following in zip(ts, ts[1:]):
        if following[3] > current[4]:
            current[4] = following[3]
    return ts
def _usage_row_is_complete(x):
    """Tell whether a usage row has every field needed for the join."""
    required = (x.start_time, x.end_time, x.instance_index, x.collection_id, x.machine_id)
    return all(value is not None for value in required)


# Drop incomplete rows, flatten the remaining ones, then fill the time gaps
# separately for each id (the first element of a cleaned record).
_usage_records = dfu.rdd.filter(_usage_row_is_complete).map(clean_usage)
_usage_by_id = _usage_records.groupBy(lambda record: record[0])
_usage_filled = _usage_by_id.flatMap(lambda group: interpolate_usage(group[1]))
dfu = _usage_filled.toDF(["id", "acpu", "aram", "start", "end", "mid"])
# Make sure dfe has a ByteType "collection_type" column: cast it when the input
# provides it, otherwise add it filled with NULLs.
# Spark DataFrames do not support item assignment, so the previous
# `dfe["collection_type"] = ...` always raised, and the bare `except:` then
# always replaced the column with NULLs, hiding the error.
if "collection_type" in dfe.columns:
    dfe = dfe.withColumn("collection_type", dfe["collection_type"].cast(ByteType()))
else:
    dfe = dfe.withColumn("collection_type", lit(None).cast(ByteType()))
def _event_row_is_complete(x):
    """Tell whether an instance event row has every field needed for the join."""
    if x.time is None or x.type is None or x.machine_id is None:
        return False
    if x.instance_index is None or x.collection_id is None:
        return False
    # The request itself must exist before its cpus/memory parts are inspected.
    if x.resource_request is None:
        return False
    return x.resource_request.cpus is not None and x.resource_request.memory is not None


def _clean_event(x):
    """Flatten an event row into [id, time, type, requested cpu, requested memory, machine_id]."""
    request = x.resource_request
    return [tabid(x), int(x.time), int(x.type),
            Decimal(request.cpus), Decimal(request.memory), x.machine_id]


dfe = dfe.rdd.filter(_event_row_is_complete).map(_clean_event) \
    .toDF(["id", "time", "type", "rcpu", "rram", "mid"])
# An event is matched with the usage record of the same id and machine whose
# (start, end] interval contains the event time.
_join_on = [
    dfe.id == dfu.id,
    dfe.mid == dfu.mid,
    dfe.time > dfu.start,
    dfe.time <= dfu.end,
]
# Event columns plus the average cpu/memory usage of the matching usage record.
_projection = [
    "dfe.id as id",
    "dfe.mid as mid",
    "dfe.time as time",
    "dfe.type as type",
    "dfe.rcpu as rcpu",
    "dfe.rram as rram",
    "dfu.acpu as acpu",
    "dfu.aram as aram",
]
df = dfe.alias("dfe").join(dfu.alias("dfu"), _join_on).selectExpr(*_projection)
df.write.parquet("/home/claudio/raid0/figure-8-join-" + cluster + ".parquet")
# vim: set ts=4 sw=4 et tw=120: