#!/usr/bin/env python3
# coding: utf-8
"""Build the per-``task_term`` summary table ("table III") for one cluster.

Usage::

    <script> {cluster} {tmpdir} {maxram}

Reads ``bigtable-{cluster}.parquet`` from the directory containing this
script, adds a ``count_sum`` column (the sum of ``count_0`` .. ``count_10``),
groups by ``task_term`` and writes the 95th percentile and mean of
``count_sum`` plus the mean of every ``count_N`` column to
``table-iii-{cluster}.csv`` in the same directory.

A ``table-iii-{cluster}-working`` marker file exists while a run is in
progress so that concurrent invocations for the same cluster bail out.
"""

import json
import pandas
import findspark
findspark.init()  # must run before pyspark is imported
import pyspark
import pyspark.sql
import sys
import gzip
import os
import pyspark.sql.functions as F
from pyspark import AccumulatorParam
from pyspark.sql.functions import lit
from pyspark.sql import Window
from pyspark.sql.types import StructType, LongType, StringType, ByteType

# Names of the per-bucket counter columns that are summed and averaged.
COUNT_COLUMNS = ["count_{}".format(i) for i in range(11)]

# Module-level state shared between main() and the cleanup at script exit.
DIR = None        # directory containing this script (absolute path)
NAME = None       # "table-iii-<cluster>", base name of marker and output
launched = False  # True when another run already owns / finished this table

# Path of the marker file created by *this* process, or None if none was made.
_owned_marker = None


def _already_launched():
    """Report that another run exists for this table and exit with status 0."""
    global launched
    print("already launched")
    launched = True
    sys.exit()


def _claim_run():
    """Create the ``-working`` marker, or exit if the table is taken or done.

    The marker is created with exclusive-create mode so that two processes
    started at the same moment cannot both believe they own the run.
    """
    global _owned_marker

    # The script writes NAME.csv; NAME.parquet is still honoured because the
    # original guard checked it. Spark's writer fails on an existing output
    # path by default, so detecting the finished .csv here avoids starting a
    # session only to crash at write time.
    finished = [os.path.join(DIR, NAME + ext) for ext in (".parquet", ".csv")]
    if any(os.path.exists(path) for path in finished):
        _already_launched()

    marker = os.path.join(DIR, NAME + "-working")
    try:
        with open(marker, "x"):
            pass
    except FileExistsError:
        _already_launched()
    _owned_marker = marker


def _release_run():
    """Remove the marker file, but only if this process created it."""
    global _owned_marker
    if _owned_marker is None:
        return
    try:
        os.remove(_owned_marker)
        print("removed '" + _owned_marker + "'")
    except FileNotFoundError:
        # Someone deleted it for us; nothing left to clean up.
        pass
    _owned_marker = None


def _summarise(df):
    """Return the per-``task_term`` aggregate of *df*.

    Adds ``count_sum`` (sum of all ``COUNT_COLUMNS``) and aggregates, for each
    ``task_term``, the 95th percentile and mean of ``count_sum`` and the mean
    of every individual counter column.
    """
    total = df[COUNT_COLUMNS[0]]
    for column in COUNT_COLUMNS[1:]:
        total = total + df[column]
    df = df.withColumn("count_sum", total)

    aggregates = [
        F.expr("percentile(count_sum, array(0.95))")[0].alias('%95'),
        F.expr("avg(count_sum)").alias('mean'),
    ]
    aggregates.extend(
        F.expr("avg({})".format(column)).alias("avg_" + column)
        for column in COUNT_COLUMNS
    )
    return df.groupBy("task_term").agg(*aggregates)


def main():
    """Parse the command line, run the Spark job and write the CSV output.

    Exits with status 1 on a usage error and with status 0 (after printing
    ``already launched``) when another run owns or has finished this table.
    """
    global DIR
    global NAME

    if len(sys.argv) != 4:
        print(sys.argv[0] + " {cluster} {tmpdir} {maxram}")
        sys.exit(1)

    cluster = sys.argv[1]
    # abspath: __file__ may be a bare file name, whose dirname is "" and would
    # turn DIR + "/" + NAME into a path in the filesystem root.
    DIR = os.path.dirname(os.path.abspath(__file__))
    NAME = "table-iii-" + cluster

    _claim_run()

    spark = pyspark.sql.SparkSession.builder \
        .appName(NAME) \
        .config("spark.driver.maxResultSize", "32g") \
        .config("spark.local.dir", sys.argv[2]) \
        .config("spark.driver.memory", sys.argv[3]) \
        .getOrCreate()

    try:
        df = spark.read.parquet(os.path.join(DIR, "bigtable-" + cluster + ".parquet"))

        # Disabled alternative: aggregate per job and join the job termination
        # state before summarising. Kept for reference.
        #
        # df = df.groupBy("jobid").agg(
        #     F.expr("sum(count_0)").alias('count_0'),
        #     F.expr("sum(count_1)").alias('count_1'),
        #     F.expr("sum(count_2)").alias('count_2'),
        #     F.expr("sum(count_3)").alias('count_3'),
        #     F.expr("sum(count_4)").alias('count_4'),
        #     F.expr("sum(count_5)").alias('count_5'),
        #     F.expr("sum(count_6)").alias('count_6'),
        #     F.expr("sum(count_7)").alias('count_7'),
        #     F.expr("sum(count_8)").alias('count_8'),
        #     F.expr("sum(count_9)").alias('count_9'),
        #     F.expr("sum(count_10)").alias('count_10'))
        #
        # usage_schema = StructType() \
        #     .add("jobid", StringType(), False) \
        #     .add("job_term", LongType(), False) \
        #     .add("task_count", LongType(), False)
        # dfj = spark.read.format("csv") \
        #     .option("header", False) \
        #     .schema(usage_schema) \
        #     .load("/home/claudio/google_2019/thesis_queries/figure_9/" + cluster + "_task_count")
        # dfj = dfj.select(["jobid", "job_term"])
        #
        # df = df.join(dfj, "jobid")

        result = _summarise(df)

        # Single partition so the output directory holds one CSV part file.
        result.repartition(1).write.option("header", True).csv(os.path.join(DIR, NAME + ".csv"))
    finally:
        spark.stop()


if __name__ == "__main__":
    try:
        main()
    finally:
        # Only removes the marker if this process created it, so usage errors
        # and "already launched" exits leave other runs' markers untouched.
        _release_run()

# vim: set ts=4 sw=4 et tw=120: