#!/usr/bin/env python3
# coding: utf-8

import os
import sys

import findspark
findspark.init()

import pyspark.sql
import pyspark.sql.functions as F
from pyspark.sql.functions import lit


def main():
    global launched

    # Usage check; the cluster argument is accepted but unused, since the
    # clusters processed below (a-h) are hardcoded.
    if len(sys.argv) != 4:
        print(sys.argv[0] + " {cluster} {tmpdir} {maxram}")
        sys.exit()

    # A marker file signals a run in progress; the output file signals a
    # completed run. Refuse to start twice either way.
    if os.path.exists(DIR + "/" + NAME + "-working") or \
            os.path.exists(DIR + "/" + NAME + ".csv"):
        print("already launched")
        launched = True
        sys.exit()
    os.system("touch " + DIR + "/" + NAME + "-working")

    spark = pyspark.sql.SparkSession.builder \
        .appName(NAME) \
        .config("spark.driver.maxResultSize", "32g") \
        .config("spark.local.dir", sys.argv[2]) \
        .config("spark.driver.memory", sys.argv[3]) \
        .getOrCreate()

    # Concatenate the per-cluster tables, tagging each row with the cluster
    # it came from.
    df = None
    for cluster in "abcdefgh":
        dfn = spark.read.parquet(DIR + "/bigtable-" + cluster + ".parquet") \
            .withColumn("cluster", lit(cluster))
        df = dfn if df is None else df.union(dfn)

    # Row total across the eleven count_* columns.
    df = df.withColumn("count_sum",
                       sum(df["count_" + str(i)] for i in range(11)))

    # Per task term: 95th percentile and mean of the total, plus the mean of
    # each individual count_* column.
    aggs = [F.expr("percentile(count_sum, array(0.95))")[0].alias("%95"),
            F.avg("count_sum").alias("mean")]
    aggs += [F.avg("count_" + str(i)).alias("avg_count_" + str(i))
             for i in range(11)]
    df = df.groupBy("task_term").agg(*aggs)

    # Collapse to a single partition so the CSV comes out as one file.
    df.repartition(1).write.option("header", True).csv(DIR + "/" + NAME + ".csv")


if __name__ == "__main__":
    DIR = os.path.dirname(__file__)
    NAME = "table-iii-all"
    launched = False
    try:
        main()
    finally:
        # Clean up the marker file unless another run owns it.
        if not launched and os.path.exists(DIR + "/" + NAME + "-working"):
            os.system("rm -v " + DIR + "/" + NAME + "-working")

# vim: set ts=4 sw=4 et tw=120: