104 lines
3.8 KiB
Python
104 lines
3.8 KiB
Python
|
# vim: set ts=4 sw=4 et tw=80:
|
||
|
#!/usr/bin/env python3
|
||
|
# coding: utf-8
|
||
|
|
||
|
import json
|
||
|
import findspark
|
||
|
findspark.init()
|
||
|
import pyspark
|
||
|
import pyspark.sql
|
||
|
import sys
|
||
|
import gzip
|
||
|
|
||
|
from pyspark import AccumulatorParam
|
||
|
from pyspark.sql.functions import lit
|
||
|
from pyspark.sql import Window
|
||
|
from pyspark.sql.types import *
|
||
|
from decimal import *
|
||
|
|
||
|
cluster=sys.argv[1]
|
||
|
|
||
|
schema = StructType() \
|
||
|
.add("start_time", LongType(), True) \
|
||
|
.add("end_time", LongType(), True) \
|
||
|
.add("collection_id", StringType(), True) \
|
||
|
.add("instance_index", StringType(), True) \
|
||
|
.add("machine_id", StringType(), True) \
|
||
|
.add("alloc_collection_id", LongType(), True) \
|
||
|
.add("alloc_instance_index", StringType(), True) \
|
||
|
.add("collection_type", ByteType(), True) \
|
||
|
.add("average_usage_cpus", DoubleType(), True) \
|
||
|
.add("average_usage_memory", DoubleType(), True) \
|
||
|
.add("maximum_usage_cpus", DoubleType(), True) \
|
||
|
.add("maximum_usage_memory", DoubleType(), True) \
|
||
|
.add("random_sample_usage_cpus", DoubleType(), True) \
|
||
|
.add("random_sample_usage_memory", DoubleType(), True) \
|
||
|
.add("assigned_memory", DoubleType(), True) \
|
||
|
.add("page_cache_memory", DoubleType(), True) \
|
||
|
.add("cycles_per_instruction", DoubleType(), True) \
|
||
|
.add("memory_accLesses_per_instruction", DoubleType(), True) \
|
||
|
.add("sample_rate", DoubleType(), True) \
|
||
|
.add("cpu_usage_dist_00", DoubleType(), True) \
|
||
|
.add("cpu_usage_dist_10", DoubleType(), True) \
|
||
|
.add("cpu_usage_dist_20", DoubleType(), True) \
|
||
|
.add("cpu_usage_dist_30", DoubleType(), True) \
|
||
|
.add("cpu_usage_dist_40", DoubleType(), True) \
|
||
|
.add("cpu_usage_dist_50", DoubleType(), True) \
|
||
|
.add("cpu_usage_dist_60", DoubleType(), True) \
|
||
|
.add("cpu_usage_dist_70", DoubleType(), True) \
|
||
|
.add("cpu_usage_dist_80", DoubleType(), True) \
|
||
|
.add("cpu_usage_dist_90", DoubleType(), True) \
|
||
|
.add("cpu_usage_dist_91", DoubleType(), True) \
|
||
|
.add("cpu_usage_dist_92", DoubleType(), True) \
|
||
|
.add("cpu_usage_dist_93", DoubleType(), True) \
|
||
|
.add("cpu_usage_dist_94", DoubleType(), True) \
|
||
|
.add("cpu_usage_dist_95", DoubleType(), True) \
|
||
|
.add("cpu_usage_dist_96", DoubleType(), True) \
|
||
|
.add("cpu_usage_dist_97", DoubleType(), True) \
|
||
|
.add("cpu_usage_dist_98", DoubleType(), True) \
|
||
|
.add("cpu_usage_dist_99", DoubleType(), True) \
|
||
|
|
||
|
spark = pyspark.sql.SparkSession.builder \
|
||
|
.appName("task_slowdown") \
|
||
|
.config("spark.driver.maxResultSize", "256g") \
|
||
|
.config("spark.local.dir", "/home/claudio/tmp") \
|
||
|
.config("spark.driver.memory", "120g") \
|
||
|
.getOrCreate()
|
||
|
sc = spark.sparkContext
|
||
|
|
||
|
path = "/home/claudio/" + cluster + "/" + cluster
|
||
|
path += "_instance_usage*.csv.gz"
|
||
|
#path += "_test.csv"
|
||
|
|
||
|
df = spark.read.format("csv") \
|
||
|
.option("header", False) \
|
||
|
.schema(schema) \
|
||
|
.load(path)
|
||
|
|
||
|
def compute_res_s(x):
|
||
|
delta = int(x.end_time) - int(x.start_time);
|
||
|
return [x.collection_id + "-" + x.instance_index,
|
||
|
Decimal(x.average_usage_cpus) * Decimal(delta),
|
||
|
Decimal(x.average_usage_memory) * Decimal(delta)]
|
||
|
|
||
|
def sum_res(xs):
|
||
|
cpu = Decimal(0)
|
||
|
ram = Decimal(0)
|
||
|
for x in xs:
|
||
|
cpu += x[1]
|
||
|
ram += x[2]
|
||
|
return ['{0:f}'.format(cpu), '{0:f}'.format(ram)]
|
||
|
|
||
|
filename = "/home/claudio/p620/google_2019/thesis_queries" + \
|
||
|
"/spatial_resource_waste/" + cluster + "_res_micros_actual_per_job"
|
||
|
|
||
|
df2 = df.rdd \
|
||
|
.filter(lambda x: x.collection_type is None or x.collection_type == 0) \
|
||
|
.filter(lambda x: x.start_time is not None and x.end_time is not None and
|
||
|
x.instance_index is not None and x.collection_id is not None) \
|
||
|
.map(compute_res_s) \
|
||
|
.groupBy(lambda x: x[0]) \
|
||
|
.mapValues(sum_res) \
|
||
|
.map(lambda x: x[0] + "," + x[1][0] + "," + x[1][1]) \
|
||
|
.saveAsTextFile(filename, "org.apache.hadoop.io.compress.GzipCodec")
|