#!/usr/bin/env python3
# coding: utf-8

import os
import sys

import findspark
findspark.init()

import pyspark
import pyspark.sql
import pyspark.sql.functions as F
from pyspark.sql.functions import lit
from pyspark.sql.types import StructType, LongType, StringType

# Defined at module level so the cleanup handler in the __main__ block can
# reference them even if main() exits early.
DIR = os.path.dirname(__file__)
NAME = "table-iv-evts-all"


def main():
    global launched

    if len(sys.argv) != 4:
        print(sys.argv[0] + " {cluster} {tmpdir} {maxram}")
        sys.exit()
    # The {cluster} argument is kept for interface compatibility with the
    # sibling per-cluster scripts but is ignored: this script always iterates
    # over all eight clusters below.

    # Marker-file guard against repeated launches: skip the computation if a
    # run is already in progress or the CSV output already exists.
    if os.path.exists(DIR + "/" + NAME + "-working") or \
            os.path.exists(DIR + "/" + NAME + ".csv"):
        print("already launched")
        launched = True
        sys.exit()
    open(DIR + "/" + NAME + "-working", "w").close()

    spark = pyspark.sql.SparkSession.builder \
        .appName(NAME) \
        .config("spark.driver.maxResultSize", "32g") \
        .config("spark.local.dir", sys.argv[2]) \
        .config("spark.driver.memory", sys.argv[3]) \
        .getOrCreate()

    # Schema of the per-cluster "<cluster>_task_count" CSV files: job id, job
    # termination state, and task count (the last column is unused here).
    usage_schema = StructType() \
        .add("jobid", StringType(), False) \
        .add("job_term", LongType(), False) \
        .add("task_count", LongType(), False)

    # Build the union of the per-cluster event-count tables, joining each one
    # with its job-termination table so every row carries the job's final state.
    dft = None
    for cluster in "abcdefgh":
        df = spark.read.parquet(DIR + "/bigtable-" + cluster + ".parquet") \
            .withColumn("cluster", lit(cluster))
        dfj = spark.read.format("csv") \
            .option("header", False) \
            .schema(usage_schema) \
            .load("/home/claudio/google_2019/thesis_queries/figure_9/" +
                  cluster + "_task_count") \
            .select(["jobid", "job_term"])
        df = df.join(dfj, "jobid")
        dft = df if dft is None else dft.union(df)

    # Average each of the eleven event counters per termination state. Note
    # that the aggregation runs over the union of all clusters (dft), not only
    # the last cluster read in the loop.
    df = dft.groupBy("job_term").agg(
        *[F.avg(f"count_{i}").alias(f"avg_count_{i}") for i in range(11)])

    df.repartition(1).write.csv(DIR + "/" + NAME + ".csv")


if __name__ == "__main__":
    launched = False
    try:
        main()
    finally:
        # Remove the marker file unless it belongs to an earlier run; a
        # completed run also removes its own marker here.
        if not launched:
            os.system("rm -v " + DIR + "/" + NAME + "-working")

# vim: set ts=4 sw=4 et tw=120: