bachelorThesis/spatial_resource_waste/spatial_res_actual_csv.py

#!/usr/bin/env python3
# coding: utf-8
# vim: set ts=4 sw=4 et tw=80:
import sys
from decimal import Decimal

import findspark
findspark.init()

import pyspark
import pyspark.sql
from pyspark.sql.types import *
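
# Hedged usage sketch (assumption, not stated in the original script): since
# findspark bootstraps a local Spark installation, the query is presumably run
# as a plain Python program, e.g.
#
#   python3 spatial_res_actual_csv.py a
#
# with the cluster/cell name as the only argument.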
# Cluster/cell name passed as the first command-line argument (e.g. "a").
cluster = sys.argv[1]
# Explicit schema for the instance_usage table of the Google 2019 trace
# (only needed by the commented-out CSV reader below; spark.read.json infers it).
schema = StructType() \
    .add("start_time", LongType(), True) \
    .add("end_time", LongType(), True) \
    .add("collection_id", StringType(), True) \
    .add("instance_index", StringType(), True) \
    .add("machine_id", StringType(), True) \
    .add("alloc_collection_id", LongType(), True) \
    .add("alloc_instance_index", StringType(), True) \
    .add("collection_type", ByteType(), True) \
    .add("average_usage_cpus", DoubleType(), True) \
    .add("average_usage_memory", DoubleType(), True) \
    .add("maximum_usage_cpus", DoubleType(), True) \
    .add("maximum_usage_memory", DoubleType(), True) \
    .add("random_sample_usage_cpus", DoubleType(), True) \
    .add("random_sample_usage_memory", DoubleType(), True) \
    .add("assigned_memory", DoubleType(), True) \
    .add("page_cache_memory", DoubleType(), True) \
    .add("cycles_per_instruction", DoubleType(), True) \
    .add("memory_accesses_per_instruction", DoubleType(), True) \
    .add("sample_rate", DoubleType(), True) \
    .add("cpu_usage_dist_00", DoubleType(), True) \
    .add("cpu_usage_dist_10", DoubleType(), True) \
    .add("cpu_usage_dist_20", DoubleType(), True) \
    .add("cpu_usage_dist_30", DoubleType(), True) \
    .add("cpu_usage_dist_40", DoubleType(), True) \
    .add("cpu_usage_dist_50", DoubleType(), True) \
    .add("cpu_usage_dist_60", DoubleType(), True) \
    .add("cpu_usage_dist_70", DoubleType(), True) \
    .add("cpu_usage_dist_80", DoubleType(), True) \
    .add("cpu_usage_dist_90", DoubleType(), True) \
    .add("cpu_usage_dist_91", DoubleType(), True) \
    .add("cpu_usage_dist_92", DoubleType(), True) \
    .add("cpu_usage_dist_93", DoubleType(), True) \
    .add("cpu_usage_dist_94", DoubleType(), True) \
    .add("cpu_usage_dist_95", DoubleType(), True) \
    .add("cpu_usage_dist_96", DoubleType(), True) \
    .add("cpu_usage_dist_97", DoubleType(), True) \
    .add("cpu_usage_dist_98", DoubleType(), True) \
    .add("cpu_usage_dist_99", DoubleType(), True)
spark = pyspark.sql.SparkSession.builder \
    .appName("task_slowdown") \
    .config("spark.driver.maxResultSize", "256g") \
    .config("spark.local.dir", "/home/claudio/tmp") \
    .config("spark.driver.memory", "200g") \
    .getOrCreate()
sc = spark.sparkContext
# path = "/home/claudio/" + cluster + "/" + cluster
# path += "_instance_usage*.csv.gz"
path = "/home/claudio/raid0/google_2019/instance_usage/" + cluster + "/" + cluster + "_instance_usage*.json.gz"
#path += "_test.csv"
df = spark.read.json(path)
#df = spark.read.format("json") \
# .option("header", False) \
# .schema(schema) \
# .load(path)
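
# Hedged sketch (assumption): the explicit `schema` above only matters for the
# headerless gzipped CSV dumps hinted at in the commented-out path; reading
# those would look roughly like:
#
#   df = spark.read.format("csv") \
#       .option("header", False) \
#       .schema(schema) \
#       .load(path)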
def compute_res_s(x):
    # Integrate usage over the record's window: average usage times the window
    # length in microseconds, keyed by "<collection_id>-<instance_index>".
    delta = int(x.end_time) - int(x.start_time)
    return [x.collection_id + "-" + x.instance_index,
            Decimal(x.average_usage_cpus) * Decimal(delta),
            Decimal(x.average_usage_memory) * Decimal(delta)]
def sum_res(xs):
    # Sum the per-record CPU- and RAM-microseconds of one job instance and
    # return them as plain decimal strings (no scientific notation).
    cpu = Decimal(0)
    ram = Decimal(0)
    for x in xs:
        cpu += x[1]
        ram += x[2]
    return ['{0:f}'.format(cpu), '{0:f}'.format(ram)]
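
# Hedged local sanity check (illustration only, not part of the pipeline):
# both helpers work on any object exposing the used fields, so they can be
# exercised without Spark on a namedtuple that mimics a usage record.
#
#   from collections import namedtuple
#   Rec = namedtuple("Rec", ["collection_id", "instance_index", "start_time",
#                            "end_time", "average_usage_cpus",
#                            "average_usage_memory"])
#   r = Rec("42", "0", 0, 300_000_000, 0.25, 0.5)  # 300 s window in microseconds
#   key, cpu, ram = compute_res_s(r)
#   # key == "42-0"; cpu and ram are ~7.5e7 CPU-us and ~1.5e8 RAM-us
#   print(sum_res([[key, cpu, ram]]))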
filename = "/home/claudio/p620/google_2019/thesis_queries" + \
"/spatial_resource_waste/" + cluster + "_res_micros_actual_per_job"
# Keep job records only (collection_type 0 or missing), drop rows missing the
# fields needed for the key or the time window, then aggregate usage per job
# instance and write one gzipped CSV line per instance.
df.rdd \
    .filter(lambda x: x.collection_type is None or x.collection_type == 0) \
    .filter(lambda x: x.start_time is not None and x.end_time is not None and
            x.instance_index is not None and x.collection_id is not None) \
    .map(compute_res_s) \
    .groupBy(lambda x: x[0]) \
    .mapValues(sum_res) \
    .map(lambda x: x[0] + "," + x[1][0] + "," + x[1][1]) \
    .saveAsTextFile(filename, "org.apache.hadoop.io.compress.GzipCodec")
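
# Hedged follow-up (assumption, not part of the original query): each output
# directory holds gzipped part files with one
# "<collection_id>-<instance_index>,<cpu_us>,<ram_us>" line, which can be read
# back for cluster-level totals roughly like this:
#
#   totals = sc.textFile(filename) \
#       .map(lambda line: line.split(",")) \
#       .map(lambda f: (Decimal(f[1]), Decimal(f[2]))) \
#       .reduce(lambda a, b: (a[0] + b[0], a[1] + b[1]))
#   # totals == (total CPU-microseconds, total RAM-microseconds) used by jobs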