#!/usr/bin/env python3
# coding: utf-8

import os
import sys

import findspark
findspark.init()
import pyspark.sql

from pyspark.sql.functions import lit
from pyspark.sql.types import ByteType

if len(sys.argv) != 4:
    print("Usage: " + sys.argv[0] + " {cluster} {tmpdir} {maxram}")
    sys.exit(1)

cluster = sys.argv[1]

# Skip the computation if the output for this cluster already exists.
if os.path.exists("/home/claudio/google_2019/thesis_queries/figure_7/" + cluster + "_priority"):
    print("already computed")
    sys.exit()

spark = pyspark.sql.SparkSession.builder \
    .appName("task_slowdown") \
    .config("spark.driver.maxResultSize", "128g") \
    .config("spark.local.dir", sys.argv[2]) \
    .config("spark.driver.memory", sys.argv[3]) \
    .getOrCreate()

df = spark.read.json("/home/claudio/google_2019/instance_events/" + cluster
                     + "/" + cluster + "_instance_events*.json.gz")
#df = spark.read.json("/home/claudio/google_2019/instance_events/" + cluster + "/" + cluster + "_test.json")

# Normalize collection_type to a ByteType column. PySpark DataFrames do not
# support item assignment, so the original try/except always fell through to
# the all-null branch; instead, cast the column if it exists and add an
# all-null one otherwise.
if "collection_type" in df.columns:
    df = df.withColumn("collection_type", df["collection_type"].cast(ByteType()))
else:
    df = df.withColumn("collection_type", lit(None).cast(ByteType()))


def for_each_task(data):
    """Reduce one task's event list to (termination type, priority).

    The priority is taken from the earliest event that carries one; the
    termination type is the latest event whose type is in [4, 8] (EVICT
    through LOST in the 2019 trace), or -1 if the task never terminated
    within the trace window.
    """
    ts = data[1]
    priority = -1
    term = -1
    min_time = -1
    max_time = -1
    for t in ts:
        if (min_time == -1 or t["time"] <= min_time) and t["priority"] != -1:
            priority = t["priority"]
            min_time = t["time"]
        if (max_time == -1 or t["time"] >= max_time) and 4 <= t["type"] <= 8:
            term = t["type"]
            max_time = t["time"]
    return (term, priority)


def cleanup(x):
    """Project a raw event row onto just the fields the reduction needs."""
    return {
        "time": int(x.time),
        "type": 0 if x.type is None else int(x.type),
        # collection_id plus instance_index uniquely identifies a task.
        "id": str(x.collection_id) + "-" + str(x.instance_index),
        "priority": -1 if x.priority is None else int(x.priority),
    }


# Count (termination type, priority) pairs over all task events and write
# them out as "term,priority,count" CSV lines in a single part file. The
# first filter keeps task events (collection_type 0 or missing), dropping
# alloc-set events.
df.rdd \
    .filter(lambda x: x.collection_type is None or x.collection_type == 0) \
    .filter(lambda x: x.time is not None and x.instance_index is not None
            and x.collection_id is not None) \
    .map(cleanup) \
    .groupBy(lambda x: x["id"]) \
    .map(for_each_task) \
    .groupBy(lambda x: x) \
    .mapValues(lambda x: len(x)) \
    .map(lambda x: str(x[0][0]) + "," + str(x[0][1]) + "," + str(x[1])) \
    .coalesce(1) \
    .saveAsTextFile(cluster + "_priority")

# vim: set ts=4 sw=4 et tw=80:
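
# A minimal usage sketch (hedged: "a" is one of the 2019 trace cells, the
# scratch directory and driver RAM depend on the host, and the script name
# is hypothetical). The script is assumed to run from the figure_7
# directory, since it checks the absolute output path but saves relative to
# the working directory:
#
#   cd /home/claudio/google_2019/thesis_queries/figure_7
#   ./task_priority.py a /tmp/spark-scratch 64g
#
# The result is a directory a_priority/ containing a single part file of
# "termination_type,priority,count" lines.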