#!/usr/bin/env python3
# coding: utf-8
"""Aggregate a cluster's "bigtable" parquet into per-group termination counts.

Usage: <script> {cluster} {tmpdir} {maxram}

Reads ``bigtable-{cluster}.parquet`` from the directory containing this
script, groups the rows by the ``count_4``, ``count_5``, ``count_7`` and
``count_8`` columns, and writes a CSV ``fig-5-{cluster}.csv`` (a Spark output
directory) next to the script.  A ``fig-5-{cluster}-working`` marker file
exists while a run is in progress so that concurrent or repeated launches
exit early.
"""

import gzip
import json
import os
import sys

import pandas
import findspark
findspark.init()  # must run before the pyspark imports below
import pyspark
import pyspark.sql
import pyspark.sql.functions as F
from pyspark import AccumulatorParam
from pyspark.sql import Window
from pyspark.sql.functions import lit
from pyspark.sql.types import StructType, LongType, StringType, ByteType


def main():
    """Run the aggregation job for the cluster named on the command line.

    Command-line arguments (``sys.argv``):
        1. cluster -- suffix used to build the input and output file names.
        2. tmpdir  -- value for Spark's ``spark.local.dir``.
        3. maxram  -- value for Spark's ``spark.driver.memory``.

    Side effects:
        Sets the module globals ``DIR`` and ``NAME`` (read by the cleanup code
        in the ``__main__`` guard) and sets ``launched`` to True when the job
        is skipped because another run owns, or already produced, the output.

    Raises:
        SystemExit: on a wrong argument count, or when the job was already
            launched or completed.
    """
    global DIR
    global NAME
    global launched

    if len(sys.argv) != 4:
        print(sys.argv[0] + " {cluster} {tmpdir} {maxram}")
        sys.exit()

    cluster = sys.argv[1]

    # abspath: a relative __file__ would give dirname == "", which turned every
    # path into "/<name>" (filesystem root) and disabled the marker cleanup.
    DIR = os.path.dirname(os.path.abspath(__file__))
    NAME = "fig-5-" + cluster

    marker = os.path.join(DIR, NAME + "-working")
    # The job writes NAME.csv, so that is what marks a finished run.  The
    # .parquet check is kept from the original guard (presumably an earlier
    # output format -- TODO confirm).
    outputs = [os.path.join(DIR, NAME + ext) for ext in (".csv", ".parquet")]

    if any(os.path.exists(path) for path in outputs):
        print("already launched")
        launched = True
        sys.exit()

    try:
        # Exclusive create: checking for the marker and claiming it is a
        # single atomic step, so two simultaneous launches cannot both pass.
        with open(marker, "x"):
            pass
    except FileExistsError:
        print("already launched")
        launched = True
        sys.exit()

    spark = pyspark.sql.SparkSession.builder \
        .appName(NAME) \
        .config("spark.driver.maxResultSize", "32g") \
        .config("spark.local.dir", sys.argv[2]) \
        .config("spark.driver.memory", sys.argv[3]) \
        .getOrCreate()

    df = spark.read.parquet(os.path.join(DIR, "bigtable-" + cluster + ".parquet"))

    # Conditional count: number of rows in the group for which `cond` holds.
    cnt_cond = lambda cond: F.sum(F.when(cond, 1).otherwise(0))

    df = df.groupBy(["count_4", "count_5", "count_7", "count_8"]).agg(
        cnt_cond(F.col('task_term') == 6).alias('count_succ'),
        cnt_cond(F.col('task_term') != 6).alias('count_not_succ'))

    # repartition(1) so the CSV output directory holds a single part file.
    df.repartition(1).write.csv(os.path.join(DIR, NAME + ".csv"))


if __name__ == "__main__":
    launched = False
    DIR = None
    NAME = None
    try:
        main()
    finally:
        # Remove the marker only if this process owns it: `launched` is True
        # when main() bailed out because of someone else's marker or output.
        if not launched and DIR and NAME:
            working = os.path.join(DIR, NAME + "-working")
            try:
                os.remove(working)
            except FileNotFoundError:
                # main() failed before the marker was created; nothing to do.
                pass
            else:
                print("removed '" + working + "'")

# vim: set ts=4 sw=4 et tw=120: