#!/usr/bin/env python3
# coding: utf-8

import os
import sys

import findspark
findspark.init()

import pyspark
import pyspark.sql
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, LongType, StringType


def main():
    global DIR
    global NAME
    global launched

    if len(sys.argv) != 4:
        print(sys.argv[0] + " {cluster} {tmpdir} {maxram}")
        sys.exit()

    DIR = os.path.dirname(__file__)
    NAME = "table-iv-tasks-all"

    # Skip if another run is in progress or the output already exists.
    if os.path.exists(DIR + "/" + NAME + "-working") or os.path.exists(DIR + "/" + NAME + ".csv"):
        print("already launched")
        launched = True
        sys.exit()

    # Drop a lock file so concurrent invocations bail out early.
    os.system("touch " + DIR + "/" + NAME + "-working")

    spark = pyspark.sql.SparkSession.builder \
        .appName(NAME) \
        .config("spark.driver.maxResultSize", "32g") \
        .config("spark.local.dir", sys.argv[2]) \
        .config("spark.driver.memory", sys.argv[3]) \
        .getOrCreate()

    # Headerless per-job CSVs: job id, termination state, task count.
    usage_schema = StructType() \
        .add("jobid", StringType(), False) \
        .add("job_term", LongType(), False) \
        .add("task_count", LongType(), False)

    df = spark.read.format("csv") \
        .option("header", False) \
        .schema(usage_schema) \
        .load("/home/claudio/google_2019/thesis_queries/figure_9/?_task_count")

    # Per termination state: mean and 95th-percentile task count.
    df = df.groupBy("job_term").agg(
        F.expr("avg(task_count)").alias('mean_tc'),
        F.expr("percentile(task_count, array(0.95))")[0].alias('%95_tc'))

    df.repartition(1).write.csv(DIR + "/" + NAME + ".csv")


if __name__ == "__main__":
    launched = False
    try:
        main()
    finally:
        # Remove the lock file only if this run created it; the globals()
        # check guards the early-exit path where DIR/NAME were never set.
        if not launched and "DIR" in globals():
            os.system("rm -v " + DIR + "/" + NAME + "-working")

# vim: set ts=4 sw=4 et tw=120:
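
# Example invocation (a sketch; the script filename, cluster letter, and
# paths below are illustrative assumptions, not documented interfaces):
#
#   python3 table_iv_tasks_all.py a /tmp/spark-local 32g
#
# Note that the {cluster} argument only appears in the usage message: the
# load() call above reads every file matched by the ?_task_count glob, so
# all clusters are aggregated together in one run.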