#!/usr/bin/env python3
# coding: utf-8
# vim: set ts=4 sw=4 et tw=80:

import json
import findspark
findspark.init()
import pyspark
import pyspark.sql
import sys
import gzip

from pyspark import AccumulatorParam
from pyspark.sql.functions import lit
from pyspark.sql import Window
from pyspark.sql.types import *
from decimal import *

cluster = sys.argv[1]

# Schema of the Google 2019 instance_usage table (all fields nullable).
schema = StructType() \
    .add("start_time", LongType(), True) \
    .add("end_time", LongType(), True) \
    .add("collection_id", StringType(), True) \
    .add("instance_index", StringType(), True) \
    .add("machine_id", StringType(), True) \
    .add("alloc_collection_id", LongType(), True) \
    .add("alloc_instance_index", StringType(), True) \
    .add("collection_type", ByteType(), True) \
    .add("average_usage_cpus", DoubleType(), True) \
    .add("average_usage_memory", DoubleType(), True) \
    .add("maximum_usage_cpus", DoubleType(), True) \
    .add("maximum_usage_memory", DoubleType(), True) \
    .add("random_sample_usage_cpus", DoubleType(), True) \
    .add("random_sample_usage_memory", DoubleType(), True) \
    .add("assigned_memory", DoubleType(), True) \
    .add("page_cache_memory", DoubleType(), True) \
    .add("cycles_per_instruction", DoubleType(), True) \
    .add("memory_accesses_per_instruction", DoubleType(), True) \
    .add("sample_rate", DoubleType(), True) \
    .add("cpu_usage_dist_00", DoubleType(), True) \
    .add("cpu_usage_dist_10", DoubleType(), True) \
    .add("cpu_usage_dist_20", DoubleType(), True) \
    .add("cpu_usage_dist_30", DoubleType(), True) \
    .add("cpu_usage_dist_40", DoubleType(), True) \
    .add("cpu_usage_dist_50", DoubleType(), True) \
    .add("cpu_usage_dist_60", DoubleType(), True) \
    .add("cpu_usage_dist_70", DoubleType(), True) \
    .add("cpu_usage_dist_80", DoubleType(), True) \
    .add("cpu_usage_dist_90", DoubleType(), True) \
    .add("cpu_usage_dist_91", DoubleType(), True) \
    .add("cpu_usage_dist_92", DoubleType(), True) \
    .add("cpu_usage_dist_93", DoubleType(), True) \
    .add("cpu_usage_dist_94", DoubleType(), True) \
    .add("cpu_usage_dist_95", DoubleType(), True) \
    .add("cpu_usage_dist_96", DoubleType(), True) \
    .add("cpu_usage_dist_97", DoubleType(), True) \
    .add("cpu_usage_dist_98", DoubleType(), True) \
    .add("cpu_usage_dist_99", DoubleType(), True)

spark = pyspark.sql.SparkSession.builder \
    .appName("task_slowdown") \
    .config("spark.driver.maxResultSize", "256g") \
    .config("spark.local.dir", "/home/claudio/tmp") \
    .config("spark.driver.memory", "200g") \
    .getOrCreate()
sc = spark.sparkContext

# path = "/home/claudio/" + cluster + "/" + cluster
# path += "_instance_usage*.csv.gz"
path = "/home/claudio/raid0/google_2019/instance_usage/" + cluster + \
    "/" + cluster + "_instance_usage*.json.gz"
#path += "_test.csv"

df = spark.read.format("json") \
    .option("header", False) \
    .schema(schema) \
    .load(path)


def compute_res_s(x):
    """Map a usage record to (task key, CPU-time integral, RAM-time integral).

    The integrals are the average usage multiplied by the length of the
    measurement window; Decimal arithmetic avoids floating-point rounding.
    """
    delta = int(x.end_time) - int(x.start_time)
    return [x.collection_id + "-" + x.instance_index,
            Decimal(x.average_usage_cpus) * Decimal(delta),
            Decimal(x.average_usage_memory) * Decimal(delta)]


def sum_res(xs):
    """Sum the per-record resource integrals belonging to one task."""
    cpu = Decimal(0)
    ram = Decimal(0)
    for x in xs:
        cpu += x[1]
        ram += x[2]
    return ['{0:f}'.format(cpu), '{0:f}'.format(ram)]


filename = "/home/claudio/p620/google_2019/thesis_queries" + \
    "/spatial_resource_waste/" + cluster + "_res_micros_actual_per_job"

df2 = df.rdd \
    .filter(lambda x: x.collection_type is None or x.collection_type == 0) \
    .filter(lambda x: x.start_time is not None and x.end_time is not None and
            x.instance_index is not None and x.collection_id is not None) \
    .map(compute_res_s) \
    .groupBy(lambda x: x[0]) \
    .mapValues(sum_res) \
    .map(lambda x: x[0] + "," + x[1][0] + "," + x[1][1]) \
    .saveAsTextFile(filename, "org.apache.hadoop.io.compress.GzipCodec")
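
# Usage note (sketch; the script name below is a placeholder, and the single
# positional argument is assumed to be one of the Google 2019 trace cluster
# letters, e.g. "a"):
#
#   python3 <this_script>.py a
#
# The job writes gzip-compressed text part files under `filename`, one output
# line per task, formatted as
#
#   <collection_id>-<instance_index>,<cpu_time_integral>,<memory_time_integral>
#
# where each integral is the sum over the task's usage records of average
# usage multiplied by the record's (end_time - start_time) window.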