#!/usr/bin/env python3
# coding: utf-8
#
# Joins Google 2019 trace collection events with precomputed per-job machine
# and task counts, and writes one CSV line per job:
#   jobid,last_termination_type,machine_count/task_count (rounded down to 0.1)

import json
import pandas
import findspark
import pyspark
import pyspark.sql
import sys
import gzip
import os
import math

from pyspark import AccumulatorParam
from pyspark.sql import Window
from pyspark.sql.functions import col, lit
from pyspark.sql.types import ByteType

# Command-line arguments: cluster name, Spark scratch directory, driver memory.
if len(sys.argv) != 4:
    print(sys.argv[0] + " {cluster} {tmpdir} {maxram}")
    sys.exit()

cluster = sys.argv[1]

findspark.init()

DIR = os.path.dirname(__file__)

spark = pyspark.sql.SparkSession.builder \
    .appName("figure_9a_join") \
    .config("spark.driver.maxResultSize", "128g") \
    .config("spark.local.dir", sys.argv[2]) \
    .config("spark.driver.memory", sys.argv[3]) \
    .getOrCreate()
sc = spark.sparkContext

# Collection events for the selected cluster (gzipped JSON).
dfepath = "/home/claudio/google_2019/collection_events/" + cluster + "/" + cluster + "_collection_events*.json.gz"
#dfepath = "/home/claudio/google_2019/collection_events/" + cluster + "/" + cluster + "_test.json"

df = spark.read.json(dfepath)
df = df.withColumnRenamed("collection_id", "jobid") \
    .select("jobid", "collection_type", "time", "type")

# Cast collection_type to a small integer type; if the column is missing or
# cannot be cast, fall back to an all-null column so the filter below still works.
try:
    df = df.withColumn("collection_type", col("collection_type").cast(ByteType()))
except Exception:
    df = df.withColumn("collection_type", lit(None).cast(ByteType()))

# Keep only job-level events (collection_type 0 or unknown) that carry a timestamp.
df = df.filter(((col("collection_type").isNull()) | (col("collection_type") == "0")) &
               (col("time").isNotNull()))

# Join with the precomputed per-job machine counts and task counts.
jpath = DIR + "/figure-9c-machine-count-" + cluster + ".parquet"
dfj = spark.read.parquet(jpath)

jjpath = DIR + "/figure-9a-task-count-" + cluster + ".parquet"
dfjj = spark.read.parquet(jjpath)

df = df.join(dfj, "jobid", "left")
df = df.join(dfjj, "jobid")

MICROS = 1000000


def round_down(n, decimals=0):
    """Round n down to the given number of decimal places."""
    multiplier = 10 ** decimals
    return math.floor(n * multiplier) / multiplier


def for_each_job(data):
    """Reduce a job's event list to 'jobid,last_termination_type,machine/task ratio'."""
    jobid = data[0]
    ts = sorted(data[1], key=lambda x: x["time"])

    last_term = -1
    machine_count = 0

    for t in ts:
        # Event types 4-8 are termination events; remember the last one seen.
        if 4 <= t["type"] <= 8:
            last_term = t["type"]
        # Ratio of machines to tasks, or -1 when either count is unknown.
        if t["task_count"] <= 0 or t["machine_count"] <= 0:
            machine_count = -1
        else:
            machine_count = round_down(t["machine_count"] / t["task_count"], 1)

    return str(jobid) + "," + str(last_term) + "," + str(machine_count)


def cleanup(x):
    """Convert a Row into a plain dict, replacing missing values with defaults."""
    return {
        "time": int(x.time),
        "type": 0 if x.type is None else int(x.type),
        "id": x.jobid,
        "task_count": -1 if x.task_count is None else int(x.task_count),
        "machine_count": -1 if x.machine_count is None else int(x.machine_count)
    }


# Group events by job, summarize each job, and write a single text file.
df.rdd \
    .map(cleanup) \
    .groupBy(lambda x: x["id"]) \
    .repartition(100) \
    .map(for_each_job) \
    .coalesce(1) \
    .saveAsTextFile(cluster + "_machine_locality")

# vim: set ts=4 sw=4 et tw=80: