#!/usr/bin/env python3
# coding: utf-8
# For every task in a Google 2019 cluster trace, find the type of its last
# termination event (event codes 4-8) and write a "<task id>,<code>" table.

import sys

import findspark
findspark.init()

import pyspark
import pyspark.sql
from pyspark.sql.functions import lit
from pyspark.sql.types import ByteType

cluster = sys.argv[1]

spark = pyspark.sql.SparkSession.builder \
    .appName("task_slowdown") \
    .config("spark.driver.maxResultSize", "128g") \
    .config("spark.local.dir", "/tmp") \
    .config("spark.driver.memory", "124g") \
    .getOrCreate()

df = spark.read.json("/home/claudio/google_2019/instance_events/" + cluster +
                     "/" + cluster + "_instance_events*.json.gz")
#df = spark.read.json("/home/claudio/google_2019/instance_events/" + cluster +
#                     "/" + cluster + "_test.json")

# Cast collection_type to a compact type; if the column is absent from this
# cluster's files, add it as an all-NULL ByteType column instead.
if "collection_type" in df.columns:
    df = df.withColumn("collection_type", df["collection_type"].cast(ByteType()))
else:
    df = df.withColumn("collection_type", lit(None).cast(ByteType()))


def for_each_task(ts):
    """Return the type of the last termination event (codes 4-8) of a task,
    or -1 if the task never terminated."""
    ts = sorted(ts, key=lambda x: x["time"])
    last_term = -1
    for t in ts:
        if 4 <= t["type"] <= 8:
            last_term = t["type"]
    return last_term


def cleanup(x):
    """Project an instance event row onto the fields needed downstream."""
    return {
        "time": int(x.time),
        "type": 0 if x.type is None else int(x.type),
        "id": str(x.collection_id) + "-" + str(x.instance_index),
        "priority": -1 if x.priority is None else int(x.priority),
    }


filename = "/home/claudio/google_2019/thesis_queries" + \
    "/spatial_resource_waste/" + cluster + "_task_term_table"

# Keep only events of tasks whose collection is of type 0 (or unknown),
# group the events by task id, and record each task's last termination type.
df.rdd \
    .filter(lambda x: x.collection_type is None or x.collection_type == 0) \
    .filter(lambda x: x.time is not None and x.instance_index is not None
            and x.collection_id is not None) \
    .map(cleanup) \
    .groupBy(lambda x: x["id"]) \
    .mapValues(for_each_task) \
    .map(lambda x: x[0] + "," + str(x[1])) \
    .saveAsTextFile(filename, "org.apache.hadoop.io.compress.GzipCodec")