#!/usr/bin/env python3
# coding: utf-8

import os
import sys

import findspark
findspark.init()

import pyspark
import pyspark.sql
from pyspark.sql.functions import lit
from pyspark.sql.types import ByteType

if len(sys.argv) != 5:
    print(sys.argv[0] + " {cluster} {tmpdir} {maxram} {basedir}")
    sys.exit(1)

cluster = sys.argv[1]

# Paths for the output parquet file and the "working" marker that prevents
# repeated runs for the same cluster.
outdir = "/home/claudio/google_2019/thesis_queries/figure_9"
out_prefix = outdir + "/figure-9a-task-count-" + cluster
working_marker = out_prefix + "-working"

if os.path.exists(working_marker):
    print("already done")
    sys.exit()

os.system("touch " + working_marker)

spark = pyspark.sql.SparkSession.builder \
    .appName("figure_9a") \
    .config("spark.driver.maxResultSize", "128g") \
    .config("spark.local.dir", sys.argv[2]) \
    .config("spark.driver.memory", sys.argv[3]) \
    .getOrCreate()
sc = spark.sparkContext

# READING INSTANCE EVENTS DATA
dfepath = sys.argv[4] + "/" + cluster + "/" + cluster + "_instance_events*.json.gz"
# dfepath = sys.argv[4] + "/" + cluster + "/" + cluster + "_test.json"
df = spark.read.json(dfepath)

# Normalize the collection_type column: cast it to a byte if present,
# otherwise add it as an all-null byte column so the filter below still works.
if "collection_type" in df.columns:
    df = df.withColumn("collection_type", df["collection_type"].cast(ByteType()))
else:
    df = df.withColumn("collection_type", lit(None).cast(ByteType()))

# Count the distinct task indices (instance_index) per job (collection_id),
# keeping only job-type collections (collection_type == 0 or missing).
df = df.rdd \
    .filter(lambda x: x.collection_id is not None and
                      x.instance_index is not None and
                      (x.collection_type == 0 or x.collection_type is None)) \
    .map(lambda x: (x.collection_id, x.instance_index)) \
    .groupBy(lambda x: x[0]) \
    .map(lambda x: [x[0], len(set(x[1]))]) \
    .toDF(["jobid", "task_count"])

df.write.parquet(out_prefix + ".parquet")

# Remove the "working" marker now that the result has been written.
os.system("rm " + working_marker)

# vim: set ts=4 sw=4 et tw=120: